Source code for labscript_devices.PulseBlaster_No_DDS

#####################################################################
#                                                                   #
# /Pulseblaster_No_DDS.py                                           #
#                                                                   #
# Copyright 2013, Monash University                                 #
#                                                                   #
# This file is part of labscript_devices, in the labscript suite    #
# (see http://labscriptsuite.org), and is licensed under the        #
# Simplified BSD License. See the license.txt file in the root of   #
# the project for the full license.                                 #
#                                                                   #
#####################################################################

from labscript_devices import BLACS_tab, runviewer_parser
from labscript_devices.PulseBlaster import PulseBlaster, PulseBlasterParser
from labscript import PseudoclockDevice, config

import numpy as np


class PulseBlaster_No_DDS(PulseBlaster):
    """Labscript device for a PulseBlaster that only has digital outputs.

    Reuses the instruction generation of ``PulseBlaster`` but writes a
    pulse program table that contains flag (digital output) data only.
    """

    description = 'generic DO only Pulseblaster'
    clock_limit = 8.3e6  # can probably go faster
    clock_resolution = 20e-9
    n_flags = 24
    core_clock_freq = 100  # MHz

    def write_pb_inst_to_h5(self, pb_inst, hdf5_file):
        """Pack the instructions into a structured array and save them.

        Args:
            pb_inst: sequence of mappings; the keys ``'flags'``,
                ``'instruction'``, ``'data'`` and ``'delay'`` are read
                from each entry.
            hdf5_file: open HDF5 file that already contains the group
                ``/devices/<self.name>``.

        The table is stored as the ``PULSE_PROGRAM`` dataset of that group
        and ``self.stop_time`` is recorded as a device property.
        """
        record_dtype = [
            ('flags', np.int32),
            ('inst', np.int32),
            ('inst_data', np.int32),
            ('length', np.float64),
        ]
        table = np.empty(len(pb_inst), dtype=record_dtype)
        for row, instruction in enumerate(pb_inst):
            table[row] = (
                # The flags string is reversed before being parsed as a
                # base-2 integer.
                int(instruction['flags'][::-1], 2),
                # Instruction name -> integer code via self.pb_instructions
                self.pb_instructions[instruction['instruction']],
                instruction['data'],
                instruction['delay'],
            )

        # Write the table into this device's group:
        device_group = hdf5_file['/devices/' + self.name]
        device_group.create_dataset(
            'PULSE_PROGRAM', compression=config.compression, data=table
        )
        self.set_property(
            'stop_time', self.stop_time, location='device_properties'
        )

    def generate_code(self, hdf5_file):
        """Generate the hardware instructions and write them to ``hdf5_file``.

        Only the first value returned by ``get_direct_outputs()`` is used;
        the remaining arguments of ``convert_to_pb_inst`` are passed empty
        (presumably the DDS-related inputs -- confirm against
        ``PulseBlaster.convert_to_pb_inst``).
        """
        self.init_device_group(hdf5_file)
        PseudoclockDevice.generate_code(self, hdf5_file)
        digital_outputs, _unused = self.get_direct_outputs()
        instructions = self.convert_to_pb_inst(digital_outputs, [], {}, {}, {})
        self._check_wait_monitor_ok()
        self.write_pb_inst_to_h5(instructions, hdf5_file)
