Source code for labscript_devices.NI_DAQmx.models.get_capabilities

#####################################################################
#                                                                   #
# /NI_DAQmx/models/get_capabilities.py                              #
#                                                                   #
# Copyright 2018, Christopher Billington                            #
#                                                                   #
# This file is part of the module labscript_devices, in the         #
# labscript suite (see http://labscriptsuite.org), and is           #
# licensed under the Simplified BSD License. See the license.txt    #
# file in the root of the project for the full license.             #
#                                                                   #
#####################################################################
"""This is a script to update `capabilities.json` with the capabilities of all
NI-DAQmx devices currently connected to this computer.

Run this script to add support for a new model of NI-DAQmx device. 
Note that this will work with a simulated device configured through NI-MAX as well, 
so support can be added without actually having the physical device.

Called from the command line via

.. code-block:: shell

    python get_capabilities.py

"""

import numpy as np
import os
import ctypes
import json
import PyDAQmx
from PyDAQmx import byref, Task
from PyDAQmx.DAQmxTypes import uInt32, bool32, int32, float64
import PyDAQmx.DAQmxConstants as c

# Directory that contains this script.
THIS_FOLDER = os.path.dirname(os.path.abspath(__file__))
# JSON file, next to this script, that the script reads (if it exists) and then
# rewrites with the capabilities of every connected device model.
CAPABILITIES_FILE = os.path.join(THIS_FOLDER, 'capabilities.json')


def string_prop(func):
    """Wrap a string-returning PyDAQmx getter so it yields a Python ``str``.

    Args:
        func (function): PyDAQmx library function that writes a string into a
            caller-supplied buffer.

    Returns:
        function: The wrapped function. It takes an optional ``name``; when
        ``name`` is given it is passed to ``func`` ahead of the buffer and the
        buffer size, otherwise only the buffer and its size are passed.
    """

    def wrapped(name=None):
        # Fixed-size output buffer that the library call fills in.
        buffer_length = 4096
        buffer = ctypes.create_string_buffer(buffer_length)
        size = uInt32(buffer_length)
        call_args = (buffer, size) if name is None else (name, buffer, size)
        func(*call_args)
        return buffer.value.decode('utf8')

    return wrapped
def bool_prop(func):
    """Wrap a boolean-returning PyDAQmx getter so it yields a Python ``bool``.

    Args:
        func (function): PyDAQmx library function that returns a boolean
            through an output argument.

    Returns:
        function: The wrapped function, taking the ``name`` to query.
    """

    def wrapped(name):
        flag = bool32()
        # The library writes its answer into ``flag`` by reference.
        func(name, byref(flag))
        return flag.value != 0

    return wrapped
def int32_prop(func):
    """Wrap an int32-returning PyDAQmx getter so it yields a Python ``int``.

    Args:
        func (function): PyDAQmx library function that returns an int32
            through an output argument.

    Returns:
        function: The wrapped function, taking the ``name`` to query.
    """

    def wrapped(name):
        number = int32()
        # The library writes its answer into ``number`` by reference.
        func(name, byref(number))
        return number.value

    return wrapped
def float64_prop(func):
    """Wrap a float64-returning PyDAQmx getter so it yields a Python ``float``.

    Args:
        func (function): PyDAQmx library function that returns a float64
            through an output argument.

    Returns:
        function: The wrapped function, taking the ``name`` to query.
    """

    def wrapped(name):
        number = float64()
        # The library writes its answer into ``number`` by reference.
        func(name, byref(number))
        return number.value

    return wrapped
def float64_array_prop(func):
    """Wrap a PyDAQmx getter that fills an array of float64s.

    Args:
        func (function): PyDAQmx library function that returns an array of
            float64s.

    Returns:
        function: The wrapped function, taking the ``name`` to query and
        returning the values as a Python list.
    """

    def wrapped(name):
        import warnings

        with warnings.catch_warnings():
            # PyDAQmx warns about a positive return value, but that return value is
            # precisely how the required array size is discovered.
            warnings.simplefilter("ignore")
            # A size-0 request makes the call report how many elements are needed:
            count = func(name, byref(float64()), 0)
        # Allocate an array of that size and have the library fill it in.
        values = (float64 * count)()
        func(name, values, count)
        return list(values)

    return wrapped
