#####################################################################
# #
# /labscript_devices/PrawnDO/blacs_workers.py #
# #
# Copyright 2023, Philip Starkey, Carter Turnbaugh, Patrick Miller #
# #
# This file is part of labscript_devices, in the labscript suite #
# (see http://labscriptsuite.org), and is licensed under the #
# Simplified BSD License. See the license.txt file in the root of #
# the project for the full license. #
# #
#####################################################################
from blacs.tab_base_classes import Worker
import labscript_utils.h5_lock, h5py
import labscript_utils
from labscript import LabscriptError
import numpy as np
import re
import time
[docs]
class PrawnDOInterface(object):
min_version = (1, 2, 0)
"""Minimum compatible firmware version tuple"""
def __init__(self, com_port):
global serial; import serial
global struct; import struct
self.timeout = 0.2
self.conn = serial.Serial(com_port, 10000000, timeout=self.timeout)
version = self.get_version()
print(f'Connected to version: {version}')
# ensure firmware is compatible
assert version >= self.min_version, f'Incompatible firmware, must be >= {self.min_version}'
current_status = self.status()
print(f'Current status is {current_status}')
[docs]
def get_version(self):
self.conn.write(b'ver\r\n')
version_str = self.conn.readline().decode()
assert version_str.startswith("Version: ")
version = tuple(int(i) for i in version_str[9:].split('.'))
assert len(version) == 3
return version
[docs]
def _read_full_buffer(self):
'''Used to get any extra lines from device after a failed send_command'''
resp = self.conn.readlines()
str_resp = ''.join([st.decode() for st in resp])
return str_resp
[docs]
def send_command(self, command, readlines=False):
'''Sends the supplied string command and checks for a response.
Automatically applies the correct termination characters.
Args:
command (str): Command to send. Termination and encoding is done automatically.
readlines (bool, optional): Use pyserial's readlines functionality to read multiple
response lines. Slower as it relies on timeout to terminate reading.
Returns:
str: String response from the PrawnDO
'''
command += '\r\n'
self.conn.write(command.encode())
if readlines:
str_resp = self._read_full_buffer()
else:
str_resp = self.conn.readline().decode()
return str_resp
[docs]
def send_command_ok(self, command):
'''Sends the supplied string command and confirms 'ok' response.
Args:
command (str): String command to send.
Raises:
LabscriptError: If response is not `ok\\r\\n`
'''
resp = self.send_command(command)
if resp != 'ok\r\n':
# get complete error message
resp += self._read_full_buffer()
raise LabscriptError(f"Command '{command:s}' failed. Got response '{repr(resp)}'")
[docs]
def status(self):
'''Reads the status of the PrawnDO
Returns:
(int, int): tuple containing
- **run-status** (int): Run status code
- **clock-status** (int): Clock status code
Raises:
LabscriptError: If response is not `ok\\r\\n`
'''
resp = self.send_command('sts')
match = re.match(r"run-status:(\d) clock-status:(\d)(\r\n)?", resp)
if match:
return int(match.group(1)), int(match.group(2))
else:
resp += self._read_full_buffer()
raise LabscriptError(f'PrawnDO invalid status, returned {repr(resp)}')
[docs]
def output_state(self):
'''Reads the current output state of the PrawnDO
Returns:
int: Output state of all 16 bits
Raises:
LabscriptError: If response is not `ok\\r\\n`
'''
resp = self.send_command('gto')
try:
resp_i = int(resp, 16)
except Exception as e:
resp += self._read_full_buffer()
raise LabscriptError(f'Remote value check failed. Got response {repr(resp)}') from e
return resp_i
[docs]
def adm_batch(self, pulse_program):
'''Sends pulse program as single binary block using `adm` command.
Args:
pulse_program (numpy.ndarray): Structured array of program to send.
Must have first column as bit sets (<u2) and second as reps (<u4).
'''
self.conn.write('adm 0 {:x}\n'.format(len(pulse_program)).encode())
resp = self.conn.readline().decode()
if resp != 'ready\r\n':
resp += self._read_full_buffer()
raise LabscriptError(f'adm command failed, got response {repr(resp)}')
self.conn.write(pulse_program.tobytes())
resp = self.conn.readline().decode()
if resp != 'ok\r\n':
resp += self._read_full_buffer()
raise LabscriptError(f'Program not written successfully, got response {repr(resp)}')
[docs]
def close(self):
self.conn.close()
[docs]
class PrawnDOWorker(Worker):
[docs]
def init(self):
self.intf = PrawnDOInterface(self.com_port)
self.smart_cache = {'do_table':None, 'reps':None}
[docs]
def _dict_to_int(self, d):
"""Converts dictionary of outputs to an integer mask.
Args:
d (dict): Dictionary of output states
Returns:
int: Integer mask of the 16 output states.
"""
val = 0
for conn, value in d.items():
val |= value << int(conn.split('do')[-1])
return val
[docs]
def _int_to_dict(self, val):
"""Converts an integer mask to a dictionary of outputs.
Args:
val (int): 16-bit integer mask to convert
Returns:
dict: Dictonary with output channels as keys and values are boolean states
"""
return {f'do{i:d}':((val >> i) & 1) for i in range(16)}
[docs]
def check_status(self):
'''Checks operational status of the PrawnDO.
Automatically called by BLACS to update status.
Returns:
(int, int): Tuple containing:
- **run-status** (int): Possible values are:
* 0 : manual mode
* 1 : transitioning to buffered execution
* 2 : buffered execution
* 3 : abort requested
* 4 : aborting buffered execution
* 5 : last buffered execution aborted
* 6 : transitioning to manual mode
- **clock-status** (int): Possible values are:
* 0 : internal clock
* 1 : external clock
'''
return self.intf.status()
[docs]
def program_manual(self, front_panel_values):
"""Change output states in manual mode.
Returns:
dict: Output states after command execution.
"""
value = self._dict_to_int(front_panel_values)
# send static state
self.intf.send_command_ok(f'man {value:04x}')
# confirm state set correctly
resp_i = self.intf.output_state()
return self._int_to_dict(resp_i)
[docs]
def check_remote_values(self):
"""Checks the remote state of the PrawnDO.
Called automatically by BLACS.
Returns:
dict: Dictionary of the digital output states.
"""
resp_i = self.intf.output_state()
return self._int_to_dict(resp_i)
[docs]
def transition_to_buffered(self, device_name, h5file, initial_values, fresh):
if fresh:
self.smart_cache = {'pulse_program':None}
with h5py.File(h5file, 'r') as hdf5_file:
group = hdf5_file['devices'][device_name]
if 'pulse_program' not in group:
# if no output commanded, return
return
self.device_properties = labscript_utils.properties.get(
hdf5_file, device_name, "device_properties")
pulse_program = group['pulse_program'][()]
# configure clock from device properties
ext = self.device_properties['external_clock']
freq = self.device_properties['clock_frequency']
self.intf.send_command_ok(f"clk {ext:d} {freq:.0f}")
# check if it is more efficient to fully refresh
if not fresh and self.smart_cache['pulse_program'] is not None:
# get more convenient handle to smart cache array
curr_program = self.smart_cache['pulse_program']
# if arrays aren't of same shape, only compare up to smaller array size
n_curr = len(curr_program)
n_new = len(pulse_program)
if n_curr > n_new:
# technically don't need to reprogram current elements beyond end of new elements
new_inst = np.sum(curr_program[:n_new] != pulse_program)
elif n_curr < n_new:
n_diff = n_new - n_curr
val_diffs = np.sum(curr_program != pulse_program[:n_curr])
new_inst = val_diffs + n_diff
else:
new_inst = np.sum(curr_program != pulse_program)
if new_inst / n_new > 0.1:
fresh = True
# if fresh or not smart cache, program full table as a batch
# this is faster than going line by line
if fresh or self.smart_cache['pulse_program'] is None:
self.intf.send_command_ok('cls') # clear old program
self.intf.adm_batch(pulse_program)
self.smart_cache['pulse_program'] = pulse_program
else:
# only program table lines that have changed
n_cache = len(self.smart_cache['pulse_program'])
for i, instr in enumerate(pulse_program):
if i >= n_cache:
print(f'programming step {i}')
self.intf.send_command_ok(f'set {i:x} {instr[0]:x} {instr[1]:x}')
self.smart_cache['pulse_program'][i] = instr
elif (self.smart_cache['pulse_program'][i] != instr):
print(f'programming step {i}')
self.intf.send_command_ok(f'set {i:x} {instr[0]:x} {instr[1]:x}')
self.smart_cache['pulse_program'][i] = instr
final_values = self._int_to_dict(pulse_program[-1][0])
# start program, waiting for beginning trigger from parent
self.intf.send_command_ok('run')
return final_values
[docs]
def transition_to_manual(self):
"""Transition to manual mode after buffered execution completion.
Returns:
bool: `True` if transition to manual is successful.
"""
i = 0
while True:
run_status, _ = self.intf.status()
i += 1
if run_status == 0:
return True
elif i == 1000:
# program hasn't ended, probably bad triggering
# abort and raise an error
self.abort_buffered()
raise LabscriptError(f'PrawnDO did not end with status {run_status:d}. Is triggering working?')
elif run_status in [3,4,5]:
raise LabscriptError(f'PrawnDO returned status {run_status} in transition to manual')
[docs]
def abort_buffered(self):
"""Aborts a currently running buffered execution.
Returns:
bool: `True` is abort was successful.
"""
self.intf.send_command_ok('abt')
# loop until abort complete
while self.intf.status()[0] != 5:
time.sleep(0.5)
return True
[docs]
def abort_transition_to_buffered(self):
"""Aborts transition to buffered.
Calls :meth:`abort_buffered`
"""
return self.abort_buffered()
[docs]
def shutdown(self):
"""Closes serial connection to PrawnDO"""
self.intf.close()