Source code for labscript_devices.PineBlaster

#####################################################################
#                                                                   #
# /PineBlaster.py                                                   #
#                                                                   #
# Copyright 2013, Monash University                                 #
#                                                                   #
# This file is part of the module labscript_devices, in the         #
# labscript suite (see http://labscriptsuite.org), and is           #
# licensed under the Simplified BSD License. See the license.txt    #
# file in the root of the project for the full license.             #
#                                                                   #
#####################################################################

from labscript import PseudoclockDevice, Pseudoclock, ClockLine, config, LabscriptError, set_passed_properties
from labscript_devices import runviewer_parser, BLACS_tab

import numpy as np
import labscript_utils.h5_lock, h5py
import labscript_utils.properties



# Define a PineBlasterPseudoClock that only accepts one child clockline
[docs] class PineBlasterPseudoclock(Pseudoclock):
[docs] def add_device(self, device): if isinstance(device, ClockLine): # only allow one child if self.child_devices: raise LabscriptError('The pseudoclock of the PineBlaster %s only supports 1 clockline, which is automatically created. Please use the clockline located at %s.clockline'%(self.parent_device.name, self.parent_device.name)) Pseudoclock.add_device(self, device) else: raise LabscriptError('You have connected %s to %s (the Pseudoclock of %s), but %s only supports children that are ClockLines. Please connect your device to %s.clockline instead.'%(device.name, self.name, self.parent_device.name, self.name, self.parent_device.name))
[docs] class PineBlaster(PseudoclockDevice): description = 'PineBlaster' clock_limit = 10e6 clock_resolution = 25e-9 clock_type = 'fast clock' # Measured by Phil Starkey on 2015/9/24 trigger_delay = 350e-9 # Todo: find out what this actually is: wait_delay = 2.5e-6 allowed_children = [PineBlasterPseudoclock] max_instructions = 15000 @set_passed_properties(property_names = { "connection_table_properties": ["usbport"]} ) def __init__(self, name, trigger_device=None, trigger_connection=None, usbport='COM1'): PseudoclockDevice.__init__(self, name, trigger_device, trigger_connection) self.BLACS_connection = usbport # create Pseudoclock and clockline self._pseudoclock = PineBlasterPseudoclock('%s_pseudoclock'%name, self, 'clock') # possibly a better connection name than 'clock'? # Create the internal direct output clock_line self._clock_line = ClockLine('%s_clock_line'%name, self.pseudoclock, 'internal') @property def pseudoclock(self): return self._pseudoclock # Note, not to be confused with Device.parent_clock_line which returns the parent ClockLine # This one gives the automatically created ClockLine object @property def clockline(self): return self._clock_line
[docs] def add_device(self, device): if not self.child_devices and isinstance(device, Pseudoclock): PseudoclockDevice.add_device(self, device) elif isinstance(device, Pseudoclock): raise LabscriptError('The %s %s automatically creates a Pseudoclock because it only supports one. '%(self.description, self.name) + 'Instead of instantiating your own Pseudoclock object, please use the internal' + ' one stored in %s.pseudoclock'%self.name) else: raise LabscriptError('You have connected %s (class %s) to %s, but %s does not support children with that class.'%(device.name, device.__class__, self.name, self.name))
[docs] def generate_code(self, hdf5_file): PseudoclockDevice.generate_code(self, hdf5_file) group = hdf5_file['devices'].create_group(self.name) # compress clock instructions with the same period: This will # halve the number of instructions roughly, since the PineBlaster # does not have a 'slow clock': reduced_instructions = [] for instruction in self.pseudoclock.clock: if instruction == 'WAIT': # The following period and reps indicates a wait instruction reduced_instructions.append({'period': 0, 'reps': 1}) continue reps = instruction['reps'] # period is in quantised units: period = int(round(instruction['step']/self.clock_resolution)) if reduced_instructions and reduced_instructions[-1]['period'] == period: reduced_instructions[-1]['reps'] += reps else: reduced_instructions.append({'period': period, 'reps': reps}) # The following period and reps indicates a stop instruction: reduced_instructions.append({'period': 0, 'reps': 0}) if len(reduced_instructions) > self.max_instructions: raise LabscriptError("%s %s has too many instructions. It has %d and can only support %d"%(self.description, self.name, len(reduced_instructions), self.max_instructions)) # Store these instructions to the h5 file: dtypes = [('period',int),('reps',int)] pulse_program = np.zeros(len(reduced_instructions),dtype=dtypes) for i, instruction in enumerate(reduced_instructions): pulse_program[i]['period'] = instruction['period'] pulse_program[i]['reps'] = instruction['reps'] group.create_dataset('PULSE_PROGRAM', compression = config.compression, data=pulse_program) # TODO: is this needed, the PulseBlasters don't save it... self.set_property('is_master_pseudoclock', self.is_master_pseudoclock, location='device_properties') self.set_property('stop_time', self.stop_time, location='device_properties')
[docs] @runviewer_parser class RunviewerClass(object): clock_resolution = 25e-9 clock_type = 'fast clock' # Todo: find out what this actually is: trigger_delay = 1e-6 # Todo: find out what this actually is: wait_delay = 2.5e-6 def __init__(self, path, device): self.path = path self.name = device.name self.device = device
[docs] def get_traces(self, add_trace, clock=None): if clock is not None: times, clock_value = clock[0], clock[1] clock_indices = np.where((clock_value[1:]-clock_value[:-1])==1)[0]+1 # If initial clock value is 1, then this counts as a rising edge (clock should be 0 before experiment) # but this is not picked up by the above code. So we insert it! if clock_value[0] == 1: clock_indices = np.insert(clock_indices, 0, 0) clock_ticks = times[clock_indices] # get the pulse program with h5py.File(self.path, 'r') as f: pulse_program = f['devices/%s/PULSE_PROGRAM'%self.name][:] time = [] states = [] trigger_index = 0 t = 0 if clock is None else clock_ticks[trigger_index]+self.trigger_delay trigger_index += 1 clock_factor = self.clock_resolution/2. for row in pulse_program: if row['period'] == 0: #special case if row['reps'] == 1: # WAIT if clock is not None: t = clock_ticks[trigger_index]+self.trigger_delay trigger_index += 1 else: t += self.wait_delay else: for i in range(row['reps']): for j in range(1, -1, -1): time.append(t) states.append(j) t += row['period']*clock_factor clock = (np.array(time), np.array(states)) clocklines_and_triggers = {} for pseudoclock_name, pseudoclock in self.device.child_list.items(): for clock_line_name, clock_line in pseudoclock.child_list.items(): if clock_line.parent_port == 'internal': clocklines_and_triggers[clock_line_name] = clock add_trace(clock_line_name, clock, self.name, clock_line.parent_port) return clocklines_and_triggers
from blacs.tab_base_classes import Worker, define_state from blacs.tab_base_classes import MODE_MANUAL, MODE_TRANSITION_TO_BUFFERED, MODE_TRANSITION_TO_MANUAL, MODE_BUFFERED from blacs.device_base_class import DeviceTab
[docs] @BLACS_tab class PineblasterTab(DeviceTab):
[docs] def initialise_GUI(self): # Create a single digital output self.create_digital_outputs({'internal':{}}) # Create widgets for output objects _,_,do_widgets = self.auto_create_widgets() # and auto place the widgets in the UI self.auto_place_widgets(("Flags", do_widgets)) # Store the board number to be used self.usb_port = str(self.settings['connection_table'].find_by_name(self.device_name).BLACS_connection) # Create and set the primary worker self.create_worker("main_worker", PineblasterWorker, {'usbport':self.usb_port}) self.primary_worker = "main_worker" # Set the capabilities of this device self.supports_smart_programming(True)
[docs] def get_child_from_connection_table(self, parent_device_name, port): # This is a direct output, let's search for it on the internal Pseudoclock if parent_device_name == self.device_name: device = self.connection_table.find_by_name(self.device_name) pseudoclock = device.child_list[list(device.child_list.keys())[0]] # there should always be one (and only one) child, the Pseudoclock clockline = None for child_name, child in pseudoclock.child_list.items(): # store a reference to the internal clockline if child.parent_port == port: return DeviceTab.get_child_from_connection_table(self, pseudoclock.name, port) return '-'
@define_state(MODE_BUFFERED,True) def status_monitor(self, notify_queue): status = yield(self.queue_work(self.primary_worker, 'status_monitor')) if status: # Experiment is over. Tell the queue manager about it notify_queue.put('done') self.statemachine_timeout_remove(self.status_monitor) @define_state(MODE_BUFFERED,True) def start_run(self, notify_queue): """Starts the Pineblaster, notifying the queue manager when the run is over""" self.statemachine_timeout_add(100, self.status_monitor, notify_queue) yield(self.queue_work(self.primary_worker, 'start_run'))
[docs] class PineblasterWorker(Worker):
[docs] def init(self): global h5py; import labscript_utils.h5_lock, h5py global serial; import serial global time; import time self.smart_cache = [] self.pineblaster = serial.Serial(self.usbport, 115200, timeout=1) # Device has a finite startup time: time.sleep(5) self.pineblaster.write(b'hello\r\n') response = self.pineblaster.readline().decode() if response == 'hello\r\n': return elif response: raise Exception('PineBlaster is confused: saying %s instead of hello'%(repr(response))) else: raise Exception('PineBlaster is not saying hello back when greeted politely. How rude. Maybe it needs a reboot.')
[docs] def shutdown(self): self.pineblaster.close()
[docs] def program_manual(self, values): value = values['internal'] # there is only one value self.pineblaster.write(b'go high\r\n' if value else b'go low\r\n') response = self.pineblaster.readline().decode() assert response == 'ok\r\n', 'PineBlaster said \'%s\', expected \'ok\''%repr(response) return {}
[docs] def transition_to_buffered(self, device_name, h5file, initial_values, fresh): if fresh: self.smart_cache = [] self.program_manual({'internal':0}) with h5py.File(h5file,'r') as hdf5_file: group = hdf5_file['devices/%s'%device_name] pulse_program = group['PULSE_PROGRAM'][:] device_properties = labscript_utils.properties.get(hdf5_file, device_name, 'device_properties') self.is_master_pseudoclock = device_properties['is_master_pseudoclock'] for i, instruction in enumerate(pulse_program): if i == len(self.smart_cache): # Pad the smart cache out to be as long as the program: self.smart_cache.append(None) # Only program instructions that differ from what's in the smart cache: if self.smart_cache[i] != instruction: self.pineblaster.write(b'set %d %d %d\r\n'%(i, instruction['period'], instruction['reps'])) response = self.pineblaster.readline().decode() assert response == 'ok\r\n', 'PineBlaster said \'%s\', expected \'ok\''%repr(response) self.smart_cache[i] = instruction if not self.is_master_pseudoclock: # Get ready for a hardware trigger: self.pineblaster.write(b'hwstart\r\n') response = self.pineblaster.readline().decode() assert response == 'ok\r\n', 'PineBlaster said \'%s\', expected \'ok\''%repr(response) return {'internal':0} # always finish on 0
[docs] def start_run(self): # Start in software: self.pineblaster.write(b'start\r\n') response = self.pineblaster.readline().decode() assert response == 'ok\r\n', 'PineBlaster said \'%s\', expected \'ok\''%repr(response)
[docs] def status_monitor(self): # Wait to see if it's done within the timeout: response = self.pineblaster.readline().decode() if response: assert response == 'done\r\n' return True return False
[docs] def transition_to_manual(self): # Wait until the pineblaster says it's done: if not self.is_master_pseudoclock: # If we're the master pseudoclock then this already happened # in status_monitor, so we don't need to do it again response = self.pineblaster.readline().decode() assert response == 'done\r\n', 'PineBlaster said \'%s\', expected \'ok\''%repr(response) # print 'done!' return True
[docs] def abort_buffered(self): return self.abort()
[docs] def abort_transition_to_buffered(self): return self.abort()
[docs] def abort(self): self.pineblaster.write(b'restart\r\n') time.sleep(5) self.shutdown() self.init() return True