def chans(func):
    """Like ``string_prop``, but split the result into individual channels.

    The returned string is split on ``', '`` and the leading device name is
    stripped from every entry.

    Args:
        func (function): PyDAQmx library function that returns channel string.

    Returns:
        function: The wrapped function. It returns a list of channel names, or
        an empty list when the library returns an empty string.
    """
    get_string = string_prop(func)

    def wrapped2(name):
        raw = get_string(name)
        if not raw:
            return []
        # Drop any surrounding '/' and then everything up to the first '/', i.e. the
        # device name, keeping e.g. 'port0/line1' from 'Dev1/port0/line1'.
        return [entry.strip('/').split('/', 1)[1] for entry in raw.split(', ')]

    return wrapped2
# Python-friendly versions of the PyDAQmx getters used below. Each wrapper hides the
# ctypes output-argument handling and returns an ordinary Python value.

# String-valued properties:
DAQmxGetSysDevNames = string_prop(PyDAQmx.DAQmxGetSysDevNames)
DAQmxGetDevProductType = string_prop(PyDAQmx.DAQmxGetDevProductType)

# Boolean-valued properties:
DAQmxGetDevAnlgTrigSupported = bool_prop(PyDAQmx.DAQmxGetDevAnlgTrigSupported)
DAQmxGetDevDigTrigSupported = bool_prop(PyDAQmx.DAQmxGetDevDigTrigSupported)
DAQmxGetDevAOSampClkSupported = bool_prop(PyDAQmx.DAQmxGetDevAOSampClkSupported)

# Channel/terminal lists, returned as lists with the device name stripped:
DAQmxGetDevAOPhysicalChans = chans(PyDAQmx.DAQmxGetDevAOPhysicalChans)
DAQmxGetDevAIPhysicalChans = chans(PyDAQmx.DAQmxGetDevAIPhysicalChans)
DAQmxGetDevDOPorts = chans(PyDAQmx.DAQmxGetDevDOPorts)
DAQmxGetDevDOLines = chans(PyDAQmx.DAQmxGetDevDOLines)
DAQmxGetDevDIPorts = chans(PyDAQmx.DAQmxGetDevDIPorts)
DAQmxGetDevDILines = chans(PyDAQmx.DAQmxGetDevDILines)
DAQmxGetDevTerminals = chans(PyDAQmx.DAQmxGetDevTerminals)
DAQmxGetDevCIPhysicalChans = chans(PyDAQmx.DAQmxGetDevCIPhysicalChans)

# Float-valued rate properties:
DAQmxGetDevDOMaxRate = float64_prop(PyDAQmx.DAQmxGetDevDOMaxRate)
DAQmxGetDevAOMaxRate = float64_prop(PyDAQmx.DAQmxGetDevAOMaxRate)
DAQmxGetDevAIMaxSingleChanRate = float64_prop(PyDAQmx.DAQmxGetDevAIMaxSingleChanRate)
DAQmxGetDevAIMaxMultiChanRate = float64_prop(PyDAQmx.DAQmxGetDevAIMaxMultiChanRate)

# Voltage ranges, returned as flat lists of floats (consumed below as
# consecutive Vmin, Vmax pairs):
DAQmxGetDevAOVoltageRngs = float64_array_prop(PyDAQmx.DAQmxGetDevAOVoltageRngs)
DAQmxGetDevAIVoltageRngs = float64_array_prop(PyDAQmx.DAQmxGetDevAIVoltageRngs)

