Source code for labscript_devices.TekScope.blacs_workers

import numpy as np
import labscript_utils.h5_lock
import h5py

from blacs.tab_base_classes import Worker
import labscript_utils.properties


[docs] class TekScopeWorker(Worker):
[docs] def init(self): global TekScope from .TekScope import TekScope self.scope = TekScope(self.addr, termination=self.termination) manufacturer, model, sn, revision = self.scope.idn.split(',') assert manufacturer.lower() == 'tektronix' "Device is made by {:s}, not by Tektronix, and is actually a {:s}".format( manufacturer, model ) print('Connected to {} (SN: {})'.format(model, sn))
[docs] def transition_to_buffered(self, device_name, h5file, front_panel_values, refresh): self.h5file = h5file # We'll need this in transition_to_manual self.device_name = device_name with h5py.File(h5file, 'r') as hdf5_file: print('\n' + h5file) self.scope_params = labscript_utils.properties.get( hdf5_file, device_name, 'device_properties' ) self.scope.dev.timeout = 1000 * self.scope_params.get('timeout', 5) self.scope.unlock() self.scope.set_acquire_state(True) # TODO: Make per-shot acquisition parameters and channels configurable here self.scope.write('ACQUIRE:MODE SAMPLE') self.scope.write('ACQUIRE:STOPAFTER SEQUENCE') self.scope.write('ACQUIRE:STATE RUN') return {}
[docs] def transition_to_manual(self): channels = self.scope.channels() wfmp = {} vals = {} wtype = [('t', 'float')] print('Downloading...') for ch, enabled in channels.items(): if enabled: wfmp[ch], t, vals[ch] = self.scope.waveform( ch, int16=self.scope_params.get('int16', False), preamble_string=self.preamble_string, ) wtype.append((ch, 'float')) print(wfmp[ch]['WFID']) # Collate all data in a structured array data = np.empty(len(t), dtype=wtype) data['t'] = t for ch in vals: data[ch] = vals[ch] # Open the file after download so as not to hog the file lock with h5py.File(self.h5file, 'r+') as hdf_file: grp = hdf_file.create_group('/data/traces') print('Saving traces...') dset = grp.create_dataset(self.device_name, data=data) dset.attrs.update(wfmp[ch]) print('Done!') return True
[docs] def program_manual(self, values): return values
[docs] def abort(self): print('aborting!') # self.scope.write('*RST') return True
[docs] def abort_buffered(self): print('abort_buffered: ...') return self.abort()
[docs] def abort_transition_to_buffered(self): print('abort_transition_to_buffered: ...') return self.abort()