#####################################################################
#                                                                   #
# /NI_DAQmx/blacs_workers.py                                        #
#                                                                   #
# Copyright 2018, Monash University, JQI, Christopher Billington    #
#                                                                   #
# This file is part of the module labscript_devices, in the         #
# labscript suite (see http://labscriptsuite.org), and is           #
# licensed under the Simplified BSD License. See the license.txt    #
# file in the root of the project for the full license.             #
#                                                                   #
#####################################################################
import sys
import time
import threading
from PyDAQmx import *
from PyDAQmx.DAQmxConstants import *
from PyDAQmx.DAQmxTypes import *
from PyDAQmx.DAQmxCallBack import *
import numpy as np
from numpy.lib.recfunctions import structured_to_unstructured
import labscript_utils.h5_lock
import h5py
from zprocess import Event
from zprocess.utils import _reraise
import labscript_utils.properties as properties
from labscript_utils import dedent
from labscript_utils.connections import _ensure_str
from blacs.tab_base_classes import Worker
from .utils import split_conn_port, split_conn_DO, split_conn_AI
from .daqmx_utils import incomplete_sample_detection
class NI_DAQmxOutputWorker(Worker):
def init(self):
self.check_version()
        # Reset Device: clears previously added routes etc. Note: this is
        # insufficient for some devices, which require power cycling to truly reset.
DAQmxResetDevice(self.MAX_name)
self.start_manual_mode_tasks()
def stop_tasks(self):
if self.AO_task is not None:
self.AO_task.StopTask()
self.AO_task.ClearTask()
self.AO_task = None
if self.DO_task is not None:
self.DO_task.StopTask()
self.DO_task.ClearTask()
self.DO_task = None
def shutdown(self):
self.stop_tasks()
def check_version(self):
"""Check the version of PyDAQmx is high enough to avoid a known bug"""
major = uInt32()
minor = uInt32()
patch = uInt32()
DAQmxGetSysNIDAQMajorVersion(major)
DAQmxGetSysNIDAQMinorVersion(minor)
DAQmxGetSysNIDAQUpdateVersion(patch)
if major.value == 14 and minor.value < 2:
msg = """There is a known bug with buffered shots using NI DAQmx v14.0.0.
This bug does not exist on v14.2.0. You are currently using v%d.%d.%d.
Please ensure you upgrade to v14.2.0 or higher."""
raise Exception(dedent(msg) % (major.value, minor.value, patch.value))
def start_manual_mode_tasks(self):
# Create tasks:
if self.num_AO > 0:
self.AO_task = Task()
else:
self.AO_task = None
if self.ports:
self.DO_task = Task()
else:
self.DO_task = None
# Setup AO channels
for i in range(self.num_AO):
con = self.MAX_name + "/ao%d" % i
self.AO_task.CreateAOVoltageChan(
con, "", self.Vmin, self.Vmax, DAQmx_Val_Volts, None
)
# Setup DO channels
for port_str in sorted(self.ports, key=split_conn_port):
if not self.ports[port_str]['num_lines']:
continue
# Add each port to the task:
con = '%s/%s' % (self.MAX_name, port_str)
self.DO_task.CreateDOChan(con, "", DAQmx_Val_ChanForAllLines)
# Start tasks:
if self.AO_task is not None:
self.AO_task.StartTask()
if self.DO_task is not None:
self.DO_task.StartTask()
def program_manual(self, front_panel_values):
written = int32()
if self.AO_task is not None:
AO_data = np.zeros(self.num_AO, dtype=np.float64)
for i in range(self.num_AO):
AO_data[i] = front_panel_values['ao%d' % i]
self.AO_task.WriteAnalogF64(
1, True, 1, DAQmx_Val_GroupByChannel, AO_data, written, None
)
if self.DO_task is not None:
# Due to two bugs in DAQmx, we will always pack our data into a uint32 and
# write using WriteDigitalU32. The first bug is some kind of use of
# uninitialised memory when using WriteDigitalLines, discussed here:
# https://bitbucket.org/labscript_suite
# /labscript_devices/pull-requests/56/#comment-83671312
# The second is that using a smaller int dtype sometimes fails even though
# it is the correct int size for the size of the port. Using a 32 bit int
# always works, the additional bits are ignored. This is discussed here:
# https://forums.ni.com/t5/Multifunction-DAQ
# /problem-with-correlated-DIO-on-USB-6341/td-p/3344066
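            # For example, front_panel_values['port0/line3'] == 1 sets bit 3 of
            # DO_data[0] in the loop below.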
DO_data = np.zeros(len(self.ports), dtype=np.uint32)
for conn, value in front_panel_values.items():
if conn.startswith('port'):
port, line = split_conn_DO(conn)
DO_data[port] |= value << line
self.DO_task.WriteDigitalU32(
1, True, 10.0, DAQmx_Val_GroupByChannel, DO_data, written, None
)
# TODO: return coerced/quantised values
return {}
def get_output_tables(self, h5file, device_name):
"""Return the AO and DO tables rom the file, or None if they do not exist."""
with h5py.File(h5file, 'r') as hdf5_file:
group = hdf5_file['devices'][device_name]
try:
AO_table = group['AO'][:]
except KeyError:
AO_table = None
try:
DO_table = group['DO'][:]
except KeyError:
DO_table = None
return AO_table, DO_table
def set_mirror_clock_terminal_connected(self, connected):
"""Mirror the clock terminal on another terminal to allow daisy chaining of the
clock line to other devices, if applicable"""
if self.clock_mirror_terminal is None:
return
if connected:
DAQmxConnectTerms(
self.clock_terminal,
self.clock_mirror_terminal,
DAQmx_Val_DoNotInvertPolarity,
)
else:
DAQmxDisconnectTerms(self.clock_terminal, self.clock_mirror_terminal)
def set_connected_terminals_connected(self, connected):
"""Connect the terminals in the connected terminals list.
        Allows daisy chaining of the clock line to/from other devices
that do not have a direct route (see Device Routes in NI MAX)."""
if self.connected_terminals is None:
return
if connected:
for terminal_pair in self.connected_terminals:
DAQmxConnectTerms(
terminal_pair[0],
terminal_pair[1],
DAQmx_Val_DoNotInvertPolarity,
)
else:
for terminal_pair in self.connected_terminals:
DAQmxDisconnectTerms(terminal_pair[0], terminal_pair[1])
def program_buffered_DO(self, DO_table):
"""Create the DO task and program in the DO table for a shot. Return a
dictionary of the final values of each channel in use"""
if DO_table is None:
return {}
self.DO_task = Task()
written = int32()
ports = DO_table.dtype.names
final_values = {}
for port_str in ports:
# Add each port to the task:
con = '%s/%s' % (self.MAX_name, port_str)
self.DO_task.CreateDOChan(con, "", DAQmx_Val_ChanForAllLines)
# Collect the final values of the lines on this port:
port_final_value = DO_table[port_str][-1]
for line in range(self.ports[port_str]["num_lines"]):
# Extract each digital value from the packed bits:
line_final_value = bool((1 << line) & port_final_value)
final_values['%s/line%d' % (port_str, line)] = int(line_final_value)
        # Convert DO table to a regular array and ensure it is C contiguous:
DO_table = np.ascontiguousarray(
structured_to_unstructured(DO_table, dtype=np.uint32)
)
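        # DO_table is now a 2D uint32 array of shape (n_samples, n_ports), with one
        # column per port, in the same order as `ports` above.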
        # Check if DOs are all zero for the whole shot. If they are, this triggers a
# bug in NI-DAQmx that throws a cryptic error for buffered output. In this
# case, run it as a non-buffered task.
self.DO_all_zero = not np.any(DO_table)
if self.DO_all_zero:
DO_table = DO_table[0:1]
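            # A single sample suffices, since it will be written as a static
            # (unbuffered) value below.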
if self.static_DO or self.DO_all_zero:
# Static DO. Start the task and write data, no timing configuration.
self.DO_task.StartTask()
# Write data. See the comment in self.program_manual as to why we are using
# uint32 instead of the native size of each port
self.DO_task.WriteDigitalU32(
1, # npts
False, # autostart
10.0, # timeout
DAQmx_Val_GroupByScanNumber,
DO_table,
written,
None,
)
else:
# We use all but the last sample (which is identical to the second last
# sample) in order to ensure there is one more clock tick than there are
# samples. This is required by some devices to determine that the task has
# completed.
npts = len(DO_table) - 1
# Set up timing:
self.DO_task.CfgSampClkTiming(
self.clock_terminal,
self.clock_limit,
DAQmx_Val_Rising,
DAQmx_Val_FiniteSamps,
npts,
)
# Write data. See the comment in self.program_manual as to why we are using
# uint32 instead of the native size of each port.
self.DO_task.WriteDigitalU32(
npts,
False, # autostart
10.0, # timeout
DAQmx_Val_GroupByScanNumber,
DO_table[:-1], # All but the last sample as mentioned above
written,
None,
)
# Go!
self.DO_task.StartTask()
return final_values
def program_buffered_AO(self, AO_table):
if AO_table is None:
return {}
self.AO_task = Task()
written = int32()
channels = ', '.join(self.MAX_name + '/' + c for c in AO_table.dtype.names)
self.AO_task.CreateAOVoltageChan(
channels, "", self.Vmin, self.Vmax, DAQmx_Val_Volts, None
)
# Collect the final values of the analog outs:
final_values = dict(zip(AO_table.dtype.names, AO_table[-1]))
        # Convert AO table to a regular array and ensure it is C contiguous:
AO_table = np.ascontiguousarray(
structured_to_unstructured(AO_table, dtype=np.float64)
)
        # Check if AOs are all zero for the whole shot. If they are, this triggers a
# bug in NI-DAQmx that throws a cryptic error for buffered output. In this
# case, run it as a non-buffered task.
self.AO_all_zero = not np.any(AO_table)
if self.AO_all_zero:
AO_table = AO_table[0:1]
if self.static_AO or self.AO_all_zero:
# Static AO. Start the task and write data, no timing configuration.
self.AO_task.StartTask()
self.AO_task.WriteAnalogF64(
1, True, 10.0, DAQmx_Val_GroupByChannel, AO_table, written, None
)
else:
# We use all but the last sample (which is identical to the second last
# sample) in order to ensure there is one more clock tick than there are
# samples. This is required by some devices to determine that the task has
# completed.
npts = len(AO_table) - 1
# Set up timing:
self.AO_task.CfgSampClkTiming(
self.clock_terminal,
self.clock_limit,
DAQmx_Val_Rising,
DAQmx_Val_FiniteSamps,
npts,
)
# Write data:
self.AO_task.WriteAnalogF64(
npts,
False, # autostart
10.0, # timeout
DAQmx_Val_GroupByScanNumber,
AO_table[:-1], # All but the last sample as mentioned above
written,
None,
)
# Go!
self.AO_task.StartTask()
return final_values
def transition_to_buffered(self, device_name, h5file, initial_values, fresh):
# Store the initial values in case we have to abort and restore them:
self.initial_values = initial_values
# Stop the manual mode output tasks, if any:
self.stop_tasks()
# Get the data to be programmed into the output tasks:
AO_table, DO_table = self.get_output_tables(h5file, device_name)
# Mirror the clock terminal, if applicable:
self.set_mirror_clock_terminal_connected(True)
# Mirror other terminals, if applicable
self.set_connected_terminals_connected(True)
# Program the output tasks and retrieve the final values of each output:
DO_final_values = self.program_buffered_DO(DO_table)
AO_final_values = self.program_buffered_AO(AO_table)
final_values = {}
final_values.update(DO_final_values)
final_values.update(AO_final_values)
# If we are the wait timeout device, then the final value of the timeout line
# should be its rearm value:
if self.wait_timeout_device == self.device_name:
final_values[self.wait_timeout_connection] = self.wait_timeout_rearm_value
return final_values
def transition_to_manual(self, abort=False):
# Stop output tasks and call program_manual. Only call StopTask if not aborting.
# Otherwise results in an error if output was incomplete. If aborting, call
# ClearTask only.
npts = uInt64()
samples = uInt64()
tasks = []
if self.AO_task is not None:
tasks.append([self.AO_task, self.static_AO or self.AO_all_zero, 'AO'])
self.AO_task = None
if self.DO_task is not None:
tasks.append([self.DO_task, self.static_DO or self.DO_all_zero, 'DO'])
self.DO_task = None
for task, static, name in tasks:
if not abort:
if not static:
try:
# Wait for task completion with a 1 second timeout:
task.WaitUntilTaskDone(1)
finally:
# Log where we were up to in sample generation, regardless of
# whether the above succeeded:
task.GetWriteCurrWritePos(npts)
task.GetWriteTotalSampPerChanGenerated(samples)
                        # Although these values are unsigned ints, 2**64 - 1 (i.e. -1)
                        # seems to indicate that the task was not started:
current = samples.value if samples.value != 2 ** 64 - 1 else -1
total = npts.value if npts.value != 2 ** 64 - 1 else -1
msg = 'Stopping %s at sample %d of %d'
self.logger.info(msg, name, current, total)
task.StopTask()
task.ClearTask()
# Remove the mirroring of the clock terminal, if applicable:
self.set_mirror_clock_terminal_connected(False)
# Remove connections between other terminals, if applicable:
self.set_connected_terminals_connected(False)
# Set up manual mode tasks again:
self.start_manual_mode_tasks()
if abort:
# Reprogram the initial states:
self.program_manual(self.initial_values)
return True
def abort_transition_to_buffered(self):
return self.transition_to_manual(True)
def abort_buffered(self):
return self.transition_to_manual(True)
class NI_DAQmxAcquisitionWorker(Worker):
MAX_READ_INTERVAL = 0.2
MAX_READ_PTS = 10000
def init(self):
# Prevent interference between the read callback and the shutdown code:
self.tasklock = threading.RLock()
# Assigned on a per-task basis and cleared afterward:
self.read_array = None
self.task = None
# Assigned on a per-shot basis and cleared afterward:
self.buffered_mode = False
self.h5_file = None
self.acquired_data = None
self.buffered_rate = None
self.buffered_chans = None
# Hard coded for now. Perhaps we will add functionality to enable
# and disable inputs in manual mode, and adjust the rate:
self.manual_mode_chans = self.AI_chans
self.manual_mode_rate = 1000
# An event for knowing when the wait durations are known, so that we may use
# them to chunk up acquisition data:
self.wait_durations_analysed = Event('wait_durations_analysed')
# Start task for manual mode
self.start_task(self.manual_mode_chans, self.manual_mode_rate)
def shutdown(self):
if self.task is not None:
self.stop_task()
def read(self, task_handle, event_type, num_samples, callback_data=None):
"""Called as a callback by DAQmx while task is running. Also called by us to get
remaining data just prior to stopping the task. Since the callback runs
in a separate thread, we need to serialise access to instance variables"""
samples_read = int32()
with self.tasklock:
if self.task is None or task_handle != self.task.taskHandle.value:
# Task stopped already.
return 0
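            # num_samples may be -1 (DAQmx_Val_Auto), meaning read all samples
            # currently available; the timeout of -1 means wait indefinitely: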
self.task.ReadAnalogF64(
num_samples,
-1,
DAQmx_Val_GroupByScanNumber,
self.read_array,
self.read_array.size,
samples_read,
None,
)
# Select only the data read, and downconvert to 32 bit:
data = self.read_array[: int(samples_read.value), :].astype(np.float32)
if self.buffered_mode:
# Append to the list of acquired data:
self.acquired_data.append(data)
else:
# TODO: Send it to the broker thingy.
pass
return 0
def start_task(self, chans, rate):
"""Set up a task that acquires data with a callback every MAX_READ_PTS points or
MAX_READ_INTERVAL seconds, whichever is faster. NI DAQmx calls callbacks in a
separate thread, so this method returns, but data acquisition continues until
stop_task() is called. Data is appended to self.acquired_data if
self.buffered_mode=True, or (TODO) sent to the [whatever the AI server broker is
called] if self.buffered_mode=False."""
if self.task is not None:
raise RuntimeError('Task already running')
if chans is None:
return
# Get data MAX_READ_PTS points at a time or once every MAX_READ_INTERVAL
# seconds, whichever is faster:
num_samples = min(self.MAX_READ_PTS, int(rate * self.MAX_READ_INTERVAL))
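        # For example, at rate = 1000: num_samples = min(10000, 200) = 200, so the
        # callback fires about five times per second.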
self.read_array = np.zeros((num_samples, len(chans)), dtype=np.float64)
self.task = Task()
if self.AI_term == 'RSE':
term = DAQmx_Val_RSE
elif self.AI_term == 'NRSE':
term = DAQmx_Val_NRSE
elif self.AI_term == 'Diff':
term = DAQmx_Val_Diff
        elif self.AI_term == 'PseudoDiff':
            term = DAQmx_Val_PseudoDiff
        else:
            msg = "Unsupported AI termination type: %s" % self.AI_term
            raise ValueError(msg)
for chan in chans:
self.task.CreateAIVoltageChan(
self.MAX_name + '/' + chan,
"",
term,
self.AI_range[0],
self.AI_range[1],
DAQmx_Val_Volts,
None,
)
        # If no timebase terminal is specified, the internal default is used:
        if self.AI_timebase_terminal is not None:
            self.task.SetSampClkTimebaseSrc(self.AI_timebase_terminal)
            self.task.SetSampClkTimebaseRate(self.AI_timebase_rate)
self.task.CfgSampClkTiming(
"", rate, DAQmx_Val_Rising, DAQmx_Val_ContSamps, num_samples
)
if self.buffered_mode:
self.task.CfgDigEdgeStartTrig(self.clock_terminal, DAQmx_Val_Rising)
# This must not be garbage collected until the task is:
self.task.callback_ptr = DAQmxEveryNSamplesEventCallbackPtr(self.read)
self.task.RegisterEveryNSamplesEvent(
DAQmx_Val_Acquired_Into_Buffer, num_samples, 0, self.task.callback_ptr, 100
)
self.task.StartTask()
def stop_task(self):
with self.tasklock:
if self.task is None:
raise RuntimeError('Task not running')
# Read remaining data:
            # Pass the raw handle, since read() compares it to taskHandle.value:
            self.read(self.task.taskHandle.value, None, -1)
# Stop the task:
self.task.StopTask()
self.task.ClearTask()
self.task = None
self.read_array = None
def transition_to_buffered(self, device_name, h5file, initial_values, fresh):
self.logger.debug('transition_to_buffered')
# read channels, acquisition rate, etc from H5 file
with h5py.File(h5file, 'r') as f:
group = f['/devices/' + device_name]
if 'AI' not in group:
# No acquisition
return {}
AI_table = group['AI'][:]
device_properties = properties.get(f, device_name, 'device_properties')
chans = [_ensure_str(c) for c in AI_table['connection']]
# Remove duplicates and sort:
if chans:
self.buffered_chans = sorted(set(chans), key=split_conn_AI)
self.h5_file = h5file
self.buffered_rate = device_properties['acquisition_rate']
        if device_properties['start_delay_ticks']:
            # The delay is defined in sample clock ticks; convert to seconds by
            # dividing by the sample rate, and save for later:
            self.AI_start_delay = self.AI_start_delay_ticks / self.buffered_rate
self.acquired_data = []
# Stop the manual mode task and start the buffered mode task:
self.stop_task()
self.buffered_mode = True
self.start_task(self.buffered_chans, self.buffered_rate)
return {}
def transition_to_manual(self, abort=False):
self.logger.debug('transition_to_manual')
# If we were doing buffered mode acquisition, stop the buffered mode task and
# start the manual mode task. We might not have been doing buffered mode
# acquisition if abort() was called when we are not in buffered mode, or if
        # there were no acquisitions this shot.
if not self.buffered_mode:
return True
if self.buffered_chans is not None:
self.stop_task()
self.buffered_mode = False
self.logger.info('transitioning to manual mode, task stopped')
self.start_task(self.manual_mode_chans, self.manual_mode_rate)
if abort:
self.acquired_data = None
self.buffered_chans = None
self.h5_file = None
self.buffered_rate = None
return True
with h5py.File(self.h5_file, 'a') as hdf5_file:
data_group = hdf5_file['data']
data_group.create_group(self.device_name)
waits_in_use = len(hdf5_file['waits']) > 0
if self.buffered_chans is not None and not self.acquired_data:
msg = """No data was acquired. Perhaps the acquisition task was not
triggered to start, is the device connected to a pseudoclock?"""
raise RuntimeError(dedent(msg))
# Concatenate our chunks of acquired data and recast them as a structured
# array with channel names:
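            # (The view() call below reinterprets each row of the (n_samples,
            # n_chans) float32 array as one structured record, one field per
            # channel.)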
if self.acquired_data:
start_time = time.time()
dtypes = [(chan, np.float32) for chan in self.buffered_chans]
raw_data = np.concatenate(self.acquired_data).view(dtypes)
raw_data = raw_data.reshape((len(raw_data),))
self.acquired_data = None
self.buffered_chans = None
self.extract_measurements(raw_data, waits_in_use)
self.h5_file = None
self.buffered_rate = None
msg = 'data written, time taken: %ss' % str(time.time() - start_time)
else:
msg = 'No acquisitions in this shot.'
self.logger.info(msg)
return True
def abort_buffered(self):
return self.transition_to_manual(True)
def abort_transition_to_buffered(self):
return self.transition_to_manual(True)
def program_manual(self, values):
return {}
class NI_DAQmxWaitMonitorWorker(Worker):
def init(self):
self.all_waits_finished = Event('all_waits_finished', type='post')
self.wait_durations_analysed = Event('wait_durations_analysed', type='post')
self.wait_completed = Event('wait_completed', type='post')
# Set on a per-shot basis and cleared afterward:
self.h5_file = None
self.CI_task = None
self.DO_task = None
self.wait_table = None
self.semiperiods = None
self.wait_monitor_thread = None
# Saved error in case one occurs in the thread, we can raise it later in
# transition_to_manual:
self.wait_monitor_thread_exception = None
# To trigger early shutdown of the wait monitor thread:
self.shutting_down = False
# Does this device have the "incomplete sample detection" feature? This
# determines whether the first sample on our semiperiod counter input task will
# be automatically discarded before we see it, or whether we will have to
# discard it ourselves
self.incomplete_sample_detection = incomplete_sample_detection(self.MAX_name)
# Data for timeout triggers:
if self.timeout_trigger_type == 'rising':
trigger_value = 1
rearm_value = 0
elif self.timeout_trigger_type == 'falling':
trigger_value = 0
rearm_value = 1
else:
msg = 'timeout_trigger_type must be "rising" or "falling", not "{}".'
raise ValueError(msg.format(self.timeout_trigger_type))
self.timeout_trigger = np.array([trigger_value], dtype=np.uint8)
self.timeout_rearm = np.array([rearm_value], dtype=np.uint8)
def shutdown(self):
self.stop_tasks(True)
def read_edges(self, npts, timeout=None):
"""Wait up to the given timeout in seconds for an edge on the wait monitor and
and return the duration since the previous edge. Return None upon timeout."""
samples_read = int32()
# If no timeout, call read repeatedly with a 0.2 second timeout to ensure we
# don't block indefinitely and can still abort.
if timeout is None:
read_timeout = 0.2
else:
read_timeout = timeout
read_array = np.zeros(npts)
while True:
if self.shutting_down:
raise RuntimeError('Stopped before expected number of samples acquired')
try:
self.CI_task.ReadCounterF64(
npts, read_timeout, read_array, npts, samples_read, None
)
except SamplesNotYetAvailableError:
if timeout is None:
continue
return None
return read_array
def wait_monitor(self):
try:
            # Read edge times from the counter input task, indicating the times of
            # the pulses that occur at the start of the experiment and after every
            # wait. If a timeout occurs, pulse the timeout output to force a resume
            # of the master pseudoclock. Save the resulting semiperiod data for
            # analysis in transition_to_manual.
self.logger.debug('Wait monitor thread starting')
with self.kill_lock:
self.logger.debug('Waiting for start of experiment')
# Wait for the pulse indicating the start of the experiment:
if self.incomplete_sample_detection:
semiperiods = self.read_edges(1, timeout=None)
else:
semiperiods = self.read_edges(2, timeout=None)
self.logger.debug('Experiment started, got edges:' + str(semiperiods))
# May have been one or two edges, depending on whether the device has
# incomplete sample detection. We are only interested in the second one
# anyway, it tells us how long the initial pulse was. Store the pulse width
# for later, we will use it for making timeout pulses if necessary. Note
# that the variable current_time is labscript time, so it will be reset
# after each wait to the time of that wait plus pulse_width.
current_time = pulse_width = semiperiods[-1]
self.semiperiods.append(semiperiods[-1])
# Alright, we're now a short way into the experiment.
for wait in self.wait_table:
# How long until when the next wait should timeout?
timeout = wait['time'] + wait['timeout'] - current_time
timeout = max(timeout, 0) # ensure non-negative
# Wait that long for the next pulse:
self.logger.debug('Waiting for pulse indicating end of wait')
semiperiods = self.read_edges(2, timeout)
# Did the wait finish of its own accord, or time out?
if semiperiods is None:
# It timed out. If there is a timeout device, send a trigger to
# resume the clock!
if self.DO_task is not None:
msg = """Wait timed out; retriggering clock with {:.3e} s
pulse ({} edge)"""
msg = msg.format(pulse_width, self.timeout_trigger_type)
self.logger.debug(dedent(msg))
self.send_resume_trigger(pulse_width)
else:
msg = """Specified wait timeout exceeded, but there is no
timeout device with which to resume the experiment.
Continuing to wait."""
self.logger.warning(dedent(msg))
# Keep waiting for the clock to resume:
self.logger.debug('Waiting for pulse indicating end of wait')
semiperiods = self.read_edges(2, timeout=None)
# Alright, now we're at the end of the wait.
self.semiperiods.extend(semiperiods)
self.logger.debug('Wait completed')
current_time = wait['time'] + semiperiods[-1]
# Inform any interested parties that a wait has completed:
postdata = _ensure_str(wait['label'])
self.wait_completed.post(self.h5_file, data=postdata)
# Inform any interested parties that waits have all finished:
self.logger.debug('All waits finished')
self.all_waits_finished.post(self.h5_file)
except Exception:
self.logger.exception('Exception in wait monitor thread:')
# Save the exception so it can be raised in transition_to_manual
self.wait_monitor_thread_exception = sys.exc_info()
def send_resume_trigger(self, pulse_width):
written = int32()
# Trigger:
self.DO_task.WriteDigitalLines(
1, True, 1, DAQmx_Val_GroupByChannel, self.timeout_trigger, written, None
)
# Wait however long we observed the first pulse of the experiment to be. In
# practice this is likely to be negligible compared to the other software delays
# here, but in case it is larger we'd better wait:
time.sleep(pulse_width)
# Rearm trigger:
self.DO_task.WriteDigitalLines(
1, True, 1, DAQmx_Val_GroupByChannel, self.timeout_rearm, written, None
)
def stop_tasks(self, abort):
self.logger.debug('stop_tasks')
if self.wait_monitor_thread is not None:
if abort:
# This will cause the wait_monitor thread to raise an exception within a
# short time, allowing us to join it before it would otherwise be done.
self.shutting_down = True
self.wait_monitor_thread.join()
self.wait_monitor_thread = None
self.shutting_down = False
if not abort and self.wait_monitor_thread_exception is not None:
# Raise any unexpected errors from the wait monitor thread:
_reraise(*self.wait_monitor_thread_exception)
self.wait_monitor_thread_exception = None
        if not abort:
            # Don't want errors about incomplete tasks to be raised if we are aborting:
            if self.CI_task is not None:
                self.CI_task.StopTask()
            if self.DO_task is not None:
                self.DO_task.StopTask()
if self.CI_task is not None:
self.CI_task.ClearTask()
self.CI_task = None
if self.DO_task is not None:
self.DO_task.ClearTask()
self.DO_task = None
self.logger.debug('finished stop_tasks')
def start_tasks(self):
# The counter acquisition task:
self.CI_task = Task()
CI_chan = self.MAX_name + '/' + self.wait_acq_connection
        # What is the longest time between consecutive waits, plus the timeout of
        # the wait that follows? (The addition below is elementwise, one entry per
        # wait.)
interwait_times = np.diff([0] + list(self.wait_table['time']))
max_measure_time = max(interwait_times + self.wait_table['timeout'])
# Allow for software delays in timeouts.
max_measure_time += 1.0
min_measure_time = self.min_semiperiod_measurement
self.logger.debug(
"CI measurement range is: min: %f max: %f",
min_measure_time,
max_measure_time,
)
self.CI_task.CreateCISemiPeriodChan(
CI_chan, '', min_measure_time, max_measure_time, DAQmx_Val_Seconds, ""
)
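        # One pulse at the start of the experiment plus one pulse per wait, each
        # contributing two edges: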
num_edges = 2 * (len(self.wait_table) + 1)
self.CI_task.CfgImplicitTiming(DAQmx_Val_ContSamps, num_edges)
self.CI_task.StartTask()
# The timeout task:
if self.wait_timeout_MAX_name is not None:
self.DO_task = Task()
DO_chan = self.wait_timeout_MAX_name + '/' + self.wait_timeout_connection
self.DO_task.CreateDOChan(DO_chan, "", DAQmx_Val_ChanForAllLines)
# Ensure timeout trigger is armed:
written = int32()
# Writing autostarts the task:
self.DO_task.WriteDigitalLines(
1, True, 1, DAQmx_Val_GroupByChannel, self.timeout_rearm, written, None
)
def transition_to_buffered(self, device_name, h5file, initial_values, fresh):
self.logger.debug('transition_to_buffered')
self.h5_file = h5file
with h5py.File(h5file, 'r') as hdf5_file:
dataset = hdf5_file['waits']
if len(dataset) == 0:
# There are no waits. Do nothing.
self.logger.debug('There are no waits, not transitioning to buffered')
self.wait_table = None
return {}
self.wait_table = dataset[:]
self.start_tasks()
# An array to store the results of counter acquisition:
self.semiperiods = []
self.wait_monitor_thread = threading.Thread(target=self.wait_monitor)
# Not a daemon thread, as it implements wait timeouts - we need it to stay alive
# if other things die.
self.wait_monitor_thread.start()
self.logger.debug('finished transition to buffered')
return {}
def transition_to_manual(self, abort=False):
self.logger.debug('transition_to_manual')
self.stop_tasks(abort)
if not abort and self.wait_table is not None:
# Let's work out how long the waits were. The absolute times of each edge on
# the wait monitor were:
edge_times = np.cumsum(self.semiperiods)
# Now there was also a rising edge at t=0 that we didn't measure:
edge_times = np.insert(edge_times, 0, 0)
# Ok, and the even-indexed ones of these were rising edges.
rising_edge_times = edge_times[::2]
# Now what were the times between rising edges?
periods = np.diff(rising_edge_times)
# How does this compare to how long we expected there to be between the
# start of the experiment and the first wait, and then between each pair of
# waits? The difference will give us the waits' durations.
resume_times = self.wait_table['time']
# Again, include the start of the experiment, t=0:
resume_times = np.insert(resume_times, 0, 0)
run_periods = np.diff(resume_times)
wait_durations = periods - run_periods
waits_timed_out = wait_durations > self.wait_table['timeout']
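            # Illustrative example: a single wait at t = 5.0 s with rising edges
            # measured at t = 0 and t = 5.3 s gives periods = [5.3] and
            # run_periods = [5.0], i.e. a wait duration of 0.3 s.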
# Work out how long the waits were, save them, post an event saying so:
dtypes = [
('label', 'a256'),
('time', float),
('timeout', float),
('duration', float),
('timed_out', bool),
]
data = np.empty(len(self.wait_table), dtype=dtypes)
data['label'] = self.wait_table['label']
data['time'] = self.wait_table['time']
data['timeout'] = self.wait_table['timeout']
data['duration'] = wait_durations
data['timed_out'] = waits_timed_out
with h5py.File(self.h5_file, 'a') as hdf5_file:
hdf5_file.create_dataset('/data/waits', data=data)
self.wait_durations_analysed.post(self.h5_file)
self.h5_file = None
self.semiperiods = None
return True
def abort_buffered(self):
return self.transition_to_manual(True)
def abort_transition_to_buffered(self):
return self.transition_to_manual(True)
def program_manual(self, values):
return {}