Source code for labscript_devices.PrawnBlaster.runviewer_parsers

#####################################################################
#                                                                   #
# /labscript_devices/PrawnBlaster/runviewer_parsers.py              #
#                                                                   #
# Copyright 2021, Philip Starkey                                    #
#                                                                   #
# This file is part of labscript_devices, in the labscript suite    #
# (see http://labscriptsuite.org), and is licensed under the        #
# Simplified BSD License. See the license.txt file in the root of   #
# the project for the full license.                                 #
#                                                                   #
#####################################################################

import labscript_utils.h5_lock  # noqa: F401
import h5py
import numpy as np

import labscript_utils.properties as properties


[docs] class PrawnBlasterParser(object): """Runviewer parser for the PrawnBlaster Pseudoclocks.""" def __init__(self, path, device): """ Args: path (str): path to h5 shot file device (str): labscript name of PrawnBlaster device """ self.path = path self.name = device.name self.device = device
[docs] def get_traces(self, add_trace, clock=None): """Reads the shot file and extracts hardware instructions to produce runviewer traces. Args: add_trace (func): function handle that adds traces to runviewer clock (tuple, optional): clock times from timing device, if not the primary pseudoclock Returns: dict: Dictionary of clocklines and triggers derived from instructions """ if clock is not None: times, clock_value = clock[0], clock[1] clock_indices = np.where((clock_value[1:] - clock_value[:-1]) == 1)[0] + 1 # If initial clock value is 1, then this counts as a rising edge # (clock should be 0 before experiment) but this is not picked up # by the above code. So we insert it! if clock_value[0] == 1: clock_indices = np.insert(clock_indices, 0, 0) clock_ticks = times[clock_indices] # get the pulse program pulse_programs = [] with h5py.File(self.path, "r") as f: # Get the device properties device_props = properties.get(f, self.name, "device_properties") conn_props = properties.get(f, self.name, "connection_table_properties") self.clock_resolution = device_props["clock_resolution"] self.trigger_delay = device_props["trigger_delay"] self.wait_delay = device_props["wait_delay"] # Extract the pulse programs num_pseudoclocks = conn_props["num_pseudoclocks"] for i in range(num_pseudoclocks): pulse_programs.append(f[f"devices/{self.name}/PULSE_PROGRAM_{i}"][:]) # Generate clocklines and triggers clocklines_and_triggers = {} for pseudoclock_name, pseudoclock in self.device.child_list.items(): # Get pseudoclock index connection_parts = pseudoclock.parent_port.split() # Skip if not one of the 4 possible pseudoclock outputs (there is one for # the wait monitor too potentially) if connection_parts[0] != "pseudoclock": continue # Get the pulse program index = int(connection_parts[1]) pulse_program = pulse_programs[index] time = [] states = [] trigger_index = 0 t = 0 if clock is None else clock_ticks[trigger_index] + self.trigger_delay trigger_index += 1 clock_factor = self.clock_resolution / 2.0 last_instruction_was_wait = False for row in pulse_program: if row["reps"] == 0 and not last_instruction_was_wait: # WAIT last_instruction_was_wait = True if clock is not None: t = clock_ticks[trigger_index] + self.trigger_delay trigger_index += 1 else: t += self.wait_delay elif last_instruction_was_wait: # two waits in a row means an indefinite wait, so we just skip this # instruction. last_instruction_was_wait = False continue else: last_instruction_was_wait = False for i in range(row["reps"]): for j in range(1, -1, -1): time.append(t) states.append(j) t += row["half_period"] * clock_factor pseudoclock_clock = (np.array(time), np.array(states)) for clock_line_name, clock_line in pseudoclock.child_list.items(): # Ignore the dummy internal wait monitor clockline if clock_line.parent_port.startswith("GPIO"): clocklines_and_triggers[clock_line_name] = pseudoclock_clock add_trace( clock_line_name, pseudoclock_clock, self.name, clock_line.parent_port ) return clocklines_and_triggers