#####################################################################
# #
# /labscript_devices/PrawnBlaster/runviewer_parsers.py #
# #
# Copyright 2021, Philip Starkey #
# #
# This file is part of labscript_devices, in the labscript suite #
# (see http://labscriptsuite.org), and is licensed under the #
# Simplified BSD License. See the license.txt file in the root of #
# the project for the full license. #
# #
#####################################################################
import labscript_utils.h5_lock # noqa: F401
import h5py
import numpy as np
import labscript_utils.properties as properties
[docs]
class PrawnBlasterParser(object):
"""Runviewer parser for the PrawnBlaster Pseudoclocks."""
def __init__(self, path, device):
"""
Args:
path (str): path to h5 shot file
device (str): labscript name of PrawnBlaster device
"""
self.path = path
self.name = device.name
self.device = device
[docs]
def get_traces(self, add_trace, clock=None):
"""Reads the shot file and extracts hardware instructions to produce
runviewer traces.
Args:
add_trace (func): function handle that adds traces to runviewer
clock (tuple, optional): clock times from timing device, if not
the primary pseudoclock
Returns:
dict: Dictionary of clocklines and triggers derived from instructions
"""
if clock is not None:
times, clock_value = clock[0], clock[1]
clock_indices = np.where((clock_value[1:] - clock_value[:-1]) == 1)[0] + 1
# If initial clock value is 1, then this counts as a rising edge
# (clock should be 0 before experiment) but this is not picked up
# by the above code. So we insert it!
if clock_value[0] == 1:
clock_indices = np.insert(clock_indices, 0, 0)
clock_ticks = times[clock_indices]
# get the pulse program
pulse_programs = []
with h5py.File(self.path, "r") as f:
# Get the device properties
device_props = properties.get(f, self.name, "device_properties")
conn_props = properties.get(f, self.name, "connection_table_properties")
self.clock_resolution = device_props["clock_resolution"]
self.trigger_delay = device_props["trigger_delay"]
self.wait_delay = device_props["wait_delay"]
# Extract the pulse programs
num_pseudoclocks = conn_props["num_pseudoclocks"]
for i in range(num_pseudoclocks):
pulse_programs.append(f[f"devices/{self.name}/PULSE_PROGRAM_{i}"][:])
# Generate clocklines and triggers
clocklines_and_triggers = {}
for pseudoclock_name, pseudoclock in self.device.child_list.items():
# Get pseudoclock index
connection_parts = pseudoclock.parent_port.split()
# Skip if not one of the 4 possible pseudoclock outputs (there is one for
# the wait monitor too potentially)
if connection_parts[0] != "pseudoclock":
continue
# Get the pulse program
index = int(connection_parts[1])
pulse_program = pulse_programs[index]
time = []
states = []
trigger_index = 0
t = 0 if clock is None else clock_ticks[trigger_index] + self.trigger_delay
trigger_index += 1
clock_factor = self.clock_resolution / 2.0
last_instruction_was_wait = False
for row in pulse_program:
if row["reps"] == 0 and not last_instruction_was_wait: # WAIT
last_instruction_was_wait = True
if clock is not None:
t = clock_ticks[trigger_index] + self.trigger_delay
trigger_index += 1
else:
t += self.wait_delay
elif last_instruction_was_wait:
# two waits in a row means an indefinite wait, so we just skip this
# instruction.
last_instruction_was_wait = False
continue
else:
last_instruction_was_wait = False
for i in range(row["reps"]):
for j in range(1, -1, -1):
time.append(t)
states.append(j)
t += row["half_period"] * clock_factor
pseudoclock_clock = (np.array(time), np.array(states))
for clock_line_name, clock_line in pseudoclock.child_list.items():
# Ignore the dummy internal wait monitor clockline
if clock_line.parent_port.startswith("GPIO"):
clocklines_and_triggers[clock_line_name] = pseudoclock_clock
add_trace(
clock_line_name, pseudoclock_clock, self.name, clock_line.parent_port
)
return clocklines_and_triggers