#####################################################################
# #
# /labscript_devices/ZaberStageController/blacs_workers.py #
# #
# Copyright 2019, Monash University and contributors #
# #
# This file is part of labscript_devices, in the labscript suite #
# (see http://labscriptsuite.org), and is licensed under the #
# Simplified BSD License. See the license.txt file in the root of #
# the project for the full license. #
# #
#####################################################################
from blacs.tab_base_classes import Worker
from time import monotonic
from labscript_utils import dedent
import labscript_utils.h5_lock, h5py
from .utils import get_device_number
zaber = None
TIMEOUT = 60
[docs]
class MockZaberInterface(object):
def __init__(self, com_port):
from collections import defaultdict
self.positions = defaultdict(int)
[docs]
def move(self, device_number, position):
print(f"Mock move device {device_number} to position {position}")
self.positions[device_number] = position
[docs]
def get_position(self, device_number):
return self.positions[device_number]
[docs]
def close(self):
print(f"mock close")
[docs]
class ZaberInterface(object):
def __init__(self, com_port):
global zaber
try:
import zaber.serial as zaber
except ImportError:
msg = """Could not import zaber.serial module. Please ensure it is
installed. It is installable via pip with 'pip install zaber.serial'"""
raise ImportError(dedent(msg))
self.port = zaber.BinarySerial(com_port)
[docs]
def move(self, device_number, position):
device = zaber.BinaryDevice(self.port, device_number)
device.move_abs(position)
deadline = monotonic() + TIMEOUT
while device.get_position() != position:
if monotonic() > deadline:
msg = "Device did not move to requested position within timeout"
raise TimeoutError(msg)
[docs]
def get_position(self, device_number):
device = zaber.BinaryDevice(self.port, device_number)
return device.get_position()
[docs]
def close(self):
self.port.close()
[docs]
class ZaberWorker(Worker):
[docs]
def init(self):
if self.mock:
self.controller = MockZaberInterface(self.com_port)
else:
self.controller = ZaberInterface(self.com_port)
[docs]
def check_remote_values(self):
remote_values = {}
for connection in self.child_connections:
device_number = get_device_number(connection)
remote_values[connection] = self.controller.get_position(device_number)
return remote_values
[docs]
def program_manual(self, values):
for connection, value in values.items():
device_number = get_device_number(connection)
self.controller.move(device_number, int(round(value)))
return self.check_remote_values()
[docs]
def transition_to_buffered(self, device_name, h5file, initial_values, fresh):
with h5py.File(h5file, 'r') as hdf5_file:
group = hdf5_file['/devices/' + device_name]
if 'static_values' in group:
data = group['static_values']
values = {name: data[0][name] for name in data.dtype.names}
else:
values = {}
return self.program_manual(values)
[docs]
def transition_to_manual(self):
return True
[docs]
def abort_buffered(self):
return True
[docs]
def abort_transition_to_buffered(self):
return True
[docs]
def shutdown(self):
self.controller.close()