Source code for labscript_devices.NI_DAQmx.runviewer_parsers

import labscript_utils.h5_lock
import h5py
import numpy as np

import as properties
from labscript_utils import dedent

[docs]class NI_DAQmxParser(object): def __init__(self, path, device): self.path = path = self.device = device
[docs] def get_traces(self, add_trace, clock=None): with h5py.File(self.path, 'r') as f: group = f['devices/' +] if 'AO' in group: AO_table = group['AO'][:] else: AO_table = None if 'DO' in f['devices/%s' %]: DO_table = group['DO'][:] else: DO_table = None props = properties.get(f,, 'connection_table_properties') version = props.get('__version__', None) if version is None: msg = """Shot was compiled with the old version of the NI_DAQmx device class. The new runviewer parser is not backward compatible with old shot files. Either downgrade labscript_devices to 2.2.0 or less, or recompile the shot with labscript_devices 2.3.0 or greater.""" raise RuntimeError(dedent(msg)) ports = props['ports'] static_AO = props['static_AO'] static_DO = props['static_DO'] times, clock_value = clock[0], clock[1] clock_indices = np.where((clock_value[1:] - clock_value[:-1]) == 1)[0] + 1 # If initial clock value is 1, then this counts as a rising edge (clock should # be 0 before experiment) but this is not picked up by the above code. So we # insert it! if clock_value[0] == 1: clock_indices = np.insert(clock_indices, 0, 0) clock_ticks = times[clock_indices] traces = {} if DO_table is not None: ports_in_use = DO_table.dtype.names for port_str in ports_in_use: for line in range(ports[port_str]["num_lines"]): # Extract each digital value from the packed bits: line_vals = (((1 << line) & DO_table[port_str]) != 0).astype(float) if static_DO: line_vals = np.full(len(clock_ticks), line_vals[0]) traces['%s/line%d' % (port_str, line)] = (clock_ticks, line_vals) if AO_table is not None: for chan in AO_table.dtype.names: vals = AO_table[chan] if static_AO: vals = np.full(len(clock_ticks), vals[0]) traces[chan] = (clock_ticks, vals) triggers = {} for channel_name, channel in self.device.child_list.items(): if channel.parent_port in traces: trace = traces[channel.parent_port] if channel.device_class == 'Trigger': triggers[channel_name] = trace add_trace(channel_name, trace,, channel.parent_port) return triggers