#####################################################################
# #
# /NI_DAQmx/models/labscript_devices.py #
# #
# Copyright 2018, Monash University, JQI, Christopher Billington #
# #
# This file is part of the module labscript_devices, in the #
# labscript suite (see http://labscriptsuite.org), and is #
# licensed under the Simplified BSD License. See the license.txt #
# file in the root of the project for the full license. #
# #
#####################################################################
__version__ = '1.2.0'
from labscript import (
IntermediateDevice,
AnalogOut,
DigitalOut,
StaticAnalogOut,
StaticDigitalOut,
AnalogIn,
bitfield,
config,
compiler,
LabscriptError,
set_passed_properties,
)
from labscript_utils import dedent
from .utils import split_conn_DO, split_conn_AO, split_conn_AI
import numpy as np
import warnings
_ints = {8: np.uint8, 16: np.uint16, 32: np.uint32, 64: np.uint64}
[docs]
def _smallest_int_type(n):
"""Return the smallest unsigned integer type sufficient to contain n bits"""
return _ints[min(size for size in _ints.keys() if size >= n)]
[docs]
class NI_DAQmx(IntermediateDevice):
# Will be overridden during __init__ depending on configuration:
allowed_children = []
description = 'NI-DAQmx'
# If used as a wait monitor, this device's BLACS worker supports posting events in
# software indicating the completion of each wait as it occurs:
wait_monitor_supports_wait_completed_events = True
@set_passed_properties(
property_names={
"connection_table_properties": [
"clock_terminal",
"MAX_name",
"static_AO",
"static_DO",
"clock_mirror_terminal",
"connected_terminals",
"AI_range",
"AI_start_delay",
"AI_start_delay_ticks",
"AI_term",
"AI_chans",
"AI_timebase_terminal",
"AI_timebase_rate",
"AO_range",
"max_AI_multi_chan_rate",
"max_AI_single_chan_rate",
"max_AO_sample_rate",
"max_DO_sample_rate",
"min_semiperiod_measurement",
"num_AI",
"num_AO",
"num_CI",
"ports",
"supports_buffered_AO",
"supports_buffered_DO",
"supports_semiperiod_measurement",
"supports_simultaneous_AI_sampling",
"clock_limit",
"wait_monitor_minimum_pulse_width",
"wait_monitor_supports_wait_completed_events",
],
"device_properties": ["acquisition_rate","start_delay_ticks"],
}
)
def __init__(
self,
name,
parent_device=None,
clock_terminal=None,
MAX_name=None,
static_AO=None,
static_DO=None,
clock_mirror_terminal=None,
connected_terminals=None,
acquisition_rate=None,
AI_range=None,
AI_range_Diff=None,
AI_start_delay=0,
AI_start_delay_ticks=None,
AI_term='RSE',
AI_term_cfg=None,
AI_timebase_terminal=None,
AI_timebase_rate=None,
AO_range=None,
max_AI_multi_chan_rate=None,
max_AI_single_chan_rate=None,
max_AO_sample_rate=None,
max_DO_sample_rate=None,
min_semiperiod_measurement=None,
num_AI=0,
num_AO=0,
num_CI=0,
ports=None,
supports_buffered_AO=False,
supports_buffered_DO=False,
supports_semiperiod_measurement=False,
supports_simultaneous_AI_sampling=False,
**kwargs
):
"""Generic class for NI_DAQmx devices.
Generally over-ridden by device-specific subclasses that contain
the introspected default values.
Args:
name (str): name to assign to the created labscript device
parent_device (clockline): Parent clockline device that will
clock the outputs of this device
clock_terminal (str): What input on the DAQ is used for the clockline
MAX_name (str): NI-MAX device name
static_AO (int, optional): Number of static analog output channels.
static_DO (int, optional): Number of static digital output channels.
clock_mirror_terminal (str, optional): Channel string of digital output
that mirrors the input clock. Useful for daisy-chaning DAQs on the same
clockline.
connected_terminals (list, optional): List of pairs of strings of digital inputs
and digital outputs that will be connected. Useful for daisy-chaining DAQs
on the same clockline when they do not have direct routes (see Device Routes in NI MAX).
acquisiton_rate (float, optional): Default sample rate of inputs.
AI_range (iterable, optional): A `[Vmin, Vmax]` pair that sets the analog
input voltage range for all analog inputs.
AI_range_Diff (iterable, optional): A `[Vmin, Vmax]` pair that sets the analog
input voltage range for all analog inputs when using Differential termination.
AI_start_delay (float, optional): Time in seconds between start of an
analog input task starting and the first sample.
AI_start_delay_ticks (int, optional): Time in sample clock periods between
start of an analog input task starting and the first sample. To use
this method, `AI_start_delay` must be set to `None`. This is necessary
for DAQs that employ delta ADCs.
AI_term (str, optional): Configures the analog input termination for all
analog inputs. Must be supported by the device. Supported options are
`'RSE'`, `'NRSE'` `'Diff'`, and '`PseudoDiff'`.
AI_term_cfg (dict, optional): Dictionary of analog input channels and their
supported terminations. Best to use `get_capabilities.py` to introspect
these.
AI_timebase_terminal (str, optional): Channel string that specifies what
channel to use for the Sample Clock Timebase signal.
If None, use default internal clocks.
Must also specify the rate when not using the internal sources.
AI_timebase_rate (float, optional): Supplied clock frequency for the AI timebase.
Only specify if using an external clock source.
AO_range (iterable, optional): A `[Vmin, Vmax]` pair that sets the analog
output voltage range for all analog outputs.
max_AI_multi_chan_rate (float, optional): Max supported analog input
sampling rate when using multiple channels.
max_AI_single_chan_rate (float, optional): Max supported analog input
sampling rate when only using a single channel.
max_AO_sample_rate (float, optional): Max supported analog output
sample rate.
max_DO_sample_rate (float, optional): Max supported digital output
sample rate.
min_sermiperiod_measurement (float, optional): Minimum measurable time
for a semiperiod measurement.
num_AI (int, optional): Number of analog inputs channels.
num_AO (int, optional): Number of analog output channels.
num_CI (int, optional): Number of counter input channels.
ports (dict, optional): Dictionarly of DIO ports, which number of lines
and whether port supports buffered output.
supports_buffered_AO (bool, optional): True if analog outputs support
buffered output
supports_buffered_DO (bool, optional): True if digital outputs support
buffered output
supports_semiperiod_measurement (bool, optional): True if device supports
semi-period measurements
"""
# Default static output setting based on whether the device supports buffered
# output:
if static_AO is None:
static_AO = not supports_buffered_AO
if static_DO is None:
static_DO = not supports_buffered_DO
# Parent is only allowed to be None if output is static:
if parent_device is None and not (static_DO and static_AO):
msg = """Must specify a parent clockline, unless both static_AO and
static_DO are True"""
raise LabscriptError(dedent(msg))
# If parent device is not None though, then clock terminal must be specified:
if parent_device is not None and clock_terminal is None:
msg = """If parent_device is given, then clock_terminal must be specified as
well as the terminal to which the parent pseudoclock is connected."""
raise ValueError(dedent(msg))
if acquisition_rate is not None and num_AI == 0:
msg = "Cannot set set acquisition rate on device with no analog inputs"
raise ValueError(msg)
# Acquisition rate cannot be larger than the single channel rate:
if acquisition_rate is not None and acquisition_rate > max_AI_single_chan_rate:
msg = """acquisition_rate %f is larger than the maximum single-channel rate
%f for this device"""
raise ValueError(dedent(msg) % (acquisition_rate, max_AI_single_chan_rate))
self.clock_terminal = clock_terminal
self.MAX_name = MAX_name if MAX_name is not None else name
self.static_AO = static_AO
self.static_DO = static_DO
self.acquisition_rate = acquisition_rate
self.AO_range = AO_range
self.max_AI_multi_chan_rate = max_AI_multi_chan_rate
self.max_AI_single_chan_rate = max_AI_single_chan_rate
self.max_AO_sample_rate = max_AO_sample_rate
self.max_DO_sample_rate = max_DO_sample_rate
self.min_semiperiod_measurement = min_semiperiod_measurement
self.num_AI = num_AI
# special handling for AI termination configurations
self.AI_term = AI_term
if num_AI > 0:
if AI_term_cfg == None:
# assume legacy configuration if none provided
AI_term_cfg = {f'ai{i:d}': ['RSE'] for i in range(num_AI)}
# warn user to update their local model specs
msg = """Model specifications for {} needs to be updated.
Please run the `get_capabilites.py` and `generate_subclasses.py`
scripts or define the `AI_Term_Cfg` kwarg for your device.
"""
warnings.warn(dedent(msg.format(self.description)), FutureWarning)
self.AI_chans = [key for key,val in AI_term_cfg.items() if self.AI_term in val]
if not len(self.AI_chans):
msg = """AI termination {0} not supported for {1}."""
raise LabscriptError(dedent(msg.format(AI_term,self.description)))
if AI_term == 'Diff':
self.AI_range = AI_range_Diff
if AI_start_delay is None:
if AI_start_delay_ticks is not None:
# Tell blacs_worker to use AI_start_delay_ticks to define delay
self.start_delay_ticks = True
else:
raise LabscriptError("You have specified `AI_start_delay = None` but have not provided `AI_start_delay_ticks`.")
else:
# Tells blacs_worker to use AI_start_delay to define delay
self.start_delay_ticks = False
else:
# no analog inputs
self.AI_chans = []
self.start_delay_ticks = None
# special AI timebase handling
if num_AI > 0:
if (AI_timebase_rate is None) ^ (AI_timebase_terminal is None):
raise LabscriptError("You must specify terminal and rate when using an external AI timebase")
self.AI_timebase_terminal = AI_timebase_terminal
self.AI_timebease_rate = AI_timebase_rate
self.num_AO = num_AO
self.num_CI = num_CI
self.ports = ports if ports is not None else {}
self.supports_buffered_AO = supports_buffered_AO
self.supports_buffered_DO = supports_buffered_DO
self.supports_semiperiod_measurement = supports_semiperiod_measurement
self.supports_simultaneous_AI_sampling = supports_simultaneous_AI_sampling
if self.supports_buffered_DO and self.supports_buffered_AO:
self.clock_limit = min(self.max_DO_sample_rate, self.max_AO_sample_rate)
elif self.supports_buffered_DO:
self.clock_limit = self.max_DO_sample_rate
elif self.supports_buffered_AO:
self.clock_limit = self.max_AO_sample_rate
else:
self.clock_limit = None
if not (static_AO and static_DO):
msg = """Device does not support buffered output, please instantiate
it with static_AO=True and static_DO=True"""
raise LabscriptError(dedent(msg))
self.wait_monitor_minimum_pulse_width = self.min_semiperiod_measurement
self.allowed_children = []
'''Sets the allowed children types based on the capabilites.'''
if self.num_AI > 0:
self.allowed_children += [AnalogIn]
if self.num_AO > 0 and static_AO:
self.allowed_children += [StaticAnalogOut]
if self.num_AO > 0 and not static_AO:
self.allowed_children += [AnalogOut]
if self.ports and static_DO:
self.allowed_children += [StaticDigitalOut]
if self.ports and not static_DO:
self.allowed_children += [DigitalOut]
if clock_terminal is None and not (static_AO and static_DO):
msg = """Clock terminal must be specified unless static_AO and static_DO are
both True"""
raise LabscriptError(dedent(msg))
self.BLACS_connection = self.MAX_name
# Cannot be set with set_passed_properties because of name mangling with the
# initial double underscore:
self.set_property('__version__', __version__, 'connection_table_properties')
# This is called late since it must be called after our clock_limit attribute is
# set:
IntermediateDevice.__init__(self, name, parent_device, **kwargs)
[docs]
def add_device(self, device):
"""Error checking for adding a child device.
Args:
device (labscript device): Child labscript device to
attach to this device. Only types of devices in :obj:`allowed_children`
can be attached.
"""
# Verify static/dynamic outputs compatible with configuration:
if isinstance(device, StaticAnalogOut) and not self.static_AO:
msg = """Cannot add StaticAnalogOut to NI_DAQmx device configured for
dynamic analog output. Pass static_AO=True for static analog output"""
raise LabscriptError(dedent(msg))
if isinstance(device, StaticDigitalOut) and not self.static_DO:
msg = """Cannot add StaticDigitalOut to NI_DAQmx device configured for
dynamic digital output. Pass static_DO=True for static digital output"""
raise LabscriptError(dedent(msg))
if isinstance(device, AnalogOut) and self.static_AO:
msg = """Cannot add AnalogOut to NI_DAQmx device configured for
static analog output. Pass static_AO=False for dynamic analog output"""
raise LabscriptError(dedent(msg))
if isinstance(device, DigitalOut) and self.static_DO:
msg = """Cannot add DigitalOut to NI_DAQmx device configured for static
digital output. Pass static_DO=False for dynamic digital output"""
raise LabscriptError(dedent(msg))
# Verify connection string is OK:
if isinstance(device, (AnalogOut, StaticAnalogOut)):
ao_num = split_conn_AO(device.connection)
if ao_num >= self.num_AO:
msg = """Cannot add output with connection string '%s' to device with
num_AO=%d"""
raise ValueError(dedent(msg) % (device.connection, self.num_AO))
elif isinstance(device, (DigitalOut, StaticDigitalOut)):
port, line = split_conn_DO(device.connection)
port_str = 'port%d' % port
if port_str not in self.ports:
msg = "Parent device has no such DO port '%s'" % port_str
raise ValueError(msg)
nlines = self.ports[port_str]['num_lines']
if line >= nlines:
msg = """Canot add output with connection string '%s' to port '%s'
with only %d lines"""
raise ValueError(dedent(msg) % (device.connection, port_str, nlines))
supports_buffered = self.ports[port_str]['supports_buffered']
if isinstance(device, DigitalOut) and not supports_buffered:
msg = """Cannot add DigitalOut port '%s', which does not support
buffered output"""
raise ValueError(dedent(msg) % port_str)
elif isinstance(device, AnalogIn):
if device.connection not in self.AI_chans:
msg = """Cannot add analog input with connection string '%s' to device
with num_AI=%d"""
raise ValueError(dedent(msg) % (device.connection, self.num_AI))
if self.acquisition_rate is None:
msg = """Cannot add analog input to NI_DAQmx device with
acquisition_rate=None. Please set acquisition_rate as an
instantiation argument to the parent NI_DAQmx device."""
raise ValueError(dedent(msg))
if self.parent_device is None:
msg = """Cannot add analog input to device with no parent pseudoclock.
Even if there is no buffered output, a pseudoclock is still required
to trigger the start of acquisition. Please specify a parent_device
and clock_terminal for device %s"""
raise ValueError(dedent(msg) % self.name)
IntermediateDevice.add_device(self, device)
[docs]
def _check_even_children(self, analogs, digitals):
"""Check that there are an even number of children of each type."""
errmsg = """{0} {1} must have an even number of {2}s in order to guarantee an
even total number of samples, which is a limitation of the DAQmx library.
Please add a dummy {2} device or remove one you're not using, so that there
is an even number."""
if len(analogs) % 2:
msg = errmsg.format(self.description, self.name, 'analog output')
raise LabscriptError(dedent(msg))
if len(digitals) % 2:
msg = errmsg.format(self.description, self.name, 'digital output')
raise LabscriptError(dedent(msg))
[docs]
def _check_bounds(self, analogs):
"""Check that all analog outputs are in bounds"""
if not analogs:
return
vmin, vmax = self.AO_range
# Floating point rounding error can produce values that would mathematically be
# within bounds, but have ended up numerically out of bounds. We allow
# out-of-bounds values within a small threshold through, but apply clipping to
# keep them numerically within bounds. 1e-10 of the total range corresponds to >
# 32 bits of precision, so this is not changing the voltages at all since none
# of the DACs are that precise.
eps = abs(vmax - vmin) * 1e-10
for output in analogs.values():
if any((output.raw_output < vmin - eps) | (output.raw_output > vmax + eps)):
msg = """%s %s can only have values between %e and %e Volts, the limit
imposed by %s."""
msg = msg % (output.description, output.name, vmin, vmax, self.name)
raise LabscriptError(dedent(msg))
np.clip(output.raw_output, vmin, vmax, out=output.raw_output)
[docs]
def _check_AI_not_too_fast(self, AI_table):
"""Check that analog input acquisition rates do not exceed maximums."""
if AI_table is None:
return
n = len(set(AI_table['connection']))
if n < 2:
# Either no AI in use, or already checked against single channel rate in
# __init__.
return
if self.supports_simultaneous_AI_sampling and self.acquisition_rate <= self.max_AI_multi_chan_rate:
return
elif self.acquisition_rate <= self.max_AI_multi_chan_rate / n:
return
msg = """Requested acqusition_rate %f for device %s with %d analog input
channels in use is too fast. Device supports a rate of %f per channel when
multiple channels are in use."""
msg = msg % (self.acquisition_rate, self.name, n, self.max_AI_multi_chan_rate)
raise ValueError(dedent(msg))
[docs]
def _make_analog_out_table(self, analogs, times):
"""Collect analog output data and create the output array"""
if not analogs:
return None
n_timepoints = 1 if self.static_AO else len(times)
connections = sorted(analogs, key=split_conn_AO)
dtypes = [(c, np.float32) for c in connections]
analog_out_table = np.empty(n_timepoints, dtype=dtypes)
for connection, output in analogs.items():
analog_out_table[connection] = output.raw_output
return analog_out_table
[docs]
def _make_digital_out_table(self, digitals, times):
"""Collect digital output data and create the output array"""
if not digitals:
return None
n_timepoints = 1 if self.static_DO else len(times)
# List of output bits by port number:
bits_by_port = {}
# table names and dtypes by port number:
columns = {}
for connection, output in digitals.items():
port, line = split_conn_DO(connection)
port_str = 'port%d' % port
if port not in bits_by_port:
# Make a list of the right size for the number of lines
# on the port, or the number of bits in the smallest integer
# type that is equal to or larger than the number of lines.
nlines = self.ports[port_str]["num_lines"]
int_type = _smallest_int_type(nlines)
int_type_nbits = 8 * int_type().nbytes
columns[port] = (port_str, int_type)
bits_by_port[port] = [0] * int_type_nbits
bits_by_port[port][line] = output.raw_output
dtypes = [columns[port] for port in sorted(columns)]
digital_out_table = np.empty(n_timepoints, dtype=dtypes)
for port, bits in bits_by_port.items():
# Pack the bits from each port into an integer:
port_str, dtype = columns[port]
values = bitfield(bits, dtype=dtype)
# Put them into the table:
digital_out_table[port_str] = np.array(values)
return digital_out_table
[docs]
def _check_wait_monitor_timeout_device_config(self):
"""Check that if we are the wait monitor acquisition device and another device
is the wait monitor timeout device, that a) the other device is a DAQmx device
and b) the other device has a start_order lower than us and a stop_order higher
than us."""
if compiler.wait_monitor is None:
return
acquisition_device = compiler.wait_monitor.acquisition_device
timeout_device = compiler.wait_monitor.timeout_device
if acquisition_device is not self or timeout_device is None:
return
if timeout_device is self:
return
if not isinstance(timeout_device, NI_DAQmx):
msg = """If using an NI DAQmx device as a wait monitor acquisition device,
then the wait monitor timeout device must also be an NI DAQmx device,
not {}."""
raise TypeError(dedent(msg).format(type(timeout_device)))
timeout_start = timeout_device.start_order
if timeout_start is None:
timeout_start = 0
timeout_stop = timeout_device.stop_order
if timeout_stop is None:
timeout_stop = 0
self_start = self.start_order
if self_start is None:
self_start = 0
self_stop = self.stop_order
if self_stop is None:
self_stop = 0
if timeout_start >= self_start or timeout_stop <= self_stop:
msg = """If using different DAQmx devices as the wait monitor acquisition
and timeout devices, the timeout device must transition_to_buffered
before the acquisition device, and transition_to_manual after it, in
order to ensure the output port for timeout pulses is not in use (by the
manual mode DO task) when the wait monitor subprocess attempts to use
it. To achieve this, pass the start_order and stop_order keyword
arguments to the devices in your connection table, ensuring that the
timeout device has a lower start_order and a higher stop_order than the
acquisition device. The default start_order and stop_order is zero, so
if you are not otherwise controlling the order that devices are
programmed, you can set start_order=-1, stop_order=1 on the timeout
device only."""
raise RuntimeError(dedent(msg))
[docs]
def generate_code(self, hdf5_file):
"""Generates the hardware code from the script and saves it to the
shot h5 file.
This is called automatically when a shot is compiled.
Args:
hdf5_file (str): Path to shot's hdf5 file to save the instructions to.
"""
IntermediateDevice.generate_code(self, hdf5_file)
analogs = {}
digitals = {}
inputs = {}
for device in self.child_devices:
if isinstance(device, (AnalogOut, StaticAnalogOut)):
analogs[device.connection] = device
elif isinstance(device, (DigitalOut, StaticDigitalOut)):
digitals[device.connection] = device
elif isinstance(device, AnalogIn):
inputs[device.connection] = device
else:
raise TypeError(device)
clockline = self.parent_device
if clockline is None:
# No pseudoclock to call expand_timeseries() on our children.
# Call it ourselves:
for device in self.child_devices:
if isinstance(device, (StaticDigitalOut, StaticAnalogOut)):
device.expand_timeseries()
times = None
else:
times = clockline.parent_device.times[clockline]
self._check_even_children(analogs, digitals)
self._check_bounds(analogs)
AO_table = self._make_analog_out_table(analogs, times)
DO_table = self._make_digital_out_table(digitals, times)
AI_table = self._make_analog_input_table(inputs)
self._check_AI_not_too_fast(AI_table)
self._check_wait_monitor_timeout_device_config()
grp = self.init_device_group(hdf5_file)
if AO_table is not None:
grp.create_dataset('AO', data=AO_table, compression=config.compression)
if DO_table is not None:
grp.create_dataset('DO', data=DO_table, compression=config.compression)
if AI_table is not None:
grp.create_dataset('AI', data=AI_table, compression=config.compression)
from .models import *