#####################################################################
# #
# /labscript.py #
# #
# Copyright 2013, Monash University #
# #
# This file is part of the program labscript, in the labscript #
# suite (see http://labscriptsuite.org), and is licensed under the #
# Simplified BSD License. See the license.txt file in the root of #
# the project for the full license. #
# #
#####################################################################
"""Everything else including the `start()`, `stop()`, and `wait()` functions - all other
classes are also imported here for backwards compatibility"""
import builtins
import os
import sys
import subprocess
import keyword
import threading
from functools import lru_cache
import numpy as np
# Notes for v3
#
# Anything commented with TO_DELETE:runmanager-batchompiler-agnostic
# can be removed with a major version bump of labscript (aka v3+)
# We leave it in now to maintain backwards compatibility between new labscript
# and old runmanager.
# The code to be removed relates to the move of the globals loading code from
# labscript to runmanager batch compiler.
import labscript_utils.h5_lock, h5py
import labscript_utils.properties
from labscript_utils.filewatcher import FileWatcher
# This imports the default Qt library that other labscript suite code will
# import as well, since it all uses qtutils. By having a Qt library already
# imported, we ensure matplotlib (imported by pylab) will notice this and use
# the same Qt library and API version, and therefore not conflict with any
# other code is using:
import qtutils
from pylab import *
from labscript_utils import dedent
from labscript_utils.properties import set_attributes
import labscript.functions as functions
try:
from labscript_utils.unitconversions import *
except ImportError:
sys.stderr.write('Warning: Failed to import unit conversion classes\n')
# Imports to maintain backwards compatibility with previous module interface
from .base import Device
from .compiler import compiler
from .constants import *
from .core import (
ClockLine,
IntermediateDevice,
Pseudoclock,
PseudoclockDevice,
TriggerableDevice,
)
from .inputs import AnalogIn
from .outputs import (
Output,
AnalogOut,
AnalogQuantity,
DDS,
DDSQuantity,
DigitalOut,
DigitalQuantity,
Shutter,
StaticDDS,
StaticAnalogOut,
StaticAnalogQuantity,
StaticDigitalOut,
StaticDigitalQuantity,
Trigger,
)
from .utils import (
LabscriptError,
bitfield,
fastflatten,
max_or_zero,
set_passed_properties,
suppress_all_warnings,
suppress_mild_warnings
)
# Create a reference to the builtins dict
# update this if accessing builtins ever changes
_builtins_dict = builtins.__dict__
# Startupinfo, for ensuring subprocesses don't launch with a visible command window:
if os.name=='nt':
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= 1 #subprocess.STARTF_USESHOWWINDOW # This variable isn't defined, but apparently it's equal to one.
else:
startupinfo = None
# Alias for backwards compatibility. The config has been moved into the compiler with
# the other settings.
config = compiler
no_warnings = suppress_all_warnings # Historical alias
[docs]
class WaitMonitor(Trigger):
[docs]
@set_passed_properties(property_names={})
def __init__(
self,
name,
parent_device,
connection,
acquisition_device,
acquisition_connection,
timeout_device=None,
timeout_connection=None,
timeout_trigger_type='rising',
**kwargs
):
"""Create a wait monitor.
This is a device or devices, one of which:
a) outputs pulses every time the master pseudoclock begins running (either at
the start of the shot or after a wait
b) measures the time in between those pulses in order to determine how long the
experiment was paused for during waits
c) optionally, produces pulses in software time that can be used to retrigger
the master pseudoclock if a wait lasts longer than its specified timeout
Args:
parent_device (Device)
The device with buffered digital outputs that should be used to produce
the wait monitor pulses. This device must be one which is clocked by
the master pseudoclock.
connection (str)
The name of the output connection of parent_device that should be used
to produce the pulses.
acquisition_device (Device)
The device which is to receive those pulses as input, and that will
measure how long between them. This does not need to be the same device
as the wait monitor output device (corresponding to `parent_device` and
`connection`). At time of writing, the only devices in labscript that
can be a wait monitor acquisition device are NI DAQmx devices that have
counter inputs.
acquisition_connection (str)
The name of the input connection on `acquisition_device` that is to read
the wait monitor pulses. The user must manually connect the output
device (`parent_device`/`connection`) to this input with a cable, in
order that the pulses can be read by the device. On NI DAQmx devices,
the acquisition_connection should be the name of the counter to be used
such as 'Ctr0'. The physical connection should be made to the input
terminal corresponding to the gate of that counter.
timeout_device (Device, optional)
The device that should be used to produce pulses in software time if a
wait lasts longer than its prescribed timeout. These pulses can
connected to the trigger input of the master pseudoclock, via a digital
logic to 'AND' it with other trigger sources, in order to resume the
master pseudoclock upon a wait timing out. To produce these pulses
during a shot requires cooperation between the acquisition device and
the timeout device code, and at present this means only NI DAQmx devices
can be used as the timeout device (though it need not be the same device
as the acquisition device). If not set, timeout pulses will not be
produced and the user must manually resume the master pseudoclock via
other means, or abort a shot if the indended resumption mechanism fails.
timeout_connection (str, optional)
Which connection on the timeout device should be used to produce timeout
pulses. Since only NI DAQmx devices are supported at the moment, this
must be a digital output on a port on the NI DAQmx device that is not
being used. Most NI DAQmx devices have both buffered and unbuffered
ports, so typically one would use one line of one of the unbuffered
ports for the timeout output.
timeout_trigger_type (str), default `'rising'`
The edge type to be used for the timeout signal, either `'rising'` or
`'falling'`
"""
if compiler.wait_monitor is not None:
raise LabscriptError("Cannot instantiate a second WaitMonitor: there can be only be one in the experiment")
compiler.wait_monitor = self
Trigger.__init__(self, name, parent_device, connection, **kwargs)
if not parent_device.pseudoclock_device.is_master_pseudoclock:
raise LabscriptError('The output device for monitoring wait durations must be clocked by the master pseudoclock device')
if (timeout_device is not None) != (timeout_connection is not None):
raise LabscriptError('Must specify both timeout_device and timeout_connection, or neither')
self.acquisition_device = acquisition_device
self.acquisition_connection = acquisition_connection
self.timeout_device = timeout_device
self.timeout_connection = timeout_connection
self.timeout_trigger_type = timeout_trigger_type
[docs]
def save_time_markers(hdf5_file):
"""Save shot time markers to the shot file.
Args:
hdf5_file (:obj:`h5py:h5py.File`): Handle to file to save to.
"""
time_markers = compiler.time_markers
dtypes = [('label','a256'), ('time', float), ('color', '(1,3)int')]
data_array = zeros(len(time_markers), dtype=dtypes)
for i, t in enumerate(time_markers):
data_array[i] = time_markers[t]["label"], t, time_markers[t]["color"]
time_markers_dataset = hdf5_file.create_dataset('time_markers', data = data_array)
[docs]
def generate_connection_table(hdf5_file):
"""Generates the connection table for the compiled shot.
Args:
hdf5_file (:obj:`h5py:h5py.File`): Handle to file to save to.
"""
connection_table = []
devicedict = {}
# Only use a string dtype as long as is needed:
max_BLACS_conn_length = -1
for device in compiler.inventory:
devicedict[device.name] = device
unit_conversion_parameters = device._properties['unit_conversion_parameters']
serialised_unit_conversion_parameters = labscript_utils.properties.serialise(unit_conversion_parameters)
properties = device._properties["connection_table_properties"]
serialised_properties = labscript_utils.properties.serialise(properties)
# If the device has a BLACS_connection atribute, then check to see if it is longer than the size of the hdf5 column
if hasattr(device,"BLACS_connection"):
# Make sure it is a string!
BLACS_connection = str(device.BLACS_connection)
if len(BLACS_connection) > max_BLACS_conn_length:
max_BLACS_conn_length = len(BLACS_connection)
else:
BLACS_connection = ""
#if there is no BLACS connection, make sure there is no "gui" or "worker" entry in the connection table properties
if 'worker' in properties or 'gui' in properties:
raise LabscriptError('You cannot specify a remote GUI or worker for a device (%s) that does not have a tab in BLACS'%(device.name))
if getattr(device, 'unit_conversion_class', None) is not None:
c = device.unit_conversion_class
unit_conversion_class_repr = f"{c.__module__}.{c.__name__}"
else:
unit_conversion_class_repr = repr(None)
connection_table.append((device.name, device.__class__.__name__,
device.parent_device.name if device.parent_device else str(None),
str(device.connection if device.parent_device else str(None)),
unit_conversion_class_repr,
serialised_unit_conversion_parameters,
BLACS_connection,
serialised_properties))
connection_table.sort()
vlenstring = h5py.special_dtype(vlen=str)
connection_table_dtypes = [('name','a256'), ('class','a256'), ('parent','a256'), ('parent port','a256'),
('unit conversion class','a256'), ('unit conversion params', vlenstring),
('BLACS_connection','a'+str(max_BLACS_conn_length)),
('properties', vlenstring)]
connection_table_array = empty(len(connection_table),dtype=connection_table_dtypes)
for i, row in enumerate(connection_table):
connection_table_array[i] = row
dataset = hdf5_file.create_dataset('connection table', compression=compiler.compression, data=connection_table_array, maxshape=(None,))
if compiler.master_pseudoclock is None:
master_pseudoclock_name = 'None'
else:
master_pseudoclock_name = compiler.master_pseudoclock.name
dataset.attrs['master_pseudoclock'] = master_pseudoclock_name
# Create a dictionary for caching results from vcs commands. The keys will be
# the paths to files that are saved during save_labscripts(). The values will be
# a list of tuples of the form (command, info, err); see the "Returns" section
# of the _run_vcs_commands() docstring for more info. Also create a FileWatcher
# instance for tracking when vcs results need updating. The callback will
# replace the outdated cache entry with a new list of updated vcs commands and
# outputs.
_vcs_cache = {}
_vcs_cache_rlock = threading.RLock()
def _file_watcher_callback(name, info, event):
with _vcs_cache_rlock:
_vcs_cache[name] = _run_vcs_commands(name)
_file_watcher = FileWatcher(_file_watcher_callback)
@lru_cache(None)
def _have_vcs(vcs):
try:
subprocess.check_output([vcs, '--version'])
return True
except FileNotFoundError:
msg = f"""Warning: Cannot run {vcs} commands, {vcs} info for labscriptlib files
will not be saved. You can disable this warning by setting
[labscript]/save_{vcs}_info = False in labconfig."""
sys.stderr.write(dedent(msg) + '\n')
return False
def _run_vcs_commands(path):
"""Run some VCS commands on a file and return their output.
The function is used to gather up version control system information so that
it can be stored in the hdf5 files of shots. This is for convenience and
compliments the full copy of the file already included in the shot file.
Whether hg and git commands are run is controlled by the `save_hg_info`
and `save_git_info` options in the `[labscript]` section of the labconfig.
Args:
path (str): The path with file name and extension of the file on which
the commands will be run. The working directory will be set to the
directory containing the specified file.
Returns:
results (list of (tuple, str, str)): A list of tuples, each
containing information related to one vcs command of the form
(command, info, err). The first entry in that tuple is itself a
tuple of strings which was passed to subprocess.Popen() in order to
run the command. Then info is a string that contains the text
printed to stdout by that command, and err contains the text printed
to stderr by the command.
"""
# Gather together a list of commands to run.
module_directory, module_filename = os.path.split(path)
vcs_commands = []
if compiler.save_hg_info and _have_vcs('hg'):
hg_commands = [
['log', '--limit', '1'],
['status'],
['diff'],
]
for command in hg_commands:
command = tuple(['hg'] + command + [module_filename])
vcs_commands.append((command, module_directory))
if compiler.save_git_info and _have_vcs('git'):
git_commands = [
['branch', '--show-current'],
['describe', '--tags', '--always', 'HEAD'],
['rev-parse', 'HEAD'],
['diff', 'HEAD', module_filename],
]
for command in git_commands:
command = tuple(['git'] + command)
vcs_commands.append((command, module_directory))
# Now go through and start running the commands.
process_list = []
for command, module_directory in vcs_commands:
process = subprocess.Popen(
command,
cwd=module_directory,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
startupinfo=startupinfo,
)
process_list.append((command, process))
# Gather up results from the commands issued.
results = []
for command, process in process_list:
info, err = process.communicate()
info = info.decode('utf-8')
err = err.decode('utf-8')
results.append((command, info, err))
return results
[docs]
def save_labscripts(hdf5_file):
"""Writes the script files for the compiled shot to the shot file.
If `save_hg_info` labconfig parameter is `True`, will attempt to save
hg version info as an attribute.
Args:
hdf5_file (:obj:`h5py:h5py.File`): Handle to file to save to.
"""
if compiler.labscript_file is not None:
script_text = open(compiler.labscript_file).read()
else:
script_text = ''
script = hdf5_file.create_dataset('script',data=script_text)
script.attrs['name'] = os.path.basename(compiler.labscript_file).encode() if compiler.labscript_file is not None else ''
script.attrs['path'] = os.path.dirname(compiler.labscript_file).encode() if compiler.labscript_file is not None else sys.path[0]
try:
import labscriptlib
except ImportError:
return
prefix = os.path.dirname(labscriptlib.__file__)
for module in sys.modules.values():
if getattr(module, '__file__', None) is not None:
path = os.path.abspath(module.__file__)
if path.startswith(prefix) and (path.endswith('.pyc') or path.endswith('.py')):
path = path.replace('.pyc', '.py')
save_path = 'labscriptlib/' + path.replace(prefix, '').replace('\\', '/').replace('//', '/')
if save_path in hdf5_file:
# Don't try to save the same module script twice! (seems to at least
# double count __init__.py when you import an entire module as in
# from labscriptlib.stages import * where stages is a folder with an
# __init__.py file. Doesn't seem to want to double count files if
# you just import the contents of a file within a module
continue
hdf5_file.create_dataset(save_path, data=open(path).read())
with _vcs_cache_rlock:
if not path in _vcs_cache:
# Add file to watch list and create its entry in the cache.
_file_watcher.add_file(path)
_file_watcher_callback(path, None, None)
# Save the cached vcs output to the file.
for command, info, err in _vcs_cache[path]:
attribute_str = command[0] + ' ' + command[1]
hdf5_file[save_path].attrs[attribute_str] = (info + '\n' + err)
[docs]
def write_device_properties(hdf5_file):
"""Writes device_properties for each device in compiled shot to shto file.
Args:
hdf5_file (:obj:`h5py:h5py.File`): Handle to file to save to.
"""
for device in compiler.inventory:
device_properties = device._properties["device_properties"]
# turn start_order and stop_order into device_properties,
# but only if the device has a BLACS_connection attribute. start_order
# or stop_order being None means the default order, zero. An order
# being specified on a device without a BLACS_connection is an error.
for attr_name in ['start_order', 'stop_order']:
attr = getattr(device, attr_name)
if hasattr(device, 'BLACS_connection'):
if attr is None:
# Default:
attr = 0
device_properties[attr_name] = attr
elif attr is not None:
msg = ('cannot set %s on device %s, which does ' % (attr_name, device.name) +
'not have a BLACS_connection attribute and thus is not started and stopped directly by BLACS.')
raise TypeError(msg)
# Special case: We don't create the group if the only property is an
# empty dict called 'added properties'. This is because this property
# is present in all devices, and represents a place to pass in
# arbitrary data from labscript experiment scripts. We don't want a
# group for every device if nothing is actually being passed in, so we
# ignore this case.
if device_properties and device_properties != {'added_properties':{}}:
# Create group if doesn't exist:
if not device.name in hdf5_file['devices']:
hdf5_file['/devices'].create_group(device.name)
labscript_utils.properties.set_device_properties(hdf5_file, device.name, device_properties)
[docs]
def generate_wait_table(hdf5_file):
"""Generates the wait table for the shot and saves it to the shot file.
Args:
hdf5_file (:obj:`h5py:h5py.File`): Handle to file to save to.
"""
dtypes = [('label','a256'), ('time', float), ('timeout', float)]
data_array = zeros(len(compiler.wait_table), dtype=dtypes)
for i, t in enumerate(sorted(compiler.wait_table)):
label, timeout = compiler.wait_table[t]
data_array[i] = label, t, timeout
dataset = hdf5_file.create_dataset('waits', data = data_array)
if compiler.wait_monitor is not None:
acquisition_device = compiler.wait_monitor.acquisition_device.name
acquisition_connection = compiler.wait_monitor.acquisition_connection
if compiler.wait_monitor.timeout_device is None:
timeout_device = ''
else:
timeout_device = compiler.wait_monitor.timeout_device.name
if compiler.wait_monitor.timeout_connection is None:
timeout_connection = ''
else:
timeout_connection = compiler.wait_monitor.timeout_connection
timeout_trigger_type = compiler.wait_monitor.timeout_trigger_type
else:
acquisition_device, acquisition_connection, timeout_device, timeout_connection, timeout_trigger_type = '', '', '', '', ''
dataset.attrs['wait_monitor_acquisition_device'] = acquisition_device
dataset.attrs['wait_monitor_acquisition_connection'] = acquisition_connection
dataset.attrs['wait_monitor_timeout_device'] = timeout_device
dataset.attrs['wait_monitor_timeout_connection'] = timeout_connection
dataset.attrs['wait_monitor_timeout_trigger_type'] = timeout_trigger_type
[docs]
def generate_code():
"""Compiles a shot and saves it to the shot file.
"""
if compiler.hdf5_filename is None:
raise LabscriptError('hdf5 file for compilation not set. Please call labscript_init')
elif not os.path.exists(compiler.hdf5_filename):
with h5py.File(compiler.hdf5_filename ,'w') as hdf5_file:
hdf5_file.create_group('globals')
with h5py.File(compiler.hdf5_filename, 'a') as hdf5_file:
try:
hdf5_file.create_group('devices')
hdf5_file.create_group('calibrations')
except ValueError:
# Group(s) already exist - this is not a fresh h5 file, we cannot compile with it:
raise ValueError('The HDF5 file %s already contains compilation data '%compiler.hdf5_filename +
'(possibly partial if due to failed compilation). ' +
'Please use a fresh shot file. ' +
'If this one was autogenerated by a previous compilation, ' +
'and you wish to have a new one autogenerated, '+
'simply delete it and run again, or add the \'-f\' command line ' +
'argument to automatically overwrite.')
for device in compiler.inventory:
if device.parent_device is None:
device.generate_code(hdf5_file)
save_time_markers(hdf5_file)
generate_connection_table(hdf5_file)
write_device_properties(hdf5_file)
generate_wait_table(hdf5_file)
save_labscripts(hdf5_file)
# Save shot properties:
group = hdf5_file.create_group('shot_properties')
set_attributes(group, compiler.shot_properties)
[docs]
def trigger_all_pseudoclocks(t='initial'):
# Must wait this long before providing a trigger, in case child clocks aren't ready yet:
wait_delay = compiler.wait_delay
if type(t) in [str, bytes] and t == 'initial':
# But not at the start of the experiment:
wait_delay = 0
# Trigger them all:
for pseudoclock in compiler.all_pseudoclocks:
pseudoclock.trigger(t, compiler.trigger_duration)
# How long until all devices can take instructions again? The user
# can command output from devices on the master clock immediately,
# but unless things are time critical, they can wait this long and
# know for sure all devices can receive instructions:
max_delay_time = max_or_zero([pseudoclock.trigger_delay for pseudoclock in compiler.all_pseudoclocks if not pseudoclock.is_master_pseudoclock])
# On the other hand, perhaps the trigger duration and clock limit of the master clock is
# limiting when we can next give devices instructions:
# So find the max of 1.0/clock_limit of every clockline on every pseudoclock of the master pseudoclock
master_pseudoclock_delay = max(1.0/compiler.master_pseudoclock.clock_limit, max_or_zero([1.0/clockline.clock_limit for pseudoclock in compiler.master_pseudoclock.child_devices for clockline in pseudoclock.child_devices]))
max_delay = max(compiler.trigger_duration + master_pseudoclock_delay, max_delay_time)
return max_delay + wait_delay
[docs]
def wait(label, t, timeout=5):
"""Commands pseudoclocks to pause until resumed by an external trigger, or a timeout is reached.
Args:
label (str): Unique name for wait.
t (float): Time, in seconds, at which experiment should begin the wait.
timeout (float, optional): Maximum length of the wait, in seconds. After
this time, the pseudoclocks are automatically re-triggered.
Returns:
float: Time required for all pseudoclocks to resume execution once
wait has completed.
"""
if not str(label):
raise LabscriptError('Wait must have a name')
max_delay = trigger_all_pseudoclocks(t)
if t in compiler.wait_table:
raise LabscriptError('There is already a wait at t=%s'%str(t))
if any([label==existing_label for existing_label, _ in compiler.wait_table.values()]):
raise LabscriptError('There is already a wait named %s'%str(label))
compiler.wait_table[t] = str(label), float(timeout)
return max_delay
[docs]
def add_time_marker(t, label, color=None, verbose=False):
"""Add a marker for the specified time. These markers are saved in the HDF5 file.
This allows one to label that time with a string label, and a color that
applications may use to represent this part of the experiment. The color may be
specified as an RGB tuple, or a string of the color name such as 'red', or a string
of its hex value such as '#ff88g0'. If verbose=True, this funtion also prints the
label and time, which can be useful to view during shot compilation.
Runviewer displays these markers and allows one to manipulate the time axis based on
them, and BLACS' progress bar plugin displays the name and colour of the most recent
marker as the shot is running"""
if isinstance(color, (str, bytes)):
import PIL.ImageColor
color = PIL.ImageColor.getrgb(color)
if color is None:
color = (-1, -1, -1)
elif not all(0 <= n <= 255 for n in color):
raise ValueError("Invalid RGB tuple %s" % str(color))
if verbose:
functions.print_time(t, label)
compiler.time_markers[t] = {"label": label, "color": color}
[docs]
def start():
"""Indicates the end of the connection table and the start of the
experiment logic.
Returns:
float: Time required for all pseudoclocks to start execution.
"""
compiler.start_called = True
# Get and save some timing info about the pseudoclocks:
# TODO: What if you need to trigger individual Pseudolocks on the one device, rather than the PseudoclockDevice as a whole?
pseudoclocks = [device for device in compiler.inventory if isinstance(device, PseudoclockDevice)]
compiler.all_pseudoclocks = pseudoclocks
toplevel_devices = [device for device in compiler.inventory if device.parent_device is None]
master_pseudoclocks = [pseudoclock for pseudoclock in pseudoclocks if pseudoclock.is_master_pseudoclock]
if len(master_pseudoclocks) > 1:
raise LabscriptError('Cannot have more than one master pseudoclock')
if not toplevel_devices:
raise LabscriptError('No toplevel devices and no master pseudoclock found')
elif pseudoclocks:
(master_pseudoclock,) = master_pseudoclocks
compiler.master_pseudoclock = master_pseudoclock
# Which pseudoclock requires the longest pulse in order to trigger it?
compiler.trigger_duration = max_or_zero([pseudoclock.trigger_minimum_duration for pseudoclock in pseudoclocks if not pseudoclock.is_master_pseudoclock])
trigger_clock_limits = [pseudoclock.trigger_device.clock_limit for pseudoclock in pseudoclocks if not pseudoclock.is_master_pseudoclock]
if len(trigger_clock_limits) > 0:
min_clock_limit = min(trigger_clock_limits)
min_clock_limit = min([min_clock_limit, master_pseudoclock.clock_limit])
else:
min_clock_limit = master_pseudoclock.clock_limit
# ensure we don't tick faster than attached devices to the master pseudoclock can handle
clockline_limits = [clockline.clock_limit for pseudoclock in master_pseudoclock.child_devices for clockline in pseudoclock.child_devices]
min_clock_limit = min(min_clock_limit, min(clockline_limits))
# check the minimum trigger duration for the waitmonitor
if compiler.wait_monitor is not None:
wait_monitor_minimum_pulse_width = getattr(
compiler.wait_monitor.acquisition_device,
'wait_monitor_minimum_pulse_width',
0,
)
compiler.trigger_duration = max(
compiler.trigger_duration, wait_monitor_minimum_pulse_width
)
# Provide this, or the minimum possible pulse, whichever is longer:
compiler.trigger_duration = max(2.0/min_clock_limit, compiler.trigger_duration) + 2*master_pseudoclock.clock_resolution
# Must wait this long before providing a trigger, in case child clocks aren't ready yet:
compiler.wait_delay = max_or_zero([pseudoclock.wait_delay for pseudoclock in pseudoclocks if not pseudoclock.is_master_pseudoclock])
# Have the master clock trigger pseudoclocks at t = 0:
max_delay = trigger_all_pseudoclocks()
else:
# No pseudoclocks, only other toplevel devices:
compiler.master_pseudoclock = None
compiler.trigger_duration = 0
compiler.wait_delay = 0
max_delay = 0
return max_delay
[docs]
def stop(t, target_cycle_time=None, cycle_time_delay_after_programming=False):
"""Indicate the end of an experiment at the given time, and initiate compilation of
instructions, saving them to the HDF5 file. Configures some shot options.
Args:
t (float): The end time of the experiment.
target_cycle_time (float, optional): default: `None`
How long, in seconds, after the previous shot was started, should this shot be
started by BLACS. This allows one to run shots at a constant rate even if they
are of different durations. If `None`, BLACS will run the next shot immediately
after the previous shot completes. Otherwise, BLACS will delay starting this
shot until the cycle time has elapsed. This is a request only, and may not be
met if running/programming/saving data from a shot takes long enough that it
cannot be met. This functionality requires the BLACS `cycle_time` plugin to be
enabled in labconfig. Its accuracy is also limited by software timing,
requirements of exact cycle times beyond software timing should be instead done
using hardware triggers to Pseudoclocks.
cycle_time_delay_after_programming (bool, optional): default: `False`
Whether the BLACS cycle_time plugin should insert the required delay for the
target cycle time *after* programming devices, as opposed to before programming
them. This is more precise, but may cause some devices to output their first
instruction for longer than desired, since some devices begin outputting their
first instruction as soon as they are programmed rather than when they receive
their first clock tick. If not set, the *average* cycle time will still be just
as close to as requested (so long as there is adequate time available), however
the time interval between the same part of the experiment from one shot to the
next will not be as precise due to variations in programming time.
"""
# Indicate the end of an experiment and initiate compilation:
if t == 0:
raise LabscriptError('Stop time cannot be t=0. Please make your run a finite duration')
for device in compiler.inventory:
if isinstance(device, PseudoclockDevice):
device.stop_time = t
if target_cycle_time is not None:
# Ensure we have a valid type for this
target_cycle_time = float(target_cycle_time)
compiler.shot_properties['target_cycle_time'] = target_cycle_time
compiler.shot_properties['cycle_time_delay_after_programming'] = cycle_time_delay_after_programming
generate_code()
# TO_DELETE:runmanager-batchompiler-agnostic
# entire function load_globals can be deleted
[docs]
def load_globals(hdf5_filename):
import labscript_utils.shot_utils
params = labscript_utils.shot_utils.get_shot_globals(hdf5_filename)
with h5py.File(hdf5_filename,'r') as hdf5_file:
for name in params.keys():
if name in globals() or name in locals() or name in _builtins_dict:
raise LabscriptError('Error whilst parsing globals from %s. \'%s\''%(hdf5_filename,name) +
' is already a name used by Python, labscript, or Pylab.'+
' Please choose a different variable name to avoid a conflict.')
if name in keyword.kwlist:
raise LabscriptError('Error whilst parsing globals from %s. \'%s\''%(hdf5_filename,name) +
' is a reserved Python keyword.' +
' Please choose a different variable name.')
try:
assert '.' not in name
exec(name + ' = 0')
exec('del ' + name )
except:
raise LabscriptError('ERROR whilst parsing globals from %s. \'%s\''%(hdf5_filename,name) +
'is not a valid Python variable name.' +
' Please choose a different variable name.')
# Workaround for the fact that numpy.bool_ objects dont
# match python's builtin True and False when compared with 'is':
if type(params[name]) == np.bool_:
params[name] = bool(params[name])
# 'None' is stored as an h5py null object reference:
if isinstance(params[name], h5py.Reference) and not params[name]:
params[name] = None
_builtins_dict[name] = params[name]
# TO_DELETE:runmanager-batchompiler-agnostic
# load_globals_values=True
[docs]
def labscript_init(hdf5_filename, labscript_file=None, new=False, overwrite=False, load_globals_values=True):
"""Initialises labscript and prepares for compilation.
Args:
hdf5_filename (str): Path to shot file to compile.
labscript_file: Handle to the labscript file.
new (bool, optional): If `True`, ensure a new shot file is created.
overwrite (bool, optional): If `True`, overwrite existing shot file, if it exists.
load_globals_values (bool, optional): If `True`, load global values
from the existing shot file.
"""
# save the builtins for later restoration in labscript_cleanup
compiler.save_builtins_state()
if new:
# defer file creation until generate_code(), so that filesystem
# is not littered with h5 files when the user merely imports
# labscript. If the file already exists, and overwrite is true, delete it so we get one fresh.
if os.path.exists(hdf5_filename) and overwrite:
os.unlink(hdf5_filename)
elif not os.path.exists(hdf5_filename):
raise LabscriptError('Provided hdf5 filename %s doesn\'t exist.'%hdf5_filename)
# TO_DELETE:runmanager-batchompiler-agnostic
elif load_globals_values:
load_globals(hdf5_filename)
# END_DELETE:runmanager-batchompiler-agnostic
compiler.hdf5_filename = hdf5_filename
if labscript_file is None:
import __main__
labscript_file = __main__.__file__
compiler.labscript_file = os.path.abspath(labscript_file)
[docs]
def labscript_cleanup():
"""restores builtins and the labscript module to its state before
labscript_init() was called"""
compiler.reset()