# Bitfield of supported terminal configurations for one physical AI channel:
DAQmxGetPhysicalChanAITermCfgs = int32_prop(PyDAQmx.DAQmxGetPhysicalChanAITermCfgs)
DAQmxGetDevAISimultaneousSamplingSupported = bool_prop(PyDAQmx.DAQmxGetDevAISimultaneousSamplingSupported)
def port_supports_buffered(device_name, port, clock_terminal=None):
    """Empirically determines if the digital port supports buffered output.

    A finite, externally clocked digital output task is configured on the port
    and a small buffer is written to it. Which error (if any) the write
    produces tells us whether buffered output is possible.

    Args:
        device_name (str): NI-MAX device name
        port (str): Name of the port to introspect, without the device prefix
            (it is appended to ``device_name + "/"``).
        clock_terminal (str, optional): Name of the terminal to use as the sample
            clock, without the device prefix. Defaults to the first terminal
            reported for the device. If the hardware cannot route this terminal,
            the terminals listed after it are tried in turn.

    Returns:
        bool: True if `port` supports buffered output.
    """
    all_terminals = DAQmxGetDevTerminals(device_name)
    if clock_terminal is None:
        if not all_terminals:
            # No terminals at all, so nothing can be used as an external clock and
            # externally clocked buffered output is impossible.
            return False
        clock_terminal = all_terminals[0]
    npts = 16
    clock_terminal_full = '/' + device_name + '/' + clock_terminal
    data = np.zeros(npts, dtype=np.uint32)
    written = int32()
    task = Task()
    # Everything that touches the task sits inside try/finally so the task is
    # cleared even if configuring the channel or its timing fails.
    try:
        task.CreateDOChan(device_name + "/" + port, "", c.DAQmx_Val_ChanForAllLines)
        task.CfgSampClkTiming(
            clock_terminal_full, 100, c.DAQmx_Val_Rising, c.DAQmx_Val_FiniteSamps, npts
        )
        # Only errors raised by the write itself are interpreted; configuration
        # errors above propagate to the caller unchanged.
        try:
            task.WriteDigitalU32(
                npts, False, 10.0, c.DAQmx_Val_GroupByScanNumber, data, byref(written), None
            )
        except (
            PyDAQmx.DAQmxFunctions.BufferedOperationsNotSupportedOnSelectedLinesError,
            PyDAQmx.DAQmxFunctions.PhysicalChanNotSupportedGivenSampTimingType653xError,
        ):
            return False
        except (
            PyDAQmx.DAQmxFunctions.CantUsePort3AloneGivenSampTimingTypeOn653xError,
            PyDAQmx.DAQmxFunctions.CantUsePort1AloneGivenSampTimingTypeOn653xError,
        ):
            # Ports that throw this error on 653x devices do support buffered output,
            # though there are requirements that multiple ports be used together.
            return True
        except PyDAQmx.DAQmxFunctions.RouteNotSupportedByHW_RoutingError:
            # Try again with a different terminal
            current_terminal_index = all_terminals.index(clock_terminal)
            if current_terminal_index == len(all_terminals) - 1:
                # There are no more terminals. No terminals can be used as clocks,
                # therefore we cannot do externally clocked buffered output.
                return False
            next_terminal_to_try = all_terminals[current_terminal_index + 1]
            return port_supports_buffered(device_name, port, next_terminal_to_try)
        return True
    finally:
        task.ClearTask()