from blacs.tab_base_classes import Worker, define_state from blacs.tab_base_classes import MODE_MANUAL, MODE_TRANSITION_TO_BUFFERED, MODE_TRANSITION_TO_MANUAL, MODE_BUFFERED from blacs.device_base_class import DeviceTab from qtutils import UiLoader import qtutils.icons import os # We can't import * from QtCore & QtGui, as one of them has a function called bin() which overrides the builtin, which is used in the pulseblaster worker from qtutils.qt import QtCore from qtutils.qt import QtGui from qtutils.qt import QtWidgets
@BLACS_tab
class Pulseblaster_No_DDS_Tab(DeviceTab):
    """BLACS tab for a PulseBlaster that only has digital (flag) outputs.

    Creates one digital output per flag, a worker that talks to the board,
    and a status panel with start/stop/reset buttons that is refreshed by
    a periodic ``status_monitor`` state.
    """

    # Capabilities
    num_DO = 24

    def __init__(self, *args, **kwargs):
        # Only install the default worker when no worker class is already
        # present as an attribute (e.g. set by a subclass).
        if not hasattr(self, 'device_worker_class'):
            self.device_worker_class = PulseblasterNoDDSWorker
        DeviceTab.__init__(self, *args, **kwargs)

    def initialise_GUI(self):
        """Build the outputs, the worker and the status widgets of the tab."""
        # One (empty) property dictionary per flag, num_DO flags in total
        flag_properties = {
            'flag %d' % index: {} for index in range(self.num_DO)
        }

        # Create the output objects, then the widgets for them
        self.create_digital_outputs(flag_properties)
        _dds_widgets, _ao_widgets, flag_widgets = self.auto_create_widgets()

        # Sort key for the digital outputs: zero-padded flag number
        def flag_sort_key(channel):
            return '%02d' % (int(channel.replace('flag ', '')))

        # Auto place the widgets in the UI
        self.auto_place_widgets(("Flags", flag_widgets, flag_sort_key))

        # Board number comes from the connection table entry of this device
        table_entry = self.settings['connection_table'].find_by_name(
            self.device_name
        )
        self.board_number = int(table_entry.BLACS_connection)

        # Scheme used for buffered output programming and triggering
        # (the default keeps old connection tables working)
        self.programming_scheme = table_entry.properties.get(
            'programming_scheme', 'pb_start/BRANCH'
        )

        # Create and set the primary worker
        worker_arguments = {
            'board_number': self.board_number,
            'num_DO': self.num_DO,
            'programming_scheme': self.programming_scheme,
        }
        self.create_worker(
            "main_worker", self.device_worker_class, worker_arguments
        )
        self.primary_worker = "main_worker"

        # Set the capabilities of this device
        self.supports_smart_programming(True)

        # Status monitor (and start/stop/reset buttons) UI, loaded from the
        # .ui file that sits next to this module
        loader = UiLoader()
        module_dir = os.path.dirname(os.path.realpath(__file__))
        ui = loader.load(os.path.join(module_dir, 'pulseblaster.ui'))
        self.get_tab_layout().addWidget(ui)

        # (button, click handler, icon resource, tooltip)
        buttons = (
            (ui.start_button, self.start,
             ':/qtutils/fugue/control', 'Start'),
            (ui.stop_button, self.stop,
             ':/qtutils/fugue/control-stop-square', 'Stop'),
            (ui.reset_button, self.reset,
             ':/qtutils/fugue/arrow-circle', 'Reset'),
        )
        # Connect signals for buttons
        for button, handler, _icon_name, _tooltip in buttons:
            button.clicked.connect(handler)
        # Add icons and tooltips
        for button, _handler, icon_name, tooltip in buttons:
            button.setIcon(QtGui.QIcon(icon_name))
            button.setToolTip(tooltip)

        # Data to display, and references to the QLabels showing each state
        self.status_states = ['stopped', 'reset', 'running', 'waiting']
        self.status = {state: False for state in self.status_states}
        self.status_widgets = {
            state: getattr(ui, '%s_label' % state)
            for state in self.status_states
        }

        # Poll the status every 2000 (timeout units of the state machine)
        self.statemachine_timeout_add(2000, self.status_monitor)

    def get_child_from_connection_table(self, parent_device_name, port):
        """Return the connection-table child attached to ``port``.

        For ports of this device itself the lookup goes through the single
        pseudoclock child and its 'internal' clockline; any other parent
        is handled by the ``DeviceTab`` default. Returns ``''`` when this
        device has no internal clockline.
        """
        if parent_device_name != self.device_name:
            # Not a direct output of this device: use the default behaviour
            return DeviceTab.get_child_from_connection_table(
                self, parent_device_name, port
            )

        device = self.connection_table.find_by_name(self.device_name)
        # There should always be one (and only one) child, the Pseudoclock
        pseudoclock_key = list(device.child_list.keys())[0]
        pseudoclock = device.child_list[pseudoclock_key]

        internal_clockline = None
        for child in pseudoclock.child_list.values():
            if child.parent_port == 'internal':
                # Remember the internal clockline for the lookup below
                internal_clockline = child
            elif child.parent_port == port:
                # The port is in use by a clockline: return that clockline
                return child

        if internal_clockline is None:
            return ''

        # There should only be one child of the internal clockline, the
        # direct outputs; look for the port among its children.
        direct_outputs_key = list(internal_clockline.child_list.keys())[0]
        direct_outputs = internal_clockline.child_list[direct_outputs_key]
        return DeviceTab.get_child_from_connection_table(
            self, direct_outputs.name, port
        )

    @define_state(MODE_MANUAL|MODE_BUFFERED|MODE_TRANSITION_TO_BUFFERED|MODE_TRANSITION_TO_MANUAL,True)
    def status_monitor(self, notify_queue=None):
        """Fetch the board status from the worker and refresh the widgets.

        When called with a queue, ``'done'`` is put on it once the shot is
        over and no waits are pending; the monitor then goes back to
        polling every 2000 without a queue.
        """
        worker_reply = yield(self.queue_work(self._primary_worker, 'check_status'))
        self.status, waits_pending, time_based_shot_over = worker_reply

        # Which status bit marks the end of a shot depends on the scheme
        if self.programming_scheme == 'pb_start/BRANCH':
            shot_done = self.status['waiting']
        elif self.programming_scheme == 'pb_stop_programming/STOP':
            shot_done = self.status['stopped']

        # A time-based result, when the worker provides one, takes priority
        if time_based_shot_over is not None:
            shot_done = time_based_shot_over

        if notify_queue is not None and shot_done and not waits_pending:
            # Experiment is over. Tell the queue manager about it, then
            # return to the slow polling interval with no queue.
            notify_queue.put('done')
            self.statemachine_timeout_remove(self.status_monitor)
            self.statemachine_timeout_add(2000, self.status_monitor)
            if self.programming_scheme == 'pb_stop_programming/STOP':
                # Not clear that on all models the outputs will be correct
                # after being stopped this way, so reprogram the current
                # values to be sure:
                self.program_device()

        # Update widgets with new status
        for state in self.status_states:
            if self.status[state]:
                icon_name = ':/qtutils/fugue/tick'
            else:
                icon_name = ':/qtutils/fugue/cross'
            pixmap = QtGui.QIcon(icon_name).pixmap(QtCore.QSize(16, 16))
            self.status_widgets[state].setPixmap(pixmap)

    @define_state(MODE_MANUAL|MODE_BUFFERED|MODE_TRANSITION_TO_BUFFERED|MODE_TRANSITION_TO_MANUAL,True)
    def start(self, widget=None):
        """Queue ``start_run`` on the worker, then refresh the status."""
        yield(self.queue_work(self._primary_worker, 'start_run'))
        self.status_monitor()

    @define_state(MODE_MANUAL|MODE_BUFFERED|MODE_TRANSITION_TO_BUFFERED|MODE_TRANSITION_TO_MANUAL,True)
    def stop(self, widget=None):
        """Queue ``pb_stop`` on the worker, then refresh the status."""
        yield(self.queue_work(self._primary_worker, 'pb_stop'))
        self.status_monitor()

    @define_state(MODE_MANUAL|MODE_BUFFERED|MODE_TRANSITION_TO_BUFFERED|MODE_TRANSITION_TO_MANUAL,True)
    def reset(self, widget=None):
        """Queue ``pb_reset`` on the worker, then refresh the status."""
        yield(self.queue_work(self._primary_worker, 'pb_reset'))
        self.status_monitor()

    @define_state(MODE_BUFFERED,True)
    def start_run(self, notify_queue):
        """Start the Pulseblaster and notify the queue manager when the
        run is over."""
        # Swap the slow polling for a fast one that carries the queue
        self.statemachine_timeout_remove(self.status_monitor)
        self.start()
        self.statemachine_timeout_add(100, self.status_monitor, notify_queue)
