Source code for labscript_devices.IMAQdxCamera.blacs_tabs

#                                                                   #
# /labscript_devices/IMAQdxCamera/                     #
#                                                                   #
# Copyright 2019, Monash University and contributors                #
#                                                                   #
# This file is part of labscript_devices, in the labscript suite    #
# (see, and is licensed under the        #
# Simplified BSD License. See the license.txt file in the root of   #
# the project for the full license.                                 #
#                                                                   #

import os
import json
from time import perf_counter
import ast
from queue import Empty

import labscript_utils.h5_lock
import h5py

import numpy as np

from qtutils import UiLoader, inmain_decorator
import qtutils.icons
from qtutils.qt import QtWidgets, QtGui, QtCore
import pyqtgraph as pg

from blacs.tab_base_classes import define_state, MODE_MANUAL
from blacs.device_base_class import DeviceTab

from labscript_utils.ls_zprocess import ZMQServer

[docs] def exp_av(av_old, data_new, dt, tau): """Compute the new value of an exponential moving average based on the previous average av_old, a new value data_new, a time interval dt and an averaging timescale tau. Returns data_new if dt > tau""" if dt > tau: return data_new k = dt / tau return k * data_new + (1 - k) * av_old
[docs] class ImageReceiver(ZMQServer): """ZMQServer that receives images on a zmq.REP socket, replies 'ok', and updates the image widget and fps indicator""" def __init__(self, image_view, label_fps): ZMQServer.__init__(self, port=None, dtype='multipart') self.image_view = image_view self.label_fps = label_fps self.last_frame_time = None self.frame_rate = None self.update_event = None
[docs] @inmain_decorator(wait_for_return=True) def handler(self, data): # Acknowledge immediately so that the worker process can begin acquiring the # next frame. This increases the possible frame rate since we may render a frame # whilst acquiring the next, but does not allow us to accumulate a backlog since # only one call to this method may occur at a time. self.send([b'ok']) md = json.loads(data[0]) image = np.frombuffer(memoryview(data[1]), dtype=md['dtype']) image = image.reshape(md['shape']) if len(image.shape) == 3 and image.shape[0] == 1: # If only one image given as a 3D array, convert to 2D array: image = image.reshape(image.shape[1:]) this_frame_time = perf_counter() if self.last_frame_time is not None: dt = this_frame_time - self.last_frame_time if self.frame_rate is not None: # Exponential moving average of the frame rate over 1 second: self.frame_rate = exp_av(self.frame_rate, 1 / dt, dt, 1.0) else: self.frame_rate = 1 / dt self.last_frame_time = this_frame_time if self.image_view.image is None: # First time setting an image. Do autoscaling etc: self.image_view.setImage(image.swapaxes(-1, -2)) else: # Updating image. Keep zoom/pan/levels/etc settings. self.image_view.setImage( image.swapaxes(-1, -2), autoRange=False, autoLevels=False ) # Update fps indicator: if self.frame_rate is not None: self.label_fps.setText(f"{self.frame_rate:.01f} fps") # Tell Qt to send posted events immediately to prevent a backlog of paint events # and other low-priority events. It seems that we cannot make our qtutils # CallEvents (which are used to call this method in the main thread) low enough # priority to ensure all other occur before our next call to self.handler() # runs. This may be because the CallEvents used by qtutils.invoke_in_main have # their own event handler (qtutils.invoke_in_main.Caller), perhaps posted event # priorities are only meaningful within the context of a single event handler, # and not for the Qt event loop as a whole. In any case, this seems to fix it. # Manually calling this is usually a sign of bad coding, but I think it is the # right solution to this problem. This solves issue #36. QtWidgets.QApplication.instance().sendPostedEvents() return self.NO_RESPONSE
[docs] class IMAQdxCameraTab(DeviceTab): # Subclasses may override this if all they do is replace the worker class with a # different one: worker_class = 'labscript_devices.IMAQdxCamera.blacs_workers.IMAQdxCameraWorker' # Subclasses may override this to False if camera attributes should be set every # shot even if the same values have previously been set: use_smart_programming = True
[docs] def initialise_GUI(self): layout = self.get_tab_layout() ui_filepath = os.path.join( os.path.dirname(os.path.realpath(__file__)), 'blacs_tab.ui' ) attributes_ui_filepath = os.path.join( os.path.dirname(os.path.realpath(__file__)), 'attributes_dialog.ui' ) self.ui = UiLoader().load(ui_filepath) self.ui.pushButton_continuous.clicked.connect(self.on_continuous_clicked) self.ui.pushButton_stop.clicked.connect(self.on_stop_clicked) self.ui.pushButton_snap.clicked.connect(self.on_snap_clicked) self.ui.pushButton_attributes.clicked.connect(self.on_attributes_clicked) self.ui.toolButton_nomax.clicked.connect(self.on_reset_rate_clicked) self.attributes_dialog = UiLoader().load(attributes_ui_filepath) self.attributes_dialog.setParent(self.ui.parent()) self.attributes_dialog.setWindowFlags(QtCore.Qt.Tool) self.attributes_dialog.setWindowTitle("{} attributes".format(self.device_name)) self.attributes_dialog.pushButton_copy.clicked.connect(self.on_copy_clicked) self.attributes_dialog.comboBox.currentIndexChanged.connect( self.on_attr_visibility_level_changed ) self.ui.doubleSpinBox_maxrate.valueChanged.connect(self.on_max_rate_changed) layout.addWidget(self.ui) self.image = pg.ImageView() self.image.setSizePolicy( QtWidgets.QSizePolicy.Expanding, QtWidgets.QSizePolicy.Expanding ) self.ui.horizontalLayout.addWidget(self.image) self.ui.pushButton_stop.hide() self.ui.doubleSpinBox_maxrate.hide() self.ui.toolButton_nomax.hide() self.ui.label_fps.hide() # Ensure the GUI reserves space for these widgets even if they are hidden. # This prevents the GUI jumping around when buttons are clicked: for widget in [ self.ui.pushButton_stop, self.ui.doubleSpinBox_maxrate, self.ui.toolButton_nomax, ]: size_policy = widget.sizePolicy() if hasattr(size_policy, 'setRetainSizeWhenHidden'): # Qt 5.2+ only size_policy.setRetainSizeWhenHidden(True) widget.setSizePolicy(size_policy) # Start the image receiver ZMQ server: self.image_receiver = ImageReceiver(self.image, self.ui.label_fps) self.acquiring = False self.supports_smart_programming(self.use_smart_programming)
[docs] def get_save_data(self): return { 'attribute_visibility': self.attributes_dialog.comboBox.currentText(), 'acquiring': self.acquiring, 'max_rate': self.ui.doubleSpinBox_maxrate.value(), 'colormap': repr(self.image.ui.histogram.gradient.saveState()) }
[docs] def restore_save_data(self, save_data): self.attributes_dialog.comboBox.setCurrentText( save_data.get('attribute_visibility', 'simple') ) self.ui.doubleSpinBox_maxrate.setValue(save_data.get('max_rate', 0)) if save_data.get('acquiring', False): # Begin acquisition self.on_continuous_clicked(None) if 'colormap' in save_data: self.image.ui.histogram.gradient.restoreState( ast.literal_eval(save_data['colormap']) )
[docs] def initialise_workers(self): table = self.settings['connection_table'] connection_table_properties = table.find_by_name(self.device_name).properties # The device properties can vary on a shot-by-shot basis, but at startup we will # initially set the values that are configured in the connection table, so they # can be used for manual mode acquisition: with h5py.File(table.filepath, 'r') as f: device_properties = f, self.device_name, "device_properties" ) worker_initialisation_kwargs = { 'serial_number': connection_table_properties['serial_number'], 'orientation': connection_table_properties['orientation'], 'camera_attributes': device_properties['camera_attributes'], 'manual_mode_camera_attributes': connection_table_properties[ 'manual_mode_camera_attributes' ], 'mock': connection_table_properties['mock'], 'image_receiver_port': self.image_receiver.port, } self.create_worker( 'main_worker', self.worker_class, worker_initialisation_kwargs ) self.primary_worker = "main_worker"
@define_state(MODE_MANUAL, queue_state_indefinitely=True, delete_stale_states=True) def update_attributes(self): attributes_text = yield ( self.queue_work( self.primary_worker, 'get_attributes_as_text', self.attributes_dialog.comboBox.currentText(), ) ) self.attributes_dialog.plainTextEdit.setPlainText(attributes_text)
[docs] def on_attributes_clicked(self, button): self.on_attr_visibility_level_changed(None)
[docs] def on_attr_visibility_level_changed(self, value): self.attributes_dialog.plainTextEdit.setPlainText("Reading attributes...") self.update_attributes()
[docs] def on_continuous_clicked(self, button): self.ui.pushButton_snap.setEnabled(False) self.ui.pushButton_attributes.setEnabled(False) self.ui.pushButton_continuous.hide() self.ui.label_fps.setText('? fps') self.acquiring = True max_fps = self.ui.doubleSpinBox_maxrate.value() dt = 1 / max_fps if max_fps else 0 self.start_continuous(dt)
[docs] def on_stop_clicked(self, button): self.ui.pushButton_snap.setEnabled(True) self.ui.pushButton_attributes.setEnabled(True) self.ui.doubleSpinBox_maxrate.hide() self.ui.toolButton_nomax.hide() self.ui.pushButton_stop.hide() self.ui.label_fps.hide() self.acquiring = False self.stop_continuous()
[docs] def on_copy_clicked(self, button): text = self.attributes_dialog.plainTextEdit.toPlainText() clipboard = QtGui.QApplication.instance().clipboard() clipboard.setText(text)
[docs] def on_reset_rate_clicked(self): self.ui.doubleSpinBox_maxrate.setValue(0)
[docs] def on_max_rate_changed(self, max_fps): if self.acquiring: self.stop_continuous() dt = 1 / max_fps if max_fps else 0 self.start_continuous(dt)
@define_state(MODE_MANUAL, queue_state_indefinitely=True, delete_stale_states=True) def on_snap_clicked(self, button): yield (self.queue_work(self.primary_worker, 'snap')) @define_state(MODE_MANUAL, queue_state_indefinitely=True, delete_stale_states=True) def start_continuous(self, dt): yield (self.queue_work(self.primary_worker, 'start_continuous', dt)) @define_state(MODE_MANUAL, queue_state_indefinitely=True, delete_stale_states=True) def stop_continuous(self): yield (self.queue_work(self.primary_worker, 'stop_continuous'))
[docs] def restart(self, *args, **kwargs): # Must manually stop the receiving server upon tab restart, otherwise it does # not get cleaned up: self.image_receiver.shutdown() return DeviceTab.restart(self, *args, **kwargs)