def AI_start_delay(device_name):
    """Empirically determines the analog inputs' start delay.

    A continuous acquisition on ``ai0``, start-triggered from ``PFI0``, is
    configured (but never started) at the maximum single-channel rate, and the
    start-trigger delay and delay-from-sample-clock are read back from it.

    Args:
        device_name (str): NI-MAX device name

    Returns:
        float: Analog input start delay in seconds. `None` if the device has no
        ``PFI0`` terminal.

    Raises:
        ValueError: If ``ai0`` supports none of the RSE, Diff or PseudoDiff
            terminal configurations.
    """
    if 'PFI0' not in DAQmxGetDevTerminals(device_name):
        return None
    clock_terminal = '/' + device_name + '/PFI0'
    rate = DAQmxGetDevAIMaxSingleChanRate(device_name)
    # The first two entries of the range list form the first (Vmin, Vmax) pair.
    Vmin, Vmax = DAQmxGetDevAIVoltageRngs(device_name)[0:2]
    num_samples = 1000
    chan = device_name + '/ai0'
    # Pick the first terminal configuration the channel supports, in order of
    # preference RSE, Diff, PseudoDiff.
    supp_types = DAQmxGetPhysicalChanAITermCfgs(chan)
    if supp_types & c.DAQmx_Val_Bit_TermCfg_RSE:
        input_type = c.DAQmx_Val_RSE
    elif supp_types & c.DAQmx_Val_Bit_TermCfg_Diff:
        input_type = c.DAQmx_Val_Diff
    elif supp_types & c.DAQmx_Val_Bit_TermCfg_PseudoDIFF:
        input_type = c.DAQmx_Val_PseudoDiff
    else:
        # Previously this fell through and failed later with an UnboundLocalError.
        raise ValueError(
            "%s supports none of the RSE, Diff or PseudoDiff terminal "
            "configurations (bitfield: %d)" % (chan, supp_types)
        )
    start_trig_delay = float64()
    delay_from_sample_clock = float64()
    sample_timebase_rate = float64()
    task = Task()
    # try/finally so the task is released even if one of the calls below raises.
    try:
        task.CreateAIVoltageChan(
            chan, "", input_type, Vmin, Vmax, c.DAQmx_Val_Volts, None
        )
        task.CfgSampClkTiming(
            "", rate, c.DAQmx_Val_Rising, c.DAQmx_Val_ContSamps, num_samples
        )
        task.CfgDigEdgeStartTrig(clock_terminal, c.DAQmx_Val_Rising)
        try:
            task.GetStartTrigDelay(start_trig_delay)
        except PyDAQmx.DAQmxFunctions.AttributeNotSupportedInTaskContextError:
            # device does not have a Start Trigger Delay property
            # is likely a dynamic signal acquisition device with filter
            # delays instead.
            start_trig_delay.value = 0
        try:
            task.GetDelayFromSampClkDelay(delay_from_sample_clock)
        except PyDAQmx.DAQmxFunctions.AttributeNotSupportedInTaskContextError:
            # seems simultaneous sampling devices do not have this property,
            # so assume it is zero
            delay_from_sample_clock.value = 0
        task.GetSampClkTimebaseRate(sample_timebase_rate)
    finally:
        task.ClearTask()
    # Both delays are treated as counts of timebase ticks; dividing by the timebase
    # rate converts the sum to seconds.
    total_delay_in_ticks = start_trig_delay.value + delay_from_sample_clock.value
    total_delay_in_seconds = total_delay_in_ticks / sample_timebase_rate.value
    return total_delay_in_seconds
def AI_filter_delay(device_name):
    """Determine the filter delay for dynamic signal acquisition devices.

    Returns the delay in clock cycles. Absolute delay will vary with sample rate.

    A continuous pseudo-differential acquisition on ``ai0``, start-triggered from
    ``PFI0``, is configured (but never started) at the maximum single-channel
    rate, and the filter delay is read back in units of sample clock periods.

    Args:
        device_name (str): NI-MAX device name

    Returns:
        int: Number of analog input delay ticks between task start and acquisition
        start. `None` if the device has no ``PFI0`` terminal.
    """
    if 'PFI0' not in DAQmxGetDevTerminals(device_name):
        return None
    clock_terminal = '/' + device_name + '/PFI0'
    rate = DAQmxGetDevAIMaxSingleChanRate(device_name)
    # The first two entries of the range list form the first (Vmin, Vmax) pair.
    Vmin, Vmax = DAQmxGetDevAIVoltageRngs(device_name)[0:2]
    num_samples = 1000
    chan = device_name + '/ai0'
    start_filter_delay = float64()
    task = Task()
    # try/finally so the task is released even if one of the calls below raises.
    try:
        task.CreateAIVoltageChan(
            chan, "", c.DAQmx_Val_PseudoDiff, Vmin, Vmax, c.DAQmx_Val_Volts, None
        )
        task.CfgSampClkTiming(
            "", rate, c.DAQmx_Val_Rising, c.DAQmx_Val_ContSamps, num_samples
        )
        task.CfgDigEdgeStartTrig(clock_terminal, c.DAQmx_Val_Rising)
        # get delay in number of clock samples
        task.SetAIFilterDelayUnits("", c.DAQmx_Val_SampleClkPeriods)
        task.GetAIFilterDelay("", start_filter_delay)
    finally:
        task.ClearTask()
    return int(start_filter_delay.value)
