Source code for labscript.outputs

#####################################################################
#                                                                   #
# /outputs.py                                                       #
#                                                                   #
# Copyright 2013, Monash University                                 #
#                                                                   #
# This file is part of the program labscript, in the labscript      #
# suite (see http://labscriptsuite.org), and is licensed under the  #
# Simplified BSD License. See the license.txt file in the root of   #
# the project for the full license.                                 #
#                                                                   #
#####################################################################

"""Classes for devices channels that are outputs"""

import sys

import numpy as np

from . import functions
from .base import Device
from .compiler import compiler
from .constants import *
from .core import TriggerableDevice
from .utils import LabscriptError, set_passed_properties


[docs] class Output(Device): """Base class for all output classes.""" description = "generic output" allowed_states = None dtype = np.float64 scale_factor = 1
[docs] @set_passed_properties(property_names={}) def __init__( self, name, parent_device, connection, limits=None, unit_conversion_class=None, unit_conversion_parameters=None, default_value=None, **kwargs ): """Instantiate an Output. Args: name (str): python variable name to assign the Output to. parent_device (:obj:`IntermediateDevice`): Parent device the output is connected to. connection (str): Channel of parent device output is connected to. limits (tuple, optional): `(min,max)` allowed for the output. unit_conversion_class (:obj:`labscript_utils:labscript_utils.unitconversions`, optional): Unit concersion class to use for the output. unit_conversion_parameters (dict, optional): Dictonary or kwargs to pass to the unit conversion class. default_value (float, optional): Default value of the output if no output is commanded. **kwargs: Passed to :meth:`Device.__init__`. Raises: LabscriptError: Limits tuple is invalid or unit conversion class units don't line up. """ Device.__init__(self,name,parent_device,connection, **kwargs) self.instructions = {} self.ramp_limits = [] # For checking ramps don't overlap if default_value is not None: self.default_value = default_value if not unit_conversion_parameters: unit_conversion_parameters = {} self.unit_conversion_class = unit_conversion_class self.set_properties( unit_conversion_parameters, {"unit_conversion_parameters": list(unit_conversion_parameters.keys())} ) # Instantiate the calibration if unit_conversion_class is not None: self.calibration = unit_conversion_class(unit_conversion_parameters) # Validate the calibration class for units in self.calibration.derived_units: # Does the conversion to base units function exist for each defined unit # type? if not hasattr(self.calibration, f"{units}_to_base"): raise LabscriptError( f'The function "{units}_to_base" does not exist within the ' f'calibration "{self.unit_conversion_class}" used in output ' f'"{self.name}"' ) # Does the conversion to base units function exist for each defined unit # type? if not hasattr(self.calibration, f"{units}_from_base"): raise LabscriptError( f'The function "{units}_from_base" does not exist within the ' f'calibration "{self.unit_conversion_class}" used in output ' f'"{self.name}"' ) # If limits exist, check they are valid # Here we specifically differentiate "None" from False as we will later have a # conditional which relies on self.limits being either a correct tuple, or # "None" if limits is not None: if not isinstance(limits, tuple) or len(limits) != 2: raise LabscriptError( f'The limits for "{self.name}" must be tuple of length 2. ' 'Eg. limits=(1, 2)' ) if limits[0] > limits[1]: raise LabscriptError( "The first element of the tuple must be lower than the second " "element. Eg limits=(1, 2), NOT limits=(2, 1)" ) # Save limits even if they are None self.limits = limits
@property def clock_limit(self): """float: Returns the parent clock line's clock limit.""" parent = self.parent_clock_line return parent.clock_limit @property def trigger_delay(self): """float: The earliest time output can be commanded from this device after a trigger. This is nonzeo on secondary pseudoclocks due to triggering delays.""" parent = self.pseudoclock_device if parent.is_master_pseudoclock: return 0 else: return parent.trigger_delay @property def wait_delay(self): """float: The earliest time output can be commanded from this device after a wait. This is nonzeo on secondary pseudoclocks due to triggering delays and the fact that the master clock doesn't provide a resume trigger to secondary clocks until a minimum time has elapsed: compiler.wait_delay. This is so that if a wait is extremely short, the child clock is actually ready for the trigger. """ delay = compiler.wait_delay if self.pseudoclock_device.is_master_pseudoclock else 0 return self.trigger_delay + delay
[docs] def get_all_outputs(self): """Get all children devices that are outputs. For ``Output``, this is `self`. Returns: list: List of children :obj:`Output`. """ return [self]
[docs] def apply_calibration(self,value,units): """Apply the calibration defined by the unit conversion class, if present. Args: value (float): Value to apply calibration to. units (str): Units to convert to. Must be defined by the unit conversion class. Returns: float: Converted value. Raises: LabscriptError: If no unit conversion class is defined or `units` not in that class. """ # Is a calibration in use? if self.unit_conversion_class is None: raise LabscriptError( 'You can not specify the units in an instruction for output ' f'"{self.name}" as it does not have a calibration associated with it' ) # Does a calibration exist for the units specified? if units not in self.calibration.derived_units: raise LabscriptError( f'The units "{units}" does not exist within the calibration ' f'"{self.unit_conversion_class}" used in output "{self.name}"' ) # Return the calibrated value return getattr(self.calibration,units+"_to_base")(value)
[docs] def instruction_to_string(self,instruction): """Gets a human readable description of an instruction. Args: instruction (dict or str): Instruction to get description of, or a fixed instruction defined in :attr:`allowed_states`. Returns: str: Instruction description. """ if isinstance(instruction, dict): return instruction["description"] elif self.allowed_states: return str(self.allowed_states[instruction]) else: return str(instruction)
[docs] def add_instruction(self, time, instruction, units=None): """Adds a hardware instruction to the device instruction list. Args: time (float): Time, in seconds, that the instruction begins. instruction (dict or float): Instruction to add. units (str, optional): Units instruction is in, if `instruction` is a `float`. Raises: LabscriptError: If time requested is not allowed or samplerate is too fast. """ if not compiler.start_called: raise LabscriptError("Cannot add instructions prior to calling start()") # round to the nearest 0.1 nanoseconds, to prevent floating point # rounding errors from breaking our equality checks later on. time = round(time, 10) # Also round end time of ramps to the nearest 0.1 ns: if isinstance(instruction,dict): instruction["end time"] = round(instruction["end time"], 10) instruction["initial time"] = round(instruction["initial time"], 10) # Check that time is not negative or too soon after t=0: if time < self.t0: raise LabscriptError( f"{self.description} {self.name} has an instruction at t={time}s. " "Due to the delay in triggering its pseudoclock device, the earliest " f"output possible is at t={self.t0}." ) # Check that this doesn't collide with previous instructions: if time in self.instructions.keys(): if not compiler.suppress_all_warnings: current_value = self.instruction_to_string(self.instructions[time]) new_value = self.instruction_to_string( self.apply_calibration(instruction, units) if units and not isinstance(instruction, dict) else instruction ) sys.stderr.write( f"WARNING: State of {self.description} {self.name} at t={time}s " f"has already been set to {current_value}. Overwriting to " f"{new_value}. (note: all values in base units where relevant)" "\n" ) # Check that ramps don't collide if isinstance(instruction, dict): # No ramps allowed if this output is on a slow clock: if not self.parent_clock_line.ramping_allowed: raise LabscriptError( f"{self.description} {self.name} is on clockline that does not " "support ramping. It cannot have a function ramp as an instruction." ) for start, end in self.ramp_limits: if start < time < end or start < instruction["end time"] < end: start_value = self.instruction_to_string(self.instructions[start]) new_value = self.instruction_to_string(instruction) raise LabscriptError( f"State of {self.description} {self.name} from t = {start}s to " f"{end}s has already been set to {start_value}. Cannot set to " f"{new_value} from t = {time}s to {instruction['end time']}s." ) self.ramp_limits.append((time, instruction["end time"])) # Check that start time is before end time: if time > instruction["end time"]: raise LabscriptError( f"{self.description} {self.name} has been passed a function ramp " f"{self.instruction_to_string(instruction)} with a negative " "duration." ) if instruction["clock rate"] == 0: raise LabscriptError("A nonzero sample rate is required.") # Else we have a "constant", single valued instruction else: # If we have units specified, convert the value if units is not None: # Apply the unit calibration now instruction = self.apply_calibration(instruction, units) # if we have limits, check the value is valid if self.limits: if (instruction < self.limits[0]) or (instruction > self.limits[1]): raise LabscriptError( f"You cannot program the value {instruction} (base units) to " f"{self.name} as it falls outside the limits " f"({self.limits[0]} to {self.limits[1]})" ) self.instructions[time] = instruction
[docs] def do_checks(self, trigger_times): """Basic error checking to ensure the user's instructions make sense. Args: trigger_times (iterable): Times to confirm don't conflict with instructions. Raises: LabscriptError: If a trigger time conflicts with an instruction. """ # Check if there are no instructions. Generate a warning and insert an # instruction telling the output to remain at its default value. if not self.instructions: if not compiler.suppress_mild_warnings and not compiler.suppress_all_warnings: sys.stderr.write( f"WARNING: {self.name} has no instructions. It will be set to " f"{self.instruction_to_string(self.default_value)} for all time.\n" ) self.add_instruction(self.t0, self.default_value) # Check if there are no instructions at the initial time. Generate a warning and insert an # instruction telling the output to start at its default value. if self.t0 not in self.instructions.keys(): if not compiler.suppress_mild_warnings and not compiler.suppress_all_warnings: sys.stderr.write( f"WARNING: {self.name} has no initial instruction. It will " "initially be set to " f"{self.instruction_to_string(self.default_value)}.\n" ) self.add_instruction(self.t0, self.default_value) # Check that ramps have instructions following them. # If they don't, insert an instruction telling them to hold their final value. for instruction in list(self.instructions.values()): if ( isinstance(instruction, dict) and instruction["end time"] not in self.instructions.keys() ): self.add_instruction( instruction["end time"], instruction["function"]( instruction["end time"] - instruction["initial time"] ), instruction["units"], ) # Checks for trigger times: for trigger_time in trigger_times: for t, inst in self.instructions.items(): # Check no ramps are happening at the trigger time: if ( isinstance(inst, dict) and inst["initial time"] < trigger_time and inst["end time"] > trigger_time ): raise LabscriptError( f"{self.description} {self.name} has a ramp " f"{inst['description']} from t = {inst['initial time']} to " f"{inst['end time']}. This overlaps with a trigger at " f"t={trigger_time}, and so cannot be performed." ) # Check that nothing is happening during the delay time after the trigger: if ( round(trigger_time, 10) < round(t, 10) < round(trigger_time + self.trigger_delay, 10) ): raise LabscriptError( f"{self.description} {self.name} has an instruction at t={t}. " f"This is too soon after a trigger at t={trigger_time}, " "the earliest output possible after this trigger is at " f"t={trigger_time + self.trigger_delay}" ) # Check that there are no instructions too soon before the trigger: if ( t < trigger_time and max(self.clock_limit, compiler.wait_delay) < trigger_time - t ): raise LabscriptError( f"{self.description} {self.name} has an instruction at t={t}. " f"This is too soon before a trigger at t={trigger_time}, " "the latest output possible before this trigger is at " f"t={trigger_time - max(self.clock_limit, compiler.wait_delay)}" )
[docs] def offset_instructions_from_trigger(self, trigger_times): """Subtracts self.trigger_delay from all instructions at or after each trigger_time. Args: trigger_times (iterable): Times of all trigger events. """ offset_instructions = {} for t, instruction in self.instructions.items(): # How much of a delay is there for this instruction? That depends how many triggers there are prior to it: n_triggers_prior = len([time for time in trigger_times if time < t]) # The cumulative offset at this point in time: offset = self.trigger_delay * n_triggers_prior + trigger_times[0] offset = round(offset, 10) if isinstance(instruction, dict): offset_instruction = instruction.copy() offset_instruction["end time"] = self.quantise_to_pseudoclock( round(instruction["end time"] - offset, 10) ) offset_instruction["initial time"] = self.quantise_to_pseudoclock( round(instruction["initial time"] - offset, 10) ) else: offset_instruction = instruction new_time = self.quantise_to_pseudoclock(round(t - offset, 10)) offset_instructions[new_time] = offset_instruction self.instructions = offset_instructions # offset each of the ramp_limits for use in the calculation within # Pseudoclock/ClockLine so that the times in list are consistent with the ones # in self.instructions for i, times in enumerate(self.ramp_limits): n_triggers_prior = len([time for time in trigger_times if time < times[0]]) # The cumulative offset at this point in time: offset = self.trigger_delay * n_triggers_prior + trigger_times[0] offset = round(offset, 10) # offset start and end time of ramps # NOTE: This assumes ramps cannot proceed across a trigger command # (for instance you cannot ramp an output across a WAIT) self.ramp_limits[i] = ( self.quantise_to_pseudoclock(round(times[0] - offset, 10)), self.quantise_to_pseudoclock(round(times[1] - offset, 10)), )
[docs] def get_change_times(self): """If this function is being called, it means that the parent Pseudoclock has requested a list of times that this output changes state. Returns: list: List of times output changes values. """ times = list(self.instructions.keys()) times.sort() current_dict_time = None for time in times: if isinstance(self.instructions[time], dict) and current_dict_time is None: current_dict_time = self.instructions[time] elif ( current_dict_time is not None and current_dict_time['initial time'] < time < current_dict_time['end time'] ): raise LabscriptError( f"{self.description} {self.name} has an instruction at " f"t={time:.10f}s. This instruction collides with a ramp on this " "output at that time. The collision " f"{current_dict_time['description']} is happening from " f"{current_dict_time['initial time']:.10f}s untill " f"{current_dict_time['end time']:.10f}s" ) self.times = times return times
[docs] def get_ramp_times(self): """If this is being called, then it means the parent Pseuedoclock has asked for a list of the output ramp start and stop times. Returns: list: List of (start, stop) times of ramps for this Output. """ return self.ramp_limits
[docs] def make_timeseries(self, change_times): """If this is being called, then it means the parent Pseudoclock has asked for a list of this output's states at each time in change_times. (Which are the times that one or more connected outputs in the same pseudoclock change state). By state, I don't mean the value of the output at that moment, rather I mean what instruction it has. This might be a single value, or it might be a reference to a function for a ramp etc. This list of states is stored in self.timeseries rather than being returned.""" self.timeseries = [] i = 0 time_len = len(self.times) for change_time in change_times: while i < time_len and change_time >= self.times[i]: i += 1 self.timeseries.append(self.instructions[self.times[i-1]])
[docs] def expand_timeseries(self,all_times,flat_all_times_len): """This function evaluates the ramp functions in self.timeseries at the time points in all_times, and creates an array of output values at those times. These are the values that this output should update to on each clock tick, and are the raw values that should be used to program the output device. They are stored in self.raw_output.""" # If this output is not ramping, then its timeseries should # not be expanded. It's already as expanded as it'll get. if not self.parent_clock_line.ramping_allowed: self.raw_output = np.array(self.timeseries, dtype=np.dtype(self.dtype)) return outputarray = np.empty((flat_all_times_len,), dtype=np.dtype(self.dtype)) j = 0 for i, time in enumerate(all_times): if np.iterable(time): time_len = len(time) if isinstance(self.timeseries[i], dict): # We evaluate the functions at the midpoints of the # timesteps in order to remove the zero-order hold # error introduced by sampling an analog signal: try: midpoints = time + 0.5*(time[1] - time[0]) except IndexError: # Time array might be only one element long, so we # can't calculate the step size this way. That's # ok, the final midpoint is determined differently # anyway: midpoints = np.zeros(1) # We need to know when the first clock tick is after # this ramp ends. It's either an array element or a # single number depending on if this ramp is followed # by another ramp or not: next_time = all_times[i+1][0] if np.iterable(all_times[i+1]) else all_times[i+1] midpoints[-1] = time[-1] + 0.5*(next_time - time[-1]) outarray = self.timeseries[i]["function"]( midpoints - self.timeseries[i]["initial time"] ) # Now that we have the list of output points, pass them through the unit calibration if self.timeseries[i]["units"] is not None: outarray = self.apply_calibration( outarray, self.timeseries[i]["units"] ) # if we have limits, check the value is valid if self.limits: if ((outarray<self.limits[0])|(outarray>self.limits[1])).any(): raise LabscriptError( f"The function {self.timeseries[i]['function']} called " f'on "{self.name}" at t={midpoints[0]} generated a ' "value which falls outside the base unit limits " f"({self.limits[0]} to {self.limits[1]})" ) else: outarray = np.empty(time_len, dtype=self.dtype) outarray.fill(self.timeseries[i]) outputarray[j:j+time_len] = outarray j += time_len else: outputarray[j] = self.timeseries[i] j += 1 del self.timeseries # don't need this any more. self.raw_output = outputarray
[docs] class AnalogQuantity(Output): """Base class for :obj:`AnalogOut`. It is also used internally by :obj:`DDS`. You should never instantiate this class directly. """ description = "analog quantity" default_value = 0 def _check_truncation(self, truncation, min=0, max=1): if not (min <= truncation <= max): raise LabscriptError( f"Truncation argument must be between {min} and {max} (inclusive), but " f"is {truncation}." )
[docs] def ramp(self, t, duration, initial, final, samplerate, units=None, truncation=1.): """Command the output to perform a linear ramp. Defined by `f(t) = ((final - initial)/duration)*t + initial` Args: t (float): Time, in seconds, to begin the ramp. duration (float): Length, in seconds, of the ramp. initial (float): Initial output value, at time `t`. final (float): Final output value, at time `t+duration`. samplerate (float): Rate, in Hz, to update the output. units: Units the output values are given in, as specified by the unit conversion class. truncation (float, optional): Fraction of ramp to perform. Must be between 0 and 1. Returns: float: Length of time ramp will take to complete. """ self._check_truncation(truncation) if truncation > 0: # if start and end value are the same, we don't need to ramp and can save # the sample ticks etc if initial == final: self.constant(t, initial, units) if not compiler.suppress_mild_warnings and not compiler.suppress_all_warnings: sys.stderr.write( f"WARNING: {self.__class__.__name__} '{self.name}' has the " f"same initial and final value at time t={t:.10f}s with " f"duration {duration:.10f}s. In order to save samples and " "clock ticks this instruction is replaced with a constant " "output.\n" ) else: self.add_instruction( t, { "function": functions.ramp( round(t + duration, 10) - round(t, 10), initial, final ), "description": "linear ramp", "initial time": t, "end time": t + truncation * duration, "clock rate": samplerate, "units": units, } ) return truncation * duration
[docs] def sine( self, t, duration, amplitude, angfreq, phase, dc_offset, samplerate, units=None, truncation=1. ): """Command the output to perform a sinusoidal modulation. Defined by `f(t) = amplitude*sin(angfreq*t + phase) + dc_offset` Args: t (float): Time, in seconds, to begin the ramp. duration (float): Length, in seconds, of the ramp. amplitude (float): Amplitude of the modulation. angfreq (float): Angular frequency, in radians per second. phase (float): Phase offset of the sine wave, in radians. dc_offset (float): DC offset of output away from 0. samplerate (float): Rate, in Hz, to update the output. units: Units the output values are given in, as specified by the unit conversion class. truncation (float, optional): Fraction of duration to perform. Must be between 0 and 1. Returns: float: Length of time modulation will take to complete. Equivalent to `truncation*duration`. """ self._check_truncation(truncation) if truncation > 0: self.add_instruction( t, { "function": functions.sine( round(t + duration, 10) - round(t, 10), amplitude, angfreq, phase, dc_offset, ), "description": "sine wave", "initial time": t, "end time": t + truncation*duration, "clock rate": samplerate, "units": units, } ) return truncation*duration
[docs] def sine_ramp( self, t, duration, initial, final, samplerate, units=None, truncation=1. ): """Command the output to perform a ramp defined by one half period of a squared sine wave. Defined by `f(t) = (final-initial)*(sin(pi*t/(2*duration)))^2 + initial` Args: t (float): Time, in seconds, to begin the ramp. duration (float): Length, in seconds, of the ramp. initial (float): Initial output value, at time `t`. final (float): Final output value, at time `t+duration`. samplerate (float): Rate, in Hz, to update the output. units: Units the output values are given in, as specified by the unit conversion class. truncation (float, optional): Fraction of ramp to perform. Must be between 0 and 1. Returns: float: Length of time ramp will take to complete. """ self._check_truncation(truncation) if truncation > 0: self.add_instruction( t, { "function": functions.sine_ramp( round(t + duration, 10) - round(t, 10), initial, final ), "description": "sinusoidal ramp", "initial time": t, "end time": t + truncation*duration, "clock rate": samplerate, "units": units, } ) return truncation*duration
[docs] def sine4_ramp( self, t, duration, initial, final, samplerate, units=None, truncation=1. ): """Command the output to perform an increasing ramp defined by one half period of a quartic sine wave. Defined by `f(t) = (final-initial)*(sin(pi*t/(2*duration)))^4 + initial` Args: t (float): Time, in seconds, to begin the ramp. duration (float): Length, in seconds, of the ramp. initial (float): Initial output value, at time `t`. final (float): Final output value, at time `t+duration`. samplerate (float): Rate, in Hz, to update the output. units: Units the output values are given in, as specified by the unit conversion class. truncation (float, optional): Fraction of ramp to perform. Must be between 0 and 1. Returns: float: Length of time ramp will take to complete. """ self._check_truncation(truncation) if truncation > 0: self.add_instruction( t, { "function": functions.sine4_ramp( round(t + duration, 10) - round(t, 10), initial, final ), "description": "sinusoidal ramp", "initial time": t, "end time": t + truncation*duration, "clock rate": samplerate, "units": units, } ) return truncation*duration
[docs] def sine4_reverse_ramp( self, t, duration, initial, final, samplerate, units=None, truncation=1. ): """Command the output to perform a decreasing ramp defined by one half period of a quartic sine wave. Defined by `f(t) = (final-initial)*(sin(pi*t/(2*duration)))^4 + initial` Args: t (float): Time, in seconds, to begin the ramp. duration (float): Length, in seconds, of the ramp. initial (float): Initial output value, at time `t`. final (float): Final output value, at time `t+duration`. samplerate (float): Rate, in Hz, to update the output. units: Units the output values are given in, as specified by the unit conversion class. truncation (float, optional): Fraction of ramp to perform. Must be between 0 and 1. Returns: float: Length of time ramp will take to complete. """ self._check_truncation(truncation) if truncation > 0: self.add_instruction( t, { "function": functions.sine4_reverse_ramp( round(t + duration, 10) - round(t, 10), initial, final ), "description": "sinusoidal ramp", "initial time": t, "end time": t + truncation*duration, "clock rate": samplerate, "units": units, } ) return truncation*duration
[docs] def exp_ramp( self, t, duration, initial, final, samplerate, zero=0, units=None, truncation=None, truncation_type="linear", **kwargs, ): """Exponential ramp whose rate of change is set by an asymptotic value (zero argument). Args: t (float): time to start the ramp duration (float): duration of the ramp initial (float): initial value of the ramp (sans truncation) final (float): final value of the ramp (sans truncation) zero (float): asymptotic value of the exponential decay/rise, i.e. limit as t --> inf samplerate (float): rate to sample the function units: unit conversion to apply to specified values before generating raw output truncation_type (str): * `'linear'` truncation stops the ramp when it reaches the value given by the truncation parameter, which must be between initial and final * `'exponential'` truncation stops the ramp after a period of truncation*duration In this instance, the truncation parameter should be between 0 (full truncation) and 1 (no truncation). """ # Backwards compatibility for old kwarg names if "trunc" in kwargs: truncation = kwargs.pop("trunc") if "trunc_type" in kwargs: truncation_type = kwargs.pop("trunc_type") if truncation is not None: # Computed the truncated duration based on the truncation_type if truncation_type == "linear": self._check_truncation( truncation, min(initial, final), max(initial, final) ) # Truncate the ramp when it reaches the value truncation trunc_duration = duration * \ np.log((initial-zero)/(truncation-zero)) / \ np.log((initial-zero)/(final-zero)) elif truncation_type == "exponential": # Truncate the ramps duration by a fraction truncation self._check_truncation(truncation) trunc_duration = truncation * duration else: raise LabscriptError( "Truncation type for exp_ramp not supported. Must be either linear " "or exponential." ) else: trunc_duration = duration if trunc_duration > 0: self.add_instruction( t, { "function": functions.exp_ramp( round(t + duration, 10) - round(t, 10), initial, final, zero ), "description": 'exponential ramp', "initial time": t, "end time": t + trunc_duration, "clock rate": samplerate, "units": units, } ) return trunc_duration
[docs] def exp_ramp_t( self, t, duration, initial, final, time_constant, samplerate, units=None, truncation=None, truncation_type="linear", **kwargs ): """Exponential ramp whose rate of change is set by the time_constant. Args: t (float): time to start the ramp duration (float): duration of the ramp initial (float): initial value of the ramp (sans truncation) final (float): final value of the ramp (sans truncation) time_constant (float): 1/e time of the exponential decay/rise samplerate (float): rate to sample the function units: unit conversion to apply to specified values before generating raw output truncation_type (str): * `'linear'` truncation stops the ramp when it reaches the value given by the truncation parameter, which must be between initial and final * `'exponential'` truncation stops the ramp after a period of truncation*duration In this instance, the truncation parameter should be between 0 (full truncation) and 1 (no truncation). """ # Backwards compatibility for old kwarg names if "trunc" in kwargs: truncation = kwargs.pop("trunc") if "trunc_type" in kwargs: truncation_type = kwargs.pop("trunc_type") if truncation is not None: zero = (final-initial*np.exp(-duration/time_constant)) / \ (1-np.exp(-duration/time_constant)) if truncation_type == "linear": self._check_truncation(truncation, min(initial, final), max(initial, final)) trunc_duration = time_constant * \ np.log((initial-zero)/(truncation-zero)) elif truncation_type == 'exponential': self._check_truncation(truncation) trunc_duration = truncation * duration else: raise LabscriptError( "Truncation type for exp_ramp_t not supported. Must be either " "linear or exponential." ) else: trunc_duration = duration if trunc_duration > 0: self.add_instruction( t, { "function": functions.exp_ramp_t( round(t + duration, 10) - round(t, 10), initial, final, time_constant, ), "description": "exponential ramp with time consntant", "initial time": t, "end time": t + trunc_duration, "clock rate": samplerate, "units": units, } ) return trunc_duration
[docs] def piecewise_accel_ramp( self, t, duration, initial, final, samplerate, units=None, truncation=1. ): """Changes the output so that the second derivative follows one period of a triangle wave. Args: t (float): Time, in seconds, at which to begin the ramp. duration (float): Duration of the ramp, in seconds. initial (float): Initial output value at time `t`. final (float): Final output value at time `t+duration`. samplerate (float): Update rate of the output, in Hz. units: Units, defined by the unit conversion class, the value is in. truncation (float, optional): Fraction of ramp to perform. Default 1.0. Returns: float: Time the ramp will take to complete. """ self._check_truncation(truncation) if truncation > 0: self.add_instruction( t, { "function": functions.piecewise_accel( round(t + duration, 10) - round(t, 10), initial, final ), "description": "piecewise linear accelleration ramp", "initial time": t, "end time": t + truncation*duration, "clock rate": samplerate, "units": units, } ) return truncation*duration
[docs] def square_wave( self, t, duration, amplitude, frequency, phase, offset, duty_cycle, samplerate, units=None, truncation=1. ): """A standard square wave. This method generates a square wave which starts HIGH (when its phase is zero) then transitions to/from LOW at the specified `frequency` in Hz. The `amplitude` parameter specifies the peak-to-peak amplitude of the square wave which is centered around `offset`. For example, setting `amplitude=1` and `offset=0` would give a square wave which transitions between `0.5` and `-0.5`. Similarly, setting `amplitude=2` and `offset=3` would give a square wave which transitions between `4` and `2`. To instead specify the HIGH/LOW levels directly, use `square_wave_levels()`. Note that because the transitions of a square wave are sudden and discontinuous, small changes in timings (e.g. due to numerical rounding errors) can affect the output value. This is particularly relevant at the end of the waveform, as the final output value may be different than expected if the end of the waveform is close to an edge of the square wave. Care is taken in the implementation of this method to avoid such effects, but it still may be desirable to call `constant()` after `square_wave()` to ensure a particular final value. The output value may also be different than expected at certain moments in the middle of the waveform due to the finite samplerate (which may be different than the requested `samplerate`), particularly if the actual samplerate is not a multiple of `frequency`. Args: t (float): The time at which to start the square wave. duration (float): The duration for which to output a square wave when `truncation` is set to `1`. When `truncation` is set to a value less than `1`, the actual duration will be shorter than `duration` by that factor. amplitude (float): The peak-to-peak amplitude of the square wave. See above for an example of how to calculate the HIGH/LOW output values given the `amplitude` and `offset` values. frequency (float): The frequency of the square wave, in Hz. phase (float): The initial phase of the square wave. Note that the square wave is defined such that the phase goes from 0 to 1 (NOT 2 pi) over one cycle, so setting `phase=0.5` will start the square wave advanced by 1/2 of a cycle. Setting `phase` equal to `duty_cycle` will cause the waveform to start LOW rather than HIGH. offset (float): The offset of the square wave, which is the value halfway between the LOW and HIGH output values. Note that this is NOT the LOW output value; setting `offset` to `0` will cause the HIGH/LOW values to be symmetrically split around `0`. See above for an example of how to calculate the HIGH/LOW output values given the `amplitude` and `offset` values. duty_cycle (float): The fraction of the cycle for which the output should be HIGH. This should be a number between zero and one inclusively. For example, setting `duty_cycle=0.1` will create a square wave which outputs HIGH over 10% of the cycle and outputs LOW over 90% of the cycle. samplerate (float): The requested rate at which to update the output value. Note that the actual samplerate used may be different if, for example, another output of the same device has a simultaneous ramp with a different requested `samplerate`, or if `1 / samplerate` isn't an integer multiple of the pseudoclock's timing resolution. units (str, optional): The units of the output values. If set to `None` then the output's base units will be used. Defaults to `None`. truncation (float, optional): The actual duration of the square wave will be `duration * truncation` and `truncation` must be set to a value in the range [0, 1] (inclusively). Set to `1` to output the full duration of the square wave. Setting it to `0` will skip the square wave entirely. Defaults to `1.`. Returns: duration (float): The actual duration of the square wave, accounting for `truncation`. """ # Convert to values used by square_wave_levels, then call that method. level_0 = offset + 0.5 * amplitude level_1 = offset - 0.5 * amplitude return self.square_wave_levels( t, duration, level_0, level_1, frequency, phase, duty_cycle, samplerate, units, truncation, )
[docs] def square_wave_levels( self, t, duration, level_0, level_1, frequency, phase, duty_cycle, samplerate, units=None, truncation=1. ): """A standard square wave. This method generates a square wave which starts at `level_0` (when its phase is zero) then transitions to/from `level_1` at the specified `frequency`. This is the same waveform output by `square_wave()`, but parameterized differently. See that method's docstring for more information. Args: t (float): The time at which to start the square wave. duration (float): The duration for which to output a square wave when `truncation` is set to `1`. When `truncation` is set to a value less than `1`, the actual duration will be shorter than `duration` by that factor. level_0 (float): The initial level of the square wave, when the phase is zero. level_1 (float): The other level of the square wave. frequency (float): The frequency of the square wave, in Hz. phase (float): The initial phase of the square wave. Note that the square wave is defined such that the phase goes from 0 to 1 (NOT 2 pi) over one cycle, so setting `phase=0.5` will start the square wave advanced by 1/2 of a cycle. Setting `phase` equal to `duty_cycle` will cause the waveform to start at `level_1` rather than `level_0`. duty_cycle (float): The fraction of the cycle for which the output should be set to `level_0`. This should be a number between zero and one inclusively. For example, setting `duty_cycle=0.1` will create a square wave which outputs `level_0` over 10% of the cycle and outputs `level_1` over 90% of the cycle. samplerate (float): The requested rate at which to update the output value. Note that the actual samplerate used may be different if, for example, another output of the same device has a simultaneous ramp with a different requested `samplerate`, or if `1 / samplerate` isn't an integer multiple of the pseudoclock's timing resolution. units (str, optional): The units of the output values. If set to `None` then the output's base units will be used. Defaults to `None`. truncation (float, optional): The actual duration of the square wave will be `duration * truncation` and `truncation` must be set to a value in the range [0, 1] (inclusively). Set to `1` to output the full duration of the square wave. Setting it to `0` will skip the square wave entirely. Defaults to `1.`. Returns: duration (float): The actual duration of the square wave, accounting for `truncation`. """ # Check the argument values. self._check_truncation(truncation) if duty_cycle < 0 or duty_cycle > 1: raise LabscriptError( "Square wave duty cycle must be in the range [0, 1] (inclusively) but " f"was set to {duty_cycle}." ) if truncation > 0: # Add the instruction. func = functions.square_wave( round(t + duration, 10) - round(t, 10), level_0, level_1, frequency, phase, duty_cycle, ) self.add_instruction( t, { "function": func, "description": "square wave", "initial time": t, "end time": t + truncation * duration, "clock rate": samplerate, "units": units, } ) return truncation * duration
[docs] def customramp(self, t, duration, function, *args, **kwargs): """Define a custom function for the output. Args: t (float): Time, in seconds, to start the function. duration (float): Length in time, in seconds, to perform the function. function (func): Function handle that defines the output waveform. First argument is the relative time from function start, in seconds. *args: Arguments passed to `function`. **kwargs: Keyword arguments pass to `function`. Standard kwargs common to other output functions are: `units`, `samplerate`, and `truncation`. These kwargs are optional, but will not be passed to `function` if present. Returns: float: Duration the function is to be evaluate for. Equivalent to `truncation*duration`. """ units = kwargs.pop("units", None) samplerate = kwargs.pop("samplerate") truncation = kwargs.pop("truncation", 1.) self._check_truncation(truncation) def custom_ramp_func(t_rel): """The function that will return the result of the user's function, evaluated at relative times t_rel from 0 to duration""" return function( t_rel, round(t + duration, 10) - round(t, 10), *args, **kwargs ) if truncation > 0: self.add_instruction( t, { "function": custom_ramp_func, "description": f"custom ramp: {function.__name__}", "initial time": t, "end time": t + truncation*duration, "clock rate": samplerate, "units": units, } ) return truncation*duration
[docs] def constant(self, t, value, units=None): """Sets the output to a constant value at time `t`. Args: t (float): Time, in seconds, to set the constant output. value (float): Value to set. units: Units, defined by the unit conversion class, the value is in. """ # verify that value can be converted to float try: val = float(value) except: raise LabscriptError( f"Cannot set {self.name} to value={value} at t={t} as the value cannot " "be converted to float" ) self.add_instruction(t, value, units)
[docs] class AnalogOut(AnalogQuantity): """Analog Output class for use with all devices that support timed analog outputs.""" description = "analog output"
[docs] class StaticAnalogQuantity(Output): """Base class for :obj:`StaticAnalogOut`. It can also be used internally by other more complex output types. """ description = "static analog quantity" default_value = 0.0 """float: Value of output if no constant value is commanded."""
[docs] @set_passed_properties(property_names = {}) def __init__(self, *args, **kwargs): """Instatiantes the static analog quantity. Defines an internal tracking variable of the static output value and calls :func:`Output.__init__`. Args: *args: Passed to :func:`Output.__init__`. **kwargs: Passed to :func:`Output.__init__`. """ Output.__init__(self, *args, **kwargs) self._static_value = None
[docs] def constant(self, value, units=None): """Set the static output value of the output. Args: value (float): Value to set the output to. units: Units, defined by the unit conversion class, the value is in. Raises: LabscriptError: If static output has already been set to another value or the value lies outside the output limits. """ if self._static_value is None: # If we have units specified, convert the value if units is not None: # Apply the unit calibration now value = self.apply_calibration(value, units) # if we have limits, check the value is valid if self.limits: minval, maxval = self.limits if not minval <= value <= maxval: raise LabscriptError( f"You cannot program the value {value} (base units) to " f"{self.name} as it falls outside the limits " f"({self.limits[0]} to {self.limits[1]})" ) self._static_value = value else: raise LabscriptError( f"{self.description} {self.name} has already been set to " f"{self._static_value} (base units). It cannot also be set to " f"{value} ({units if units is not None else 'base units'})." )
[docs] def get_change_times(self): """Enforces no change times. Returns: list: An empty list, as expected by the parent pseudoclock. """ # Return an empty list as the calling function at the pseudoclock level expects # a list return []
[docs] def make_timeseries(self,change_times): """Since output is static, does nothing.""" pass
[docs] def expand_timeseries(self,*args,**kwargs): """Defines the `raw_output` attribute. """ self.raw_output = np.array([self.static_value], dtype=self.dtype)
@property def static_value(self): """float: The value of the static output.""" if self._static_value is None: if not compiler.suppress_mild_warnings and not compiler.suppress_all_warnings: sys.stderr.write( f"WARNING: {self.name} has no value set. It will be set to " f"{self.instruction_to_string(self.default_value)}.\n" ) self._static_value = self.default_value return self._static_value
[docs] class StaticAnalogOut(StaticAnalogQuantity): """Static Analog Output class for use with all devices that have constant outputs.""" description = "static analog output"
[docs] class DigitalQuantity(Output): """Base class for :obj:`DigitalOut`. It is also used internally by other, more complex, output types. """ description = "digital quantity" allowed_states = {1: "high", 0: "low"} default_value = 0 dtype = np.uint32 # Redefine __init__ so that you cannot define a limit or calibration for DO
[docs] @set_passed_properties(property_names={"connection_table_properties": ["inverted"]}) def __init__(self, name, parent_device, connection, inverted=False, **kwargs): """Instantiate a digital quantity. Args: name (str): python variable name to assign the quantity to. parent_device (:obj:`IntermediateDevice`): Device this quantity is attached to. connection (str): Connection on parent device we are connected to. inverted (bool, optional): If `True`, output is logic inverted. **kwargs: Passed to :func:`Output.__init__`. """ Output.__init__(self,name,parent_device,connection, **kwargs) self.inverted = bool(inverted)
[docs] def go_high(self, t): """Commands the output to go high. Args: t (float): Time, in seconds, when the output goes high. """ self.add_instruction(t, 1)
[docs] def go_low(self, t): """Commands the output to go low. Args: t (float): Time, in seconds, when the output goes low. """ self.add_instruction(t, 0)
[docs] def enable(self, t): """Commands the output to enable. If `inverted=True`, this will set the output low. Args: t (float): Time, in seconds, when the output enables. """ if self.inverted: self.go_low(t) else: self.go_high(t)
[docs] def disable(self, t): """Commands the output to disable. If `inverted=True`, this will set the output high. Args: t (float): Time, in seconds, when the output disables. """ if self.inverted: self.go_high(t) else: self.go_low(t)
[docs] def repeat_pulse_sequence(self, t, duration, pulse_sequence, period, samplerate): """This function only works if the DigitalQuantity is on a fast clock The pulse sequence specified will be repeated from time t until t+duration. Note 1: The samplerate should be significantly faster than the smallest time difference between two states in the pulse sequence, or else points in your pulse sequence may never be evaluated. Note 2: The time points your pulse sequence is evaluated at may be different than you expect, if another output changes state between t and t+duration. As such, you should set the samplerate high enough that even if this rounding of tie points occurs (to fit in the update required to change the other output) your pulse sequence will not be significantly altered) Args: t (float): Time, in seconds, to start the pulse sequence. duration (float): How long, in seconds, to repeat the sequence. pulse_sequence (list): List of tuples, with each tuple of the form `(time, state)`. period (float): Defines how long the final tuple will be held for before repeating the pulse sequence. In general, should be longer than the entire pulse sequence. samplerate (float): How often to update the output, in Hz. """ self.add_instruction( t, { "function": functions.pulse_sequence(pulse_sequence, period), "description": "pulse sequence", "initial time":t, "end time": t + duration, "clock rate": samplerate, "units": None, } ) return duration
[docs] class DigitalOut(DigitalQuantity): """Digital output class for use with all devices.""" description = "digital output"
[docs] class StaticDigitalQuantity(DigitalQuantity): """Base class for :obj:`StaticDigitalOut`. It can also be used internally by other, more complex, output types. """ description = "static digital quantity" default_value = 0 """float: Value of output if no constant value is commanded."""
[docs] @set_passed_properties(property_names = {}) def __init__(self, *args, **kwargs): """Instatiantes the static digital quantity. Defines an internal tracking variable of the static output value and calls :func:`Output.__init__`. Args: *args: Passed to :func:`Output.__init__`. **kwargs: Passed to :func:`Output.__init__`. """ DigitalQuantity.__init__(self, *args, **kwargs) self._static_value = None
[docs] def go_high(self): """Command a static high output. Raises: LabscriptError: If output has already been set low. """ if self._static_value is None: self.add_instruction(0, 1) self._static_value = 1 else: raise LabscriptError( f"{self.description} {self.name} has already been set to " f"{self.instruction_to_string(self._static_value)}. It cannot " "also be set to 1." )
[docs] def go_low(self): """Command a static low output. Raises: LabscriptError: If output has already been set high. """ if self._static_value is None: self.add_instruction(0, 0) self._static_value = 0 else: raise LabscriptError( f"{self.description} {self.name} has already been set to " f"{self.instruction_to_string(self._static_value)}. It cannot " "also be set to 0." )
[docs] def get_change_times(self): """Enforces no change times. Returns: list: An empty list, as expected by the parent pseudoclock. """ # Return an empty list as the calling function at the pseudoclock level expects # a list return []
[docs] def make_timeseries(self, change_times): """Since output is static, does nothing.""" pass
[docs] def expand_timeseries(self, *args, **kwargs): """Defines the `raw_output` attribute. """ self.raw_output = np.array([self.static_value], dtype=self.dtype)
@property def static_value(self): """float: The value of the static output.""" if self._static_value is None: if not compiler.suppress_mild_warnings and not compiler.suppress_all_warnings: sys.stderr.write( f"WARNING: {self.name} has no value set. It will be set to " f"{self.instruction_to_string(self.default_value)}.\n" ) self._static_value = self.default_value return self._static_value
[docs] class StaticDigitalOut(StaticDigitalQuantity): """Static Digital Output class for use with all devices that have constant outputs.""" description = "static digital output"
[docs] class Shutter(DigitalOut): """Customized version of :obj:`DigitalOut` that accounts for the open/close delay of a shutter automatically. When using the methods :meth:`open` and :meth:`close`, the shutter open and close times are precise without haveing to track the delays. Note: delays can be set using runmanager globals and periodically updated via a calibration. .. Warning:: If the shutter is asked to do something at `t=0`, it cannot start moving earlier than that. This means the initial shutter states will have imprecise timing. """ description = "shutter"
[docs] @set_passed_properties( property_names={"connection_table_properties": ["open_state"]} ) def __init__( self, name, parent_device, connection, delay=(0, 0), open_state=1, **kwargs ): """Instantiates a Shutter. Args: name (str): python variable to assign the object to. parent_device (:obj:`IntermediateDevice`): Parent device the digital output is connected to. connection (str): Physical output port of the device the digital output is connected to. delay (tuple, optional): Tuple of the (open, close) delays, specified in seconds. open_state (int, optional): Allowed values are `0` or `1`. Defines which state of the digital output opens the shutter. Raises: LabscriptError: If the `open_state` is not `0` or `1`. """ inverted = not bool(open_state) DigitalOut.__init__( self, name, parent_device, connection, inverted=inverted, **kwargs ) self.open_delay, self.close_delay = delay self.open_state = open_state if self.open_state == 1: self.allowed_states = {0: "closed", 1: "open"} elif self.open_state == 0: self.allowed_states = {1: "closed", 0: "open"} else: raise LabscriptError( f"Shutter {self.name} wasn't instantiated with open_state = 0 or 1." ) self.actual_times = {}
[docs] def open(self, t): """Command the shutter to open at time `t`. Takes the open delay time into account. Note that the delay time will not be take into account the open delay if the command is made at t=0 (or other times less than the open delay). No warning will be issued for this loss of precision during compilation. Args: t (float): Time, in seconds, when shutter should be open. """ # If a shutter is asked to do something at t=0, it cannot start moving # earlier than that. So initial shutter states will have imprecise # timing. Not throwing a warning here because if I did, every run # would throw a warning for every shutter. The documentation will # have to make a point of this. t_calc = t-self.open_delay if t >= self.open_delay else 0 self.actual_times[t] = {"time": t_calc, "instruction": 1} self.enable(t_calc)
[docs] def close(self, t): """Command the shutter to close at time `t`. Takes the close delay time into account. Note that the delay time will not be take into account the close delay if the command is made at t=0 (or other times less than the close delay). No warning will be issued for this loss of precision during compilation. Args: t (float): Time, in seconds, when shutter should be closed. """ t_calc = t-self.close_delay if t >= self.close_delay else 0 self.actual_times[t] = {"time": t_calc, "instruction": 0} self.disable(t_calc)
[docs] def generate_code(self, hdf5_file): classname = self.__class__.__name__ calibration_table_dtypes = [ ("name", "a256"), ("open_delay", float), ("close_delay", float) ] if classname not in hdf5_file["calibrations"]: hdf5_file["calibrations"].create_dataset( classname, (0,), dtype=calibration_table_dtypes, maxshape=(None,) ) metadata = (self.name, self.open_delay, self.close_delay) dataset = hdf5_file["calibrations"][classname] dataset.resize((len(dataset) + 1,)) dataset[len(dataset) - 1] = metadata
[docs] def get_change_times(self, *args, **kwargs): retval = DigitalOut.get_change_times(self, *args, **kwargs) if len(self.actual_times) > 1: sorted_times = list(self.actual_times.keys()) sorted_times.sort() for i in range(len(sorted_times) - 1): time = sorted_times[i] next_time = sorted_times[i + 1] instruction = self.actual_times[time]["instruction"] next_instruction = self.actual_times[next_time]["instruction"] state = "opened" if instruction == 1 else "closed" next_state = "open" if next_instruction == 1 else "close" # only look at instructions that contain a state change if instruction != next_instruction: if self.actual_times[next_time]["time"] < self.actual_times[time]["time"]: sys.stderr.write( f"WARNING: The shutter '{self.name}' is requested to " f"{next_state} too early (taking delay into account) at " f"t={next_time:.10f}s when it is still not {state} from " f"an earlier instruction at t={time:.10f}s\n" ) elif not compiler.suppress_mild_warnings and not compiler.suppress_all_warnings: sys.stderr.write( f"WARNING: The shutter '{self.name}' is requested to " f"{next_state} at t={next_time:.10f}s but was never {state} " f"after an earlier instruction at t={time:.10f}s\n" ) return retval
[docs] class Trigger(DigitalOut): """Customized version of :obj:`DigitalOut` that tracks edge type. """ description = "trigger device" allowed_children = [TriggerableDevice]
[docs] @set_passed_properties(property_names={}) def __init__( self, name, parent_device, connection, trigger_edge_type="rising", **kwargs ): """Instantiates a DigitalOut object that tracks the trigger edge type. Args: name (str): python variable name to assign the quantity to. parent_device (:obj:`IntermediateDevice`): Device this quantity is attached to. trigger_edge_type (str, optional): Allowed values are `'rising'` and `'falling'`. **kwargs: Passed to :func:`Output.__init__`. """ DigitalOut.__init__(self, name, parent_device, connection, **kwargs) self.trigger_edge_type = trigger_edge_type if self.trigger_edge_type == "rising": self.enable = self.go_high self.disable = self.go_low self.allowed_states = {1: "enabled", 0: "disabled"} elif self.trigger_edge_type == "falling": self.enable = self.go_low self.disable = self.go_high self.allowed_states = {1: "disabled", 0: "enabled"} else: raise ValueError( "trigger_edge_type must be 'rising' or 'falling', not " f"'{trigger_edge_type}'." ) # A list of the times this trigger has been asked to trigger: self.triggerings = []
[docs] def trigger(self, t, duration): """Command a trigger pulse. Args: t (float): Time, in seconds, for the trigger edge to occur. duration (float): Duration of the trigger, in seconds. """ assert duration > 0, "Negative or zero trigger duration given" if t != self.t0 and self.t0 not in self.instructions: self.disable(self.t0) start = t end = t + duration for other_start, other_duration in self.triggerings: other_end = other_start + other_duration # Check for overlapping exposures: if not (end < other_start or start > other_end): raise LabscriptError( f"{self.description} {self.name} has two overlapping triggerings: " f"one at t = {start}s for {duration}s, and another at " f"t = {other_start}s for {other_duration}s." ) self.enable(t) self.disable(round(t + duration, 10)) self.triggerings.append((t, duration))
[docs] def add_device(self, device): if device.connection != "trigger": raise LabscriptError( f"The 'connection' string of device {device.name} " f"to {self.name} must be 'trigger', not '{device.connection}'" ) DigitalOut.add_device(self, device)
[docs] class DDSQuantity(Device): """Used to define a DDS output. It is a container class, with properties that allow access to a frequency, amplitude, and phase of the output as :obj:`AnalogQuantity`. It can also have a gate, which provides enable/disable control of the output as :obj:`DigitalOut`. This class instantiates channels for frequency/amplitude/phase (and optionally the gate) itself. """ description = 'DDS' allowed_children = [AnalogQuantity, DigitalOut, DigitalQuantity]
[docs] @set_passed_properties(property_names={}) def __init__( self, name, parent_device, connection, digital_gate=None, freq_limits=None, freq_conv_class=None, freq_conv_params=None, amp_limits=None, amp_conv_class=None, amp_conv_params=None, phase_limits=None, phase_conv_class=None, phase_conv_params=None, call_parents_add_device=True, **kwargs ): """Instantiates a DDS quantity. Args: name (str): python variable for the object created. parent_device (:obj:`IntermediateDevice`): Device this output is connected to. connection (str): Output of parent device this DDS is connected to. digital_gate (dict, optional): Configures a digital output to use as an enable/disable gate for the output. Should contain keys `'device'` and `'connection'` with arguments for the `parent_device` and `connection` for instantiating the :obj:`DigitalOut`. All other (optional) keys are passed as kwargs. freq_limits (tuple, optional): `(lower, upper)` limits for the frequency of the output freq_conv_class (:obj:`labscript_utils:labscript_utils.unitconversions`, optional): Unit conversion class for the frequency of the output. freq_conv_params (dict, optional): Keyword arguments passed to the unit conversion class for the frequency of the output. amp_limits (tuple, optional): `(lower, upper)` limits for the amplitude of the output amp_conv_class (:obj:`labscript_utils:labscript_utils.unitconversions`, optional): Unit conversion class for the amplitude of the output. amp_conv_params (dict, optional): Keyword arguments passed to the unit conversion class for the amplitude of the output. phase_limits (tuple, optional): `(lower, upper)` limits for the phase of the output phase_conv_class (:obj:`labscript_utils:labscript_utils.unitconversions`, optional): Unit conversion class for the phase of the output. phase_conv_params (dict, optional): Keyword arguments passed to the unit conversion class for the phase of the output. call_parents_add_device (bool, optional): Have the parent device run its `add_device` method. **kwargs: Keyword arguments passed to :func:`Device.__init__`. """ # Here we set call_parents_add_device=False so that we # can do additional initialisation before manually calling # self.parent_device.add_device(self). This allows the parent's # add_device method to perform checks based on the code below, # whilst still providing us with the checks and attributes that # Device.__init__ gives us in the meantime. Device.__init__( self, name, parent_device, connection, call_parents_add_device=False, **kwargs ) # Ask the parent device if it has default unit conversion classes it would like # us to use: if hasattr(parent_device, 'get_default_unit_conversion_classes'): classes = self.parent_device.get_default_unit_conversion_classes(self) default_freq_conv, default_amp_conv, default_phase_conv = classes # If the user has not overridden, use these defaults. If # the parent does not have a default for one or more of amp, # freq or phase, it should return None for them. if freq_conv_class is None: freq_conv_class = default_freq_conv if amp_conv_class is None: amp_conv_class = default_amp_conv if phase_conv_class is None: phase_conv_class = default_phase_conv self.frequency = AnalogQuantity( f"{self.name}_freq", self, "freq", freq_limits, freq_conv_class, freq_conv_params, ) self.amplitude = AnalogQuantity( f"{self.name}_amp", self, "amp", amp_limits, amp_conv_class, amp_conv_params, ) self.phase = AnalogQuantity( f"{self.name}_phase", self, "phase", phase_limits, phase_conv_class, phase_conv_params, ) self.gate = None digital_gate = digital_gate or {} if "device" in digital_gate and "connection" in digital_gate: dev = digital_gate.pop("device") conn = digital_gate.pop("connection") self.gate = DigitalOut(f"{name}_gate", dev, conn, **digital_gate) # Did they only put one key in the dictionary, or use the wrong keywords? elif len(digital_gate) > 0: raise LabscriptError( 'You must specify the "device" and "connection" for the digital gate ' f"of {self.name}." ) # If the user has not specified a gate, and the parent device # supports gating of DDS output, it should add a gate to this # instance in its add_device method, which is called below. If # they *have* specified a gate device, but the parent device # has its own gating (such as the PulseBlaster), it should # check this and throw an error in its add_device method. See # labscript_devices.PulseBlaster.PulseBlaster.add_device for an # example of this. # In some subclasses we need to hold off on calling the parent # device's add_device function until further code has run, # e.g., see PulseBlasterDDS in PulseBlaster.py if call_parents_add_device: self.parent_device.add_device(self)
[docs] def setamp(self, t, value, units=None): """Set the amplitude of the output. Args: t (float): Time, in seconds, when the amplitude is set. value (float): Amplitude to set to. units: Units that the value is defined in. """ self.amplitude.constant(t, value, units)
[docs] def setfreq(self, t, value, units=None): """Set the frequency of the output. Args: t (float): Time, in seconds, when the frequency is set. value (float): Frequency to set to. units: Units that the value is defined in. """ self.frequency.constant(t, value, units)
[docs] def setphase(self, t, value, units=None): """Set the phase of the output. Args: t (float): Time, in seconds, when the phase is set. value (float): Phase to set to. units: Units that the value is defined in. """ self.phase.constant(t, value, units)
[docs] def enable(self, t): """Enable the Output. Args: t (float): Time, in seconds, to enable the output at. Raises: LabscriptError: If the DDS is not instantiated with a digital gate. """ if self.gate is None: raise LabscriptError( f"DDS {self.name} does not have a digital gate, so you cannot use the " "enable(t) method." ) self.gate.go_high(t)
[docs] def disable(self, t): """Disable the Output. Args: t (float): Time, in seconds, to disable the output at. Raises: LabscriptError: If the DDS is not instantiated with a digital gate. """ if self.gate is None: raise LabscriptError( f"DDS {self.name} does not have a digital gate, so you cannot use the " "disable(t) method." ) self.gate.go_low(t)
[docs] def pulse( self, t, duration, amplitude, frequency, phase=None, amplitude_units=None, frequency_units=None, phase_units=None, print_summary=False, ): """Pulse the output. Args: t (float): Time, in seconds, to start the pulse at. duration (float): Length of the pulse, in seconds. amplitude (float): Amplitude to set the output to during the pulse. frequency (float): Frequency to set the output to during the pulse. phase (float, optional): Phase to set the output to during the pulse. amplitude_units: Units of `amplitude`. frequency_units: Units of `frequency`. phase_units: Units of `phase`. print_summary (bool, optional): Print a summary of the pulse during compilation time. Returns: float: Duration of the pulse, in seconds. """ if print_summary: functions.print_time( t, f"{self.name} pulse at {frequency/MHz:.4f} MHz for {duration/ms:.3f} ms", ) self.setamp(t, amplitude, amplitude_units) if frequency is not None: self.setfreq(t, frequency, frequency_units) if phase is not None: self.setphase(t, phase, phase_units) if amplitude != 0 and self.gate is not None: self.enable(t) self.disable(t + duration) self.setamp(t + duration, 0) return duration
[docs] class DDS(DDSQuantity): """DDS class for use with all devices that have DDS-like outputs."""
[docs] class StaticDDS(Device): """Static DDS class for use with all devices that have static DDS-like outputs.""" description = "Static RF" allowed_children = [StaticAnalogQuantity,DigitalOut,StaticDigitalOut]
[docs] @set_passed_properties(property_names = {}) def __init__( self, name, parent_device, connection, digital_gate=None, freq_limits=None, freq_conv_class=None, freq_conv_params=None, amp_limits=None, amp_conv_class=None, amp_conv_params=None, phase_limits=None, phase_conv_class=None, phase_conv_params=None, **kwargs, ): """Instantiates a Static DDS quantity. Args: name (str): python variable for the object created. parent_device (:obj:`IntermediateDevice`): Device this output is connected to. connection (str): Output of parent device this DDS is connected to. digital_gate (dict, optional): Configures a digital output to use as an enable/disable gate for the output. Should contain keys `'device'` and `'connection'` with arguments for the `parent_device` and `connection` for instantiating the :obj:`DigitalOut`. All other (optional) keys are passed as kwargs. freq_limits (tuple, optional): `(lower, upper)` limits for the frequency of the output freq_conv_class (:obj:`labscript_utils:labscript_utils.unitconversions`, optional): Unit conversion class for the frequency of the output. freq_conv_params (dict, optional): Keyword arguments passed to the unit conversion class for the frequency of the output. amp_limits (tuple, optional): `(lower, upper)` limits for the amplitude of the output amp_conv_class (:obj:`labscript_utils:labscript_utils.unitconversions`, optional): Unit conversion class for the amplitude of the output. amp_conv_params (dict, optional): Keyword arguments passed to the unit conversion class for the amplitude of the output. phase_limits (tuple, optional): `(lower, upper)` limits for the phase of the output phase_conv_class (:obj:`labscript_utils:labscript_utils.unitconversions`, optional): Unit conversion class for the phase of the output. phase_conv_params (dict, optional): Keyword arguments passed to the unit conversion class for the phase of the output. call_parents_add_device (bool, optional): Have the parent device run its `add_device` method. **kwargs: Keyword arguments passed to :func:`Device.__init__`. """ # We tell Device.__init__ to not call # self.parent.add_device(self), we'll do that ourselves later # after further intitialisation, so that the parent can see the # freq/amp/phase objects and manipulate or check them from within # its add_device method. Device.__init__( self, name, parent_device, connection, call_parents_add_device=False, **kwargs ) # Ask the parent device if it has default unit conversion classes it would like us to use: if hasattr(parent_device, 'get_default_unit_conversion_classes'): classes = parent_device.get_default_unit_conversion_classes(self) default_freq_conv, default_amp_conv, default_phase_conv = classes # If the user has not overridden, use these defaults. If # the parent does not have a default for one or more of amp, # freq or phase, it should return None for them. if freq_conv_class is None: freq_conv_class = default_freq_conv if amp_conv_class is None: amp_conv_class = default_amp_conv if phase_conv_class is None: phase_conv_class = default_phase_conv self.frequency = StaticAnalogQuantity( f"{self.name}_freq", self, "freq", freq_limits, freq_conv_class, freq_conv_params ) self.amplitude = StaticAnalogQuantity( f"{self.name}_amp", self, "amp", amp_limits, amp_conv_class, amp_conv_params, ) self.phase = StaticAnalogQuantity( f"{self.name}_phase", self, "phase", phase_limits, phase_conv_class, phase_conv_params, ) digital_gate = digital_gate or {} if "device" in digital_gate and "connection" in digital_gate: dev = digital_gate.pop("device") conn = digital_gate.pop("connection") self.gate = DigitalOut(f"{name}_gate", dev, conn, **digital_gate) # Did they only put one key in the dictionary, or use the wrong keywords? elif len(digital_gate) > 0: raise LabscriptError( 'You must specify the "device" and "connection" for the digital gate ' f"of {self.name}" ) # Now we call the parent's add_device method. This is a must, since we didn't do so earlier from Device.__init__. self.parent_device.add_device(self)
[docs] def setamp(self, value, units=None): """Set the static amplitude of the output. Args: value (float): Amplitude to set to. units: Units that the value is defined in. """ self.amplitude.constant(value,units)
[docs] def setfreq(self, value, units=None): """Set the static frequency of the output. Args: value (float): Frequency to set to. units: Units that the value is defined in. """ self.frequency.constant(value,units)
[docs] def setphase(self, value, units=None): """Set the static phase of the output. Args: value (float): Phase to set to. units: Units that the value is defined in. """ self.phase.constant(value,units)
[docs] def enable(self, t=None): """Enable the Output. Args: t (float, optional): Time, in seconds, to enable the output at. Raises: LabscriptError: If the DDS is not instantiated with a digital gate. """ if self.gate: self.gate.go_high(t) else: raise LabscriptError( f"DDS {self.name} does not have a digital gate, so you cannot use the " "enable(t) method." )
[docs] def disable(self, t=None): """Disable the Output. Args: t (float, optional): Time, in seconds, to disable the output at. Raises: LabscriptError: If the DDS is not instantiated with a digital gate. """ if self.gate: self.gate.go_low(t) else: raise LabscriptError( f"DDS {self.name} does not have a digital gate, so you cannot use the " "disable(t) method." )