class PulseblasterNoDDSWorker(Worker):
    """BLACS worker that drives a digital-output-only PulseBlaster via spinapi.

    ``board_number``, ``num_DO`` and ``programming_scheme`` are read from
    ``self`` but never assigned in this class; presumably the ``Worker``
    base class sets them from the arguments the tab passes to
    ``create_worker`` -- confirm against blacs.

    Two programming schemes are handled explicitly:
    ``'pb_start/BRANCH'`` and ``'pb_stop_programming/STOP'``.
    """

    core_clock_freq = 100

    def init(self):
        """Import the runtime dependencies and initialise the board."""
        # NOTE(review): star-import into the module globals via exec. Every
        # bare pb_* function and the WAIT / BRANCH / CONTINUE /
        # PULSE_PROGRAM names used in this class rely on it.
        exec('from spinapi import *', globals())
        global h5py
        import labscript_utils.h5_lock  # kept ahead of the h5py import
        import h5py
        global zprocess
        import zprocess

        self.pb_start = pb_start
        self.pb_stop = pb_stop
        self.pb_reset = pb_reset
        self.pb_close = pb_close
        self.pb_read_status = pb_read_status
        self.smart_cache = {
            'pulse_program': None,
            'ready_to_go': False,
            'initial_values': None,
        }

        # An event for checking when all waits (if any) have completed, so
        # that we can tell the difference between a wait and the end of an
        # experiment. The wait monitor device is expected to post such
        # events, which we'll wait on:
        self.all_waits_finished = zprocess.Event('all_waits_finished')
        self.waits_pending = False

        pb_select_board(self.board_number)
        pb_init()
        pb_core_clock(self.core_clock_freq)

        # This is only set to True on a per-shot basis, so set it to False
        # for manual mode. Set associated attributes to None:
        self.time_based_stop_workaround = False
        self.time_based_shot_duration = None
        self.time_based_shot_end_time = None

    def _flags_string(self, values):
        """Return the states of ``'flag 0'`` .. ``'flag num_DO-1'`` as a string.

        NOTE: The spinapi can take a string or integer for flags.
        As a string the first character is flag 0 ('101100011111' has
        flag 0 on the left); as a binary number flag 0 is the least
        significant bit (0b111110001101). Be warned!
        """
        return ''.join(
            '1' if values['flag %d' % i] else '0' for i in range(self.num_DO)
        )

    def _pulse_program_differs(self, pulse_program):
        """Return True if ``pulse_program`` is not what the cache holds.

        The cache entry is ``None`` until a program has been written once;
        that case counts as "different" instead of failing on ``len(None)``.
        """
        cached = self.smart_cache['pulse_program']
        if cached is None or len(cached) != len(pulse_program):
            return True
        return bool((cached != pulse_program).any())

    def program_manual(self, values):
        """Statically output the flag states given in ``values``.

        Writes a two-line program (WAIT, then BRANCH back to line zero)
        carrying the requested flags and starts it. Returns an empty dict.
        """
        flags = self._flags_string(values)

        if self.programming_scheme == 'pb_stop_programming/STOP':
            # Need to ensure device is stopped before programming - or we
            # won't know what line it's on.
            pb_stop()

        # Write the first two lines of the pulse program:
        pb_start_programming(PULSE_PROGRAM)
        # Line zero is a wait:
        pb_inst_pbonly(flags, WAIT, 0, 100)
        # Line one is a branch to line 0:
        pb_inst_pbonly(flags, BRANCH, 0, 100)
        pb_stop_programming()

        # Now we're waiting on line zero, so when we start() we'll go to
        # line one, then branch back to zero, completing the static update:
        pb_start()

        # The pulse program now has a branch in line one, and so can't
        # proceed to the pulse program without a reprogramming of the
        # first two lines:
        self.smart_cache['ready_to_go'] = False

        # TODO: return coerced/quantised values
        return {}

    def start_run(self):
        """Start the programmed shot according to the programming scheme.

        Raises:
            ValueError: if ``programming_scheme`` is not a known scheme.
        """
        if self.programming_scheme == 'pb_start/BRANCH':
            pb_start()
        elif self.programming_scheme == 'pb_stop_programming/STOP':
            pb_stop_programming()
            pb_start()
        else:
            raise ValueError('invalid programming_scheme: %s'%str(self.programming_scheme))
        if self.time_based_stop_workaround:
            import time
            # Remember when the shot is expected to be over
            self.time_based_shot_end_time = time.time() + self.time_based_shot_duration

    def transition_to_buffered(self, device_name, h5file, initial_values, fresh):
        """Program the shot stored in ``h5file`` and return the final flags.

        Args:
            device_name: name of the group under ``devices/`` in the file.
            h5file: path of the shot file.
            initial_values: mapping with a ``'flag %d'`` entry per output,
                used for the first lines of the program.
            fresh: if true, reprogram everything regardless of the cache.

        Returns:
            dict mapping ``'flag %d'`` to the characters ``'0'``/``'1'``
            describing the flags of the last instruction of the program.

        Raises:
            ValueError: if ``programming_scheme`` is not a known scheme
                (checked only when the board is reprogrammed).
            RuntimeError: if the shot uses waits without a wait monitor and
                the scheme is not ``'pb_stop_programming/STOP'``.
        """
        self.h5file = h5file
        if self.programming_scheme == 'pb_stop_programming/STOP':
            # Need to ensure device is stopped before programming - or we
            # won't know what line it's on.
            pb_stop()

        with h5py.File(h5file, 'r') as hdf5_file:
            group = hdf5_file['devices/%s' % device_name]

            # Is this shot using the fixed-duration workaround instead of
            # checking the PulseBlaster's status?
            self.time_based_stop_workaround = group.attrs.get('time_based_stop_workaround', False)
            if self.time_based_stop_workaround:
                self.time_based_shot_duration = (
                    group.attrs['stop_time']
                    + hdf5_file['waits'][:]['timeout'].sum()
                    + group.attrs['time_based_stop_workaround_extra_time']
                )

            # Now for the pulse program (its first two rows are skipped;
            # lines zero and one are written separately below):
            pulse_program = group['PULSE_PROGRAM'][2:]

            # Final state of the pulseblaster; only the flags are needed:
            flags, _inst, _inst_data, _length = pulse_program[-1]

            program_changed = fresh or self._pulse_program_differs(pulse_program)
            needs_programming = (
                program_changed
                or self.smart_cache['initial_values'] != initial_values
                or not self.smart_cache['ready_to_go']
            )

            if needs_programming:
                # Enter programming mode
                pb_start_programming(PULSE_PROGRAM)

                self.smart_cache['ready_to_go'] = True
                self.smart_cache['initial_values'] = initial_values

                initial_flags = self._flags_string(initial_values)

                if self.programming_scheme == 'pb_start/BRANCH':
                    # Line zero is a wait on the final state of the program
                    # in 'pb_start/BRANCH' mode
                    pb_inst_pbonly(flags, WAIT, 0, 100)
                else:
                    # Line zero otherwise just contains the initial flags
                    pb_inst_pbonly(initial_flags, CONTINUE, 0, 100)

                # Line one is a continue with the current front panel values:
                pb_inst_pbonly(initial_flags, CONTINUE, 0, 100)

                # The rest of the program is only rewritten when it changed:
                if program_changed:
                    self.smart_cache['pulse_program'] = pulse_program
                    for args in pulse_program:
                        pb_inst_pbonly(*args)

                if self.programming_scheme == 'pb_start/BRANCH':
                    # We will be triggered by pb_start() if we are the
                    # master pseudoclock or a single hardware trigger from
                    # the master if we are not:
                    pb_stop_programming()
                elif self.programming_scheme == 'pb_stop_programming/STOP':
                    # Don't call pb_stop_programming(). We don't want the
                    # pulseblaster to respond to hardware triggers (such as
                    # 50/60Hz line triggers) until we are ready to run.
                    # start_run() calls pb_stop_programming() when ready.
                    pass
                else:
                    raise ValueError('invalid programming_scheme %s'%str(self.programming_scheme))

            elif self.programming_scheme == 'pb_stop_programming/STOP':
                # Ensure start_programming is called for this scheme so we
                # are ready to be triggered by pb_stop_programming() even
                # if no programming occurred due to smart programming:
                pb_start_programming(PULSE_PROGRAM)

            # Are there waits in use in this experiment? The monitor waiting
            # for the end of the experiment will need to know:
            wait_monitor_exists = bool(hdf5_file['waits'].attrs['wait_monitor_acquisition_device'])
            waits_in_use = bool(len(hdf5_file['waits']))
            self.waits_pending = wait_monitor_exists and waits_in_use
            if waits_in_use and not wait_monitor_exists:
                # This should be caught during labscript compilation, but
                # just in case: having waits but not a wait monitor means
                # we can't tell when the shot is over unless the shot ends
                # in a STOP instruction. Raised explicitly (not asserted)
                # so the check survives ``python -O``.
                if self.programming_scheme != 'pb_stop_programming/STOP':
                    raise RuntimeError(
                        "waits are in use but there is no wait monitor; this "
                        "requires programming_scheme "
                        "'pb_stop_programming/STOP', not %s"
                        % str(self.programming_scheme)
                    )

        # Final state to send back to the GUI. Converting the integer to a
        # binary string puts flag 0 last, so the string is reversed.
        return_flags = str(bin(flags)[2:]).rjust(self.num_DO, '0')[::-1]
        return {'flag %d' % i: return_flags[i] for i in range(self.num_DO)}

    def check_status(self):
        """Return ``(board status, waits_pending, time_based_shot_over)``.

        ``time_based_shot_over`` is ``None`` unless a time-based end time
        has been set by ``start_run``.
        """
        if self.waits_pending:
            try:
                # Non-blocking poll of the 'all_waits_finished' event
                self.all_waits_finished.wait(self.h5file, timeout=0)
            except zprocess.TimeoutError:
                # Not posted yet; keep waits_pending set and poll again
                pass
            else:
                self.waits_pending = False

        if self.time_based_shot_end_time is not None:
            import time
            time_based_shot_over = time.time() > self.time_based_shot_end_time
        else:
            time_based_shot_over = None

        return pb_read_status(), self.waits_pending, time_based_shot_over

    def transition_to_manual(self):
        """Return True if the shot has finished and no waits are pending.

        Raises:
            ValueError: if no time-based result is available and
                ``programming_scheme`` is not a known scheme.
        """
        status, waits_pending, time_based_shot_over = self.check_status()

        # This is only set to True on a per-shot basis, so reset it to
        # False for manual mode. Reset associated attributes to None.
        # Done before evaluating the result so it happens on every path.
        self.time_based_stop_workaround = False
        self.time_based_shot_duration = None
        self.time_based_shot_end_time = None

        if time_based_shot_over is not None:
            # A time-based result overrides the status bits
            done_condition = time_based_shot_over
        elif self.programming_scheme == 'pb_start/BRANCH':
            done_condition = status['waiting']
        elif self.programming_scheme == 'pb_stop_programming/STOP':
            done_condition = status['stopped']
        else:
            raise ValueError('invalid programming_scheme: %s'%str(self.programming_scheme))

        return bool(done_condition and not waits_pending)

    def abort_buffered(self):
        """Stop the running program and reset it to its first line."""
        # Stop the execution
        self.pb_stop()
        # Reset to the beginning of the pulse sequence
        self.pb_reset()

        # abort_buffered in the GUI process queues up a program_device
        # state which will reprogram the device and call pb_start().
        # This ensures the device isn't accidentally retriggered by
        # another device while it is running its abort function.
        return True

    def abort_transition_to_buffered(self):
        """Nothing to undo here; always reports success."""
        return True

    def shutdown(self):
        """Currently a no-op."""
        # TODO: implement this
        pass
@runviewer_parser
class PulseBlaster_No_DDS_Parser(PulseBlasterParser):
    """Runviewer parser for the digital-output-only PulseBlaster.

    Same as ``PulseBlasterParser`` but configured for 24 flags and no DDS
    channels.
    """

    num_flags = 24
    num_dds = 0