import numpy as np
import labscript_utils.h5_lock
import h5py
from blacs.tab_base_classes import Worker
import labscript_utils.properties
# BLACS worker for Tektronix oscilloscopes.
class TekScopeWorker(Worker):
    """BLACS worker that arms a Tektronix oscilloscope and saves its traces.

    ``self.addr``, ``self.termination`` and ``self.preamble_string`` are read
    by this class but never assigned in it; presumably they are supplied when
    the worker is created by its BLACS tab -- TODO confirm against the tab.
    """

    def init(self):
        """Connect to the scope and check that it identifies as Tektronix.

        Raises:
            ValueError: if the identification string does not consist of
                exactly four comma-separated fields.
            AssertionError: if the manufacturer field is not "tektronix"
                (compared case-insensitively).
        """
        # The driver is imported here, not at module level, and published as
        # a module global so the name stays available after init() returns.
        global TekScope
        from .TekScope import TekScope

        self.scope = TekScope(self.addr, termination=self.termination)

        # Strip each field so stray whitespace or a trailing terminator in
        # the identification string cannot break the comparison below.
        manufacturer, model, sn, _revision = (
            field.strip() for field in self.scope.idn.split(',')
        )

        # Raised explicitly (instead of a bare ``assert``) so the check still
        # runs under ``python -O`` and the message is actually attached.
        # AssertionError is kept so the exception type is unchanged.
        if manufacturer.lower() != 'tektronix':
            raise AssertionError(
                "Device is made by {:s}, not by Tektronix, "
                "and is actually a {:s}".format(manufacturer, model)
            )
        print('Connected to {} (SN: {})'.format(model, sn))

    def transition_to_buffered(self, device_name, h5file, front_panel_values, refresh):
        """Read the device properties for this shot and arm a single acquisition.

        Args:
            device_name: Name of this device; used to look up its properties
                in the shot file and, later, as the name of the saved dataset.
            h5file: Path of the shot file.
            front_panel_values: Unused.
            refresh: Unused.

        Returns:
            An empty dict.
        """
        self.h5file = h5file  # We'll need this in transition_to_manual
        self.device_name = device_name

        print('\n' + h5file)
        # Keep the shot file open only for as long as the read takes.
        with h5py.File(h5file, 'r') as hdf5_file:
            self.scope_params = labscript_utils.properties.get(
                hdf5_file, device_name, 'device_properties'
            )

        # The 'timeout' property defaults to 5 and is scaled by 1000 --
        # presumably seconds converted to milliseconds; TODO confirm.
        self.scope.dev.timeout = 1000 * self.scope_params.get('timeout', 5)

        self.scope.unlock()
        self.scope.set_acquire_state(True)
        # TODO: Make per-shot acquisition parameters and channels configurable here
        self.scope.write('ACQUIRE:MODE SAMPLE')
        self.scope.write('ACQUIRE:STOPAFTER SEQUENCE')
        self.scope.write('ACQUIRE:STATE RUN')
        return {}

    def transition_to_manual(self):
        """Download every enabled channel and save the traces to the shot file.

        The traces are collated into one structured array with a ``'t'``
        column plus one column per enabled channel, and written to
        ``/data/traces/<device_name>``. The preamble of the last enabled
        channel is stored as the dataset's attributes.

        If no channel is enabled, nothing is written.

        Returns:
            True.
        """
        channels = self.scope.channels()
        preambles = {}
        traces = {}
        columns = [('t', 'float')]
        times = None
        last_channel = None
        use_int16 = self.scope_params.get('int16', False)

        print('Downloading...')
        for ch, enabled in channels.items():
            if not enabled:
                continue
            preambles[ch], times, traces[ch] = self.scope.waveform(
                ch,
                int16=use_int16,
                preamble_string=self.preamble_string,
            )
            columns.append((ch, 'float'))
            last_channel = ch
            print(preambles[ch]['WFID'])

        if last_channel is None:
            # Without any download there is no time axis to size the array
            # with; previously this fell through to a NameError.
            print('No channels enabled; no traces to save.')
            return True

        # Collate all data in a structured array. The time axis of the last
        # downloaded channel is used for every column -- this assumes all
        # channels share one time base; TODO confirm.
        data = np.empty(len(times), dtype=columns)
        data['t'] = times
        for name, values in traces.items():
            data[name] = values

        # Open the file after download so as not to hog the file lock
        with h5py.File(self.h5file, 'r+') as hdf_file:
            grp = hdf_file.require_group('/data/traces')
            print('Saving traces...')
            dset = grp.create_dataset(self.device_name, data=data)
            # Only one preamble is recorded: that of the last enabled channel.
            dset.attrs.update(preambles[last_channel])
        print('Done!')
        return True

    def program_manual(self, values):
        """Return ``values`` unchanged; nothing is sent to the scope."""
        return values

    def abort(self):
        """Report the abort and return True.

        No command is sent to the scope; a ``'*RST'`` write used to be
        present here but was commented out.
        """
        print('aborting!')
        return True

    def abort_buffered(self):
        """Abort during a buffered run; delegates to :meth:`abort`."""
        print('abort_buffered: ...')
        return self.abort()

    def abort_transition_to_buffered(self):
        """Abort during the transition to buffered; delegates to :meth:`abort`."""
        print('abort_transition_to_buffered: ...')
        return self.abort()