def supported_AI_terminal_configurations(device_name):
    """Determine which analog input configurations are supported for each AI.

    Valid options are RSE, NRSE, Diff, and PseudoDiff.

    Args:
        device_name (str): NI-MAX device name

    Returns:
        dict: Dictionary keyed by analog input channel name, where each value is
        a list of the terminal configurations that channel supports.
    """
    # Label -> bit that flags the configuration in the per-channel bitfield.
    term_cfg_bits = {
        'RSE': c.DAQmx_Val_Bit_TermCfg_RSE,
        'NRSE': c.DAQmx_Val_Bit_TermCfg_NRSE,
        'Diff': c.DAQmx_Val_Bit_TermCfg_Diff,
        'PseudoDiff': c.DAQmx_Val_Bit_TermCfg_PseudoDIFF,
    }
    supported = {}
    for ai_chan in DAQmxGetDevAIPhysicalChans(device_name):
        bitfield = DAQmxGetPhysicalChanAITermCfgs(device_name + '/' + ai_chan)
        supported[ai_chan] = [
            label for label, bit in term_cfg_bits.items() if bit & bitfield
        ]
    return supported
def supported_AI_ranges_for_non_differential_input(device_name, AI_ranges):
    """Empirically determine the analog input voltage ranges for non-differential inputs.

    Tries AI ranges to see which are actually allowed for non-differential input,
    since the largest range may only be available for differential input. Each
    candidate range is tested by creating and starting an RSE voltage task on
    ``ai0``.

    Args:
        device_name (str): NI-MAX device name
        AI_ranges (list): list of `[Vmin, Vmax]` pairs to check compatibility.

    Returns:
        list: List of `[Vmin, Vmax]` lists for the ranges that were accepted, in
        the order given. Empty if `AI_ranges` is empty.

    Raises:
        PyDAQmx.DAQmxFunctions.InvalidAttributeValueError: If the device rejects
            an attribute value for a reason other than the AI min/max limits.
    """
    chan = device_name + '/ai0'
    supported_ranges = []
    for Vmin, Vmax in AI_ranges:
        # Create the task *before* the try block: if Task() itself fails there is
        # nothing to clear, and the finally clause must not touch an unbound name
        # or the already-cleared task from a previous iteration.
        task = Task()
        try:
            task.CreateAIVoltageChan(
                chan, "", c.DAQmx_Val_RSE, Vmin, Vmax, c.DAQmx_Val_Volts, None
            )
            task.StartTask()
        except PyDAQmx.DAQmxFunctions.InvalidAttributeValueError as e:
            # Use the exception's `message` attribute when it has one, otherwise
            # fall back to its string form.
            message = getattr(e, 'message', None) or str(e)
            if 'DAQmx_AI_Min' in message or 'DAQmx_AI_Max' in message:
                # Not supported for non-differential input:
                continue
            raise
        finally:
            task.ClearTask()
        supported_ranges.append([Vmin, Vmax])
    return supported_ranges
def supports_semiperiod_measurement(device_name):
    """Empirically determines if the DAQ supports semiperiod measurement.

    Args:
        device_name (str): NI-MAX device name.

    Returns:
        bool: True if semi-period measurements are supported by the device.
    """
    import warnings

    with warnings.catch_warnings():
        # PyDAQmx warns about a positive return value, but that return value is
        # precisely how the required array size is discovered.
        warnings.simplefilter("ignore")
        # A size-0 request makes the call report how many elements are needed:
        count = PyDAQmx.DAQmxGetDevCISupportedMeasTypes(device_name, int32(), 0)
    # Allocate an array of that size and have the library fill it in.
    meas_types = (int32 * count)()
    PyDAQmx.DAQmxGetDevCISupportedMeasTypes(device_name, meas_types, count)
    semi_period = c.DAQmx_Val_SemiPeriod
    return any(meas_types[index] == semi_period for index in range(count))
def get_min_semiperiod_measurement(device_name):
    """Determines the minimum semi-period measurement time supported by the device.

    Depending on the timebase used, counter inputs can measure time intervals of
    various ranges. As a default, we pick a largish range - the one with the fastest
    timebase still capable of measuring 100 seconds, or the largest time interval if
    it is less than 100 seconds, and we save the smallest interval measurable with
    this timebase. Then labscript can ensure it doesn't make wait monitor pulses
    shorter than this. This should be a sensible default behaviour, though if the
    user has experiments considerably shorter or longer than 100 seconds, such that
    they want to use a different timebase, they may pass the
    min_semiperiod_measurement keyword argument into the DAQmx class, to tell
    labscript to make pulses some other duration compatible with the maximum wait
    time in their experiment. However, since there are software delays in timeouts
    of waits during a shot, any timed-out waits necessarily will last software
    timescales of up to ~100ms on a slow computer, preventing one from using very
    fast timebases with low-resolution counters if there is any possibility of
    timing out. For now (in the wait monitor worker class) we pessimistically add
    one second to the expected longest measurement to account for software delays.
    These decisions can be revisited if there is a need, do not hesitate to file an
    issue on bitbucket regarding this if it affects you.

    The allowed ranges are obtained by deliberately requesting an impossible
    measurement range on the device's first counter input channel and parsing
    the limits out of the resulting error message.

    Args:
        device_name (str): NI-MAX device name

    Returns:
        float: Minimum measurement time.

    Raises:
        IndexError: If the device reports no counter input channels.
        AssertionError: If starting the task unexpectedly succeeds, or if no
            range could be parsed from the error message.
    """
    CI_chans = DAQmxGetDevCIPhysicalChans(device_name)
    CI_chan = device_name + '/' + CI_chans[0]
    DT_MIN_PREFIX = "Value Must Be Greater Than:"
    DT_MAX_PREFIX = "Value Must Be Less Than:"
    CI_ranges = []
    # Make a task with a semiperiod measurement. Channel creation is inside the
    # try/finally so the task is cleared even if creating the channel fails.
    task = Task()
    try:
        task.CreateCISemiPeriodChan(
            CI_chan, '', 1e-100, 1e100, c.DAQmx_Val_Seconds, ""
        )
        try:
            task.StartTask()
        except PyDAQmx.DAQmxFunctions.CtrMinMaxError as e:
            # Parse the error to extract the allowed values. Use the exception's
            # `message` attribute when it has one, otherwise its string form.
            message = getattr(e, 'message', None) or str(e)
            dt_min = None
            for line in message.splitlines():
                if DT_MIN_PREFIX in line:
                    dt_min = float(line.replace(DT_MIN_PREFIX, ''))
                # A maximum is paired with the most recently seen minimum; a
                # maximum with no minimum before it is skipped, not a crash.
                if DT_MAX_PREFIX in line and dt_min is not None:
                    dt_max = float(line.replace(DT_MAX_PREFIX, ''))
                    CI_ranges.append([dt_min, dt_max])
        else:
            raise AssertionError("Can't figure out counter input ranges")
    finally:
        task.ClearTask()

    if not CI_ranges:
        # The error message held no usable (min, max) pair; fail with the same
        # explicit error instead of an UnboundLocalError further down.
        raise AssertionError("Can't figure out counter input ranges")

    # Pick out the value we want. Either dtmin when dtmax is over 100, or the largest
    # dtmin if there is no dtmax over 100:
    for dtmin, dtmax in sorted(CI_ranges):
        if dtmax > 100:
            return dtmin
    return dtmin
if __name__ == '__main__':
    # Start from the existing capabilities file, if there is one, so that models
    # which are not currently connected keep their previously recorded entries.
    capabilities = {}
    if os.path.exists(CAPABILITIES_FILE):
        with open(CAPABILITIES_FILE) as f:
            try:
                capabilities = json.load(f)
            except ValueError:
                # Unparseable file: deliberately start from scratch.
                pass

    models = []
    # With no devices present the name string is empty and ''.split(', ') yields
    # [''], so drop empty names rather than querying a device called ''.
    device_names = [name for name in DAQmxGetSysDevNames().split(', ') if name]
    for name in device_names:
        # ignore extra details in model names
        model = DAQmxGetDevProductType(name).split(' ')[0]
        print("found device:", name, model)
        if model not in models:
            models.append(model)
        # Any previous entry for this model is replaced wholesale.
        capabilities[model] = {}

        # --- Buffered output support and maximum sample rates ---
        try:
            capabilities[model]["supports_buffered_AO"] = DAQmxGetDevAOSampClkSupported(
                name
            )
        except PyDAQmx.DAQmxFunctions.AttrNotSupportedError:
            capabilities[model]["supports_buffered_AO"] = False
        try:
            capabilities[model]["max_DO_sample_rate"] = DAQmxGetDevDOMaxRate(name)
            capabilities[model]["supports_buffered_DO"] = True
        except PyDAQmx.DAQmxFunctions.AttrNotSupportedError:
            capabilities[model]["max_DO_sample_rate"] = None
            capabilities[model]["supports_buffered_DO"] = False
        if capabilities[model]["supports_buffered_AO"]:
            capabilities[model]["max_AO_sample_rate"] = DAQmxGetDevAOMaxRate(name)
        else:
            capabilities[model]["max_AO_sample_rate"] = None

        # --- Analog channel counts and AI rates ---
        capabilities[model]["num_AO"] = len(DAQmxGetDevAOPhysicalChans(name))
        capabilities[model]["num_AI"] = len(DAQmxGetDevAIPhysicalChans(name))
        if capabilities[model]["num_AI"] > 0:
            single_rate = DAQmxGetDevAIMaxSingleChanRate(name)
            multi_rate = DAQmxGetDevAIMaxMultiChanRate(name)
        else:
            single_rate = None
            multi_rate = None
        capabilities[model]["max_AI_single_chan_rate"] = single_rate
        capabilities[model]["max_AI_multi_chan_rate"] = multi_rate

        # --- AI terminal configurations ---
        if capabilities[model]["num_AI"] > 0:
            capabilities[model]["AI_term_cfg"] = supported_AI_terminal_configurations(name)
            cfgs = [
                item
                for sublist in capabilities[model]["AI_term_cfg"].values()
                for item in sublist
            ]
            # Default termination: the first of RSE, Diff, PseudoDiff that any
            # channel supports. No "AI_term" key is written if none is supported.
            if cfgs.count('RSE'):
                capabilities[model]["AI_term"] = 'RSE'
            elif cfgs.count('Diff'):
                capabilities[model]["AI_term"] = 'Diff'
            elif cfgs.count('PseudoDiff'):
                capabilities[model]["AI_term"] = 'PseudoDiff'
            capabilities[model]["supports_simultaneous_AI_sampling"] = (
                DAQmxGetDevAISimultaneousSamplingSupported(name)
            )

        # --- Digital output ports ---
        capabilities[model]["ports"] = {}
        ports = DAQmxGetDevDOPorts(name)
        # Deliberately not called `chans`: at module level that would overwrite
        # the `chans` wrapper function defined above.
        do_lines = DAQmxGetDevDOLines(name)
        for port in ports:
            if '_' in port:
                # Ignore the alternate port names such as 'port0_32' that allow using
                # two or more ports together as a single, larger one:
                continue
            port_info = {}
            capabilities[model]["ports"][port] = port_info
            port_chans = [chan for chan in do_lines if chan.split('/')[0] == port]
            port_info['num_lines'] = len(port_chans)
            if capabilities[model]["supports_buffered_DO"]:
                port_info['supports_buffered'] = port_supports_buffered(name, port)
            else:
                port_info['supports_buffered'] = False

        # --- Counter inputs ---
        capabilities[model]["num_CI"] = len(DAQmxGetDevCIPhysicalChans(name))
        supports_semiperiod = supports_semiperiod_measurement(name)
        capabilities[model]["supports_semiperiod_measurement"] = supports_semiperiod
        if capabilities[model]["num_CI"] > 0 and supports_semiperiod:
            min_semiperiod_measurement = get_min_semiperiod_measurement(name)
        else:
            min_semiperiod_measurement = None
        capabilities[model]["min_semiperiod_measurement"] = min_semiperiod_measurement

        # --- Analog output range ---
        if capabilities[model]['num_AO'] > 0:
            AO_ranges = []
            # The raw list is flat: consecutive (Vmin, Vmax) pairs.
            raw_limits = DAQmxGetDevAOVoltageRngs(name)
            for i in range(0, len(raw_limits), 2):
                Vmin, Vmax = raw_limits[i], raw_limits[i + 1]
                AO_ranges.append([Vmin, Vmax])
            # Find range with the largest maximum voltage and use that:
            Vmin, Vmax = max(AO_ranges, key=lambda rng: rng[1])
            # Confirm that no other range has a voltage lower than Vmin,
            # since if it does, this violates our assumptions and things might not
            # be as simple as having a single range:
            assert min(AO_ranges)[0] >= Vmin
            capabilities[model]["AO_range"] = [Vmin, Vmax]
        else:
            capabilities[model]["AO_range"] = None

        # --- Analog input ranges ---
        if capabilities[model]['num_AI'] > 0:
            AI_ranges = []
            # The raw list is flat: consecutive (Vmin, Vmax) pairs.
            raw_limits = DAQmxGetDevAIVoltageRngs(name)
            for i in range(0, len(raw_limits), 2):
                Vmin, Vmax = raw_limits[i], raw_limits[i + 1]
                AI_ranges.append([Vmin, Vmax])
            # Find range with the largest maximum voltage and use that:
            Vmin_raw, Vmax_raw = max(AI_ranges, key=lambda rng: rng[1])
            # Confirm that no other range has a voltage lower than Vmin,
            # since if it does, this violates our assumptions and things might not
            # be as simple as having a single range:
            assert min(AI_ranges)[0] >= Vmin_raw
            capabilities[model]["AI_range_Diff"] = [Vmin_raw, Vmax_raw]
            # .get() so a device without a channel named 'ai0' is treated as not
            # supporting RSE instead of raising KeyError.
            if 'RSE' in capabilities[model]["AI_term_cfg"].get('ai0', []):
                # Now limit to non-differential inputs (if available), which may
                # have lower ranges
                AI_ranges = supported_AI_ranges_for_non_differential_input(name, AI_ranges)
            # Find RSE range with the largest maximum voltage and use that:
            Vmin, Vmax = max(AI_ranges, key=lambda rng: rng[1])
            assert min(AI_ranges)[0] >= Vmin
            capabilities[model]["AI_range"] = [Vmin, Vmax]
        else:
            capabilities[model]["AI_range"] = None
            capabilities[model]["AI_range_Diff"] = None

        # --- Analog input start delay ---
        if capabilities[model]["num_AI"] > 0:
            # "AI_term" is absent when no channel supports RSE, Diff or PseudoDiff;
            # .get() avoids a KeyError in that case.
            if capabilities[model].get("AI_term") == 'PseudoDiff':
                capabilities[model]["AI_start_delay_ticks"] = AI_filter_delay(name)
                capabilities[model]["AI_start_delay"] = None
            else:
                capabilities[model]["AI_start_delay"] = AI_start_delay(name)
        else:
            capabilities[model]["AI_start_delay"] = None

    # Write the merged result back with stable key order and '\n' line endings.
    with open(CAPABILITIES_FILE, 'w', newline='\n') as f:
        data = json.dumps(capabilities, sort_keys=True, indent=4)
        f.write(data)

    print("added/updated capabilities for %d models" % len(models))
    print("Total models with known capabilities: %d" % len(capabilities))
    for model in capabilities:
        if model not in models:
            print(model, 'capabilities not updated')
    print("run generate_subclasses.py to make labscript devices for these models")