#####################################################################
# #
# __main__.py #
# #
# Copyright 2014, Monash University #
# #
# This file is part of the program runviewer, in the labscript #
# suite (see http://labscriptsuite.org), and is licensed under the #
# Simplified BSD License. See the license.txt file in the root of #
# the project for the full license. #
# #
#####################################################################
import os
import labscript_utils.excepthook
# Associate app windows with OS menu shortcuts:
import desktop_app
desktop_app.set_process_appid('runviewer')
# Splash screen
from runviewer import runviewer_dir
from labscript_utils.splash import Splash
splash = Splash(os.path.join(runviewer_dir, 'runviewer.svg'))
splash.show()
splash.update_text('importing standard library modules')
import sys
import time
import threading
import logging
from queue import Queue
import ast
import pprint
import signal
import concurrent.futures
import traceback
splash.update_text('importing labscript suite modules')
from labscript_utils.setup_logging import setup_logging
logger = setup_logging('runviewer')
labscript_utils.excepthook.set_logger(logger)
splash.update_text('importing h5_lock and h5py')
import labscript_utils.h5_lock
import h5py
# No splash update for Qt - the splash screen already imported it
from qtutils.qt.QtCore import *
from qtutils.qt.QtGui import *
from qtutils.qt.QtWidgets import *
splash.update_text('importing pyqtgraph')
import pyqtgraph as pg
splash.update_text('importing numpy')
import numpy
splash.update_text('importing scipy')
from scipy import interpolate
pg.setConfigOption('background', 'w')
pg.setConfigOption('foreground', 'k')
from qtutils import *
import qtutils.icons
splash.update_text('importing labscript suite modules')
from labscript_utils.connections import ConnectionTable
from labscript_utils import device_registry
from labscript_utils.labconfig import LabConfig, save_appconfig, load_appconfig
from labscript_utils.ls_zprocess import ZMQServer, ProcessTree
process_tree = ProcessTree.instance()
process_tree.zlock_client.set_process_name('runviewer')
from labscript_c_extensions.runviewer.resample import resample as _resample
SHOT_MODEL__COLOUR_INDEX = 0
SHOT_MODEL__SHUTTER_INDEX = 1
SHOT_MODEL__CHECKBOX_INDEX = 2
SHOT_MODEL__PATH_INDEX = 1
CHANNEL_MODEL__CHECKBOX_INDEX = 0
CHANNEL_MODEL__CHANNEL_INDEX = 0
[docs]
def int_to_enum(enum_list, value):
"""stupid hack to work around the fact that PySide screws with the type of a variable when it goes into a model. Enums are converted to ints, which then
can't be interpreted by QColor correctly (for example)
unfortunately Qt doesn't provide a python list structure of enums, so you have to build the list yourself.
"""
for item in enum_list:
if item == value:
return item
return value
[docs]
class ScaleHandler():
def __init__(self, input_times, target_positions, stop_time):
# input_times is a list (may be unsorted) of times which should be scaled evenly with target_length
# an input list of [1,2,4,6] and target_length of 1.0 will result in:
# get_scaled_time(1) -> 1
# get_scaled_time(1.5) -> 1.5
# get_scaled_time(3) -> 2.5
# get_scaled_time(4) -> 3
# get_scaled_time(5) -> 3.5 ...
self.org_stop_time = float(stop_time)
if not all((x >= 0) and (x <= self.org_stop_time) for x in input_times):
raise Exception('shot contains at least one marker before t=0 and/or after the stop time. Non-linear time currently does not support this.')
unscaled_times = sorted(input_times)
scaled_times = sorted(target_positions)
# append values for linear scaling before t=0 and after stop time
unscaled_times = [min(unscaled_times)-1e-9] + unscaled_times + [max(unscaled_times) + 1e-9]
scaled_times = [min(scaled_times)-1e-9] + scaled_times + [max(scaled_times) + 1e-9]
self.get_scaled_time = interpolate.interp1d(unscaled_times, scaled_times, assume_sorted=True, bounds_error=False, fill_value='extrapolate')
self.get_unscaled_time = interpolate.interp1d(scaled_times, unscaled_times, assume_sorted=True, bounds_error=False, fill_value='extrapolate')
self.scaled_stop_time = self.get_scaled_time(self.org_stop_time)
[docs]
class ColourDelegate(QItemDelegate):
def __init__(self, view, *args, **kwargs):
QItemDelegate.__init__(self, *args, **kwargs)
self._view = view
self._colours = [Qt.black, Qt.red, Qt.green, Qt.blue, Qt.cyan, Qt.magenta, Qt.yellow, Qt.gray, Qt.darkRed, Qt.darkGreen, Qt.darkBlue, Qt.darkCyan, Qt.darkMagenta, Qt.darkYellow, Qt.darkGray, Qt.lightGray]
self._current_colour_index = 0
[docs]
def get_next_colour(self):
colour = self._colours[self._current_colour_index]
self._current_colour_index += 1
if self._current_colour_index >= len(self._colours):
self._current_colour_index = 0
return colour
[docs]
def createEditor(self, parent, option, index):
editor = QComboBox(parent)
#colours = QColor.colorNames()
for colour in self._colours:
pixmap = QPixmap(20, 20)
pixmap.fill(colour)
editor.addItem(QIcon(pixmap), '', colour)
editor.activated.connect(lambda index, editor=editor: self._view.commitData(editor))
editor.activated.connect(lambda index, editor=editor: self._view.closeEditor(editor, QAbstractItemDelegate.NoHint))
QTimer.singleShot(10, editor.showPopup)
return editor
[docs]
def setEditorData(self, editor, index):
value = index.model().data(index, Qt.UserRole)
for i in range(editor.count()):
if editor.itemData(i) == value():
editor.setCurrentIndex(i)
break
[docs]
def setModelData(self, editor, model, index):
icon = editor.itemIcon(editor.currentIndex())
colour = editor.itemData(editor.currentIndex())
# Note, all data being written to the model must be read out of the editor PRIOR to calling model.setData()
# This is because a call to model.setData() triggers setEditorData(), which messes up subsequent
# calls to the editor to determine the currently selected item/data
model.setData(index, icon, Qt.DecorationRole)
model.setData(index, lambda clist=self._colours, colour=colour: int_to_enum(clist, colour), Qt.UserRole)
[docs]
def updateEditorGeometry(self, editor, option, index):
editor.setGeometry(option.rect)
[docs]
class RunViewer(object):
def __init__(self, exp_config):
splash.update_text('loading graphical interface')
self.ui = UiLoader().load(os.path.join(runviewer_dir, 'main.ui'))
# setup shot treeview model
self.shot_model = QStandardItemModel()
self.shot_model.setHorizontalHeaderLabels(['colour', 'shutters', 'path'])
self.ui.shot_treeview.setModel(self.shot_model)
self.ui.shot_treeview.resizeColumnToContents(1)
self.shot_model.itemChanged.connect(self.on_shot_selection_changed)
self.shot_colour_delegate = ColourDelegate(self.ui.shot_treeview)
self.ui.shot_treeview.setItemDelegateForColumn(0, self.shot_colour_delegate)
# setup channel treeview model
self.channel_model = QStandardItemModel()
self.channel_model.setHorizontalHeaderLabels(['channel'])
self.ui.channel_treeview.setModel(self.channel_model)
self.channel_model.itemChanged.connect(self.update_plots)
# create a hidden plot widget that all plots can link their x-axis too
hidden_plot = pg.PlotWidget(name='runviewer - time axis link')
hidden_plot.setMinimumHeight(1)
hidden_plot.setMaximumHeight(1)
hidden_plot.setLabel('bottom', 'Time', units='s')
hidden_plot.setLabel('left', " ")
hidden_plot.showAxis('right', True)
hidden_plot_item = hidden_plot.plot([0, 1], [0, 0])
self._hidden_plot = (hidden_plot, hidden_plot_item)
self.ui.hidden_plot_layout.addWidget(hidden_plot)
time_axis_plot = pg.PlotWidget()
time_axis_plot.setMinimumHeight(120)
time_axis_plot.setMaximumHeight(120)
time_axis_plot.setLabel('bottom', 'Time', units='s')
time_axis_plot.showAxis('right', True)
time_axis_plot.setXLink('runviewer - time axis link')
time_axis_plot.setMouseEnabled(y=False)
time_axis_plot.getAxis('left').setTicks([]) # hide y ticks in the left & right side. only show time axis
time_axis_plot.getAxis('right').setTicks([])
time_axis_plot.setLabel('left', 'Slots')
time_axis_plot.scene().sigMouseMoved.connect(lambda pos: self.mouseMovedEvent(pos, time_axis_plot, "Slots"))
time_axis_plot_item = time_axis_plot.plot([0, 1], [0, 0], pen=(255, 255, 255))
self._time_axis_plot = (time_axis_plot, time_axis_plot_item)
self.all_markers = {}
self.all_marker_items = {}
self.movable_marker_items = {}
markers_plot = pg.PlotWidget(name='runviewer - markers')
markers_plot.setMinimumHeight(120)
markers_plot.setMaximumHeight(120)
markers_plot.showAxis('top', False)
markers_plot.showAxis('bottom', False)
markers_plot.showAxis('left', True)
markers_plot.showAxis('right', True)
markers_plot.getAxis('left').setTicks([])
markers_plot.getAxis('right').setTicks([])
markers_plot.setLabel('left', 'Markers')
markers_plot.setXLink('runviewer - time axis link')
markers_plot.setMouseEnabled(y=False)
markers_plot.scene().sigMouseMoved.connect(lambda pos: self.mouseMovedEvent(pos, markers_plot, "Markers"))
markers_plot_item = markers_plot.plot([])
self._markers_plot = (markers_plot, markers_plot_item)
self.ui.verticalLayout_9.insertWidget(1,markers_plot)
self.ui.plot_layout.addWidget(time_axis_plot)
# add some icons
self.ui.add_shot.setIcon(QIcon(':/qtutils/fugue/plus'))
self.ui.remove_shots.setIcon(QIcon(':/qtutils/fugue/minus'))
self.ui.enable_selected_shots.setIcon(QIcon(':/qtutils/fugue/ui-check-box'))
self.ui.disable_selected_shots.setIcon(QIcon(':/qtutils/fugue/ui-check-box-uncheck'))
self.ui.group_channel.setIcon(QIcon(':/qtutils/fugue/layers-group'))
self.ui.delete_group.setIcon(QIcon(':/qtutils/fugue/layers-ungroup'))
self.ui.channel_move_to_top.setIcon(QIcon(':/qtutils/fugue/arrow-stop-090'))
self.ui.channel_move_up.setIcon(QIcon(':/qtutils/fugue/arrow-090'))
self.ui.channel_move_down.setIcon(QIcon(':/qtutils/fugue/arrow-270'))
self.ui.channel_move_to_bottom.setIcon(QIcon(':/qtutils/fugue/arrow-stop-270'))
self.ui.reset_x_axis.setIcon(QIcon(':/qtutils/fugue/layer-resize-replicate'))
self.ui.reset_y_axis.setIcon(QIcon(':/qtutils/fugue/layer-resize-replicate-vertical'))
self.ui.toggle_tooltip.setIcon(QIcon(':/qtutils/fugue/ui-tooltip-balloon'))
self.ui.linear_time.setIcon(QIcon(':/qtutils/fugue/clock-history'))
self.ui.equal_space_time.setIcon(QIcon(':/qtutils/fugue/border-vertical-all'))
self.ui.linear_time.setEnabled(False)
self.ui.equal_space_time.setEnabled(False)
self.ui.actionOpen_Shot.setIcon(QIcon(':/qtutils/fugue/plus'))
self.ui.actionQuit.setIcon(QIcon(':/qtutils/fugue/cross-button'))
self.ui.actionLoad_channel_config.setIcon(QIcon(':/qtutils/fugue/folder-open'))
self.ui.actionSave_channel_config.setIcon(QIcon(':/qtutils/fugue/disk'))
# disable buttons that are not yet implemented to help avoid confusion!
self.ui.group_channel.setEnabled(False)
self.ui.delete_group.setEnabled(False)
# connect signals
self.ui.reset_x_axis.clicked.connect(self.on_x_axis_reset)
self.ui.reset_y_axis.clicked.connect(self.on_y_axes_reset)
self.ui.channel_move_up.clicked.connect(self._move_up)
self.ui.channel_move_down.clicked.connect(self._move_down)
self.ui.channel_move_to_top.clicked.connect(self._move_top)
self.ui.channel_move_to_bottom.clicked.connect(self._move_bottom)
self.ui.enable_selected_shots.clicked.connect(self._enable_selected_shots)
self.ui.disable_selected_shots.clicked.connect(self._disable_selected_shots)
self.ui.add_shot.clicked.connect(self.on_add_shot)
self.ui.markers_comboBox.currentIndexChanged.connect(self._update_markers)
# self.ui.non_linear_time.toggled.connect(self._toggle_non_linear_time)
self.ui.linear_time.clicked.connect(self._reset_linear_time)
self.ui.equal_space_time.clicked.connect(self._space_markers_evenly)
self.ui.remove_shots.clicked.connect(self.on_remove_shots)
self.ui.actionOpen_Shot.triggered.connect(self.on_add_shot)
self.ui.actionQuit.triggered.connect(self.ui.close)
self.ui.actionLoad_channel_config.triggered.connect(self.on_load_channel_config)
self.ui.actionSave_channel_config.triggered.connect(self.on_save_channel_config)
# Keyboard shortcuts:
QShortcut('Del', self.ui.shot_treeview, lambda: self.on_remove_shots(confirm=True))
QShortcut('Shift+Del', self.ui.shot_treeview, lambda: self.on_remove_shots(confirm=False))
splash.update_text('done')
self.ui.show()
# internal variables
#self._channels_list = {}
self.plot_widgets = {}
self.plot_items = {}
self.shutter_lines = {}
try:
self.default_config_path = os.path.join(exp_config.get('DEFAULT', 'app_saved_configs'), 'runviewer')
except LabConfig.NoOptionError:
exp_config.set('DEFAULT', 'app_saved_configs', os.path.join('%(labscript_suite)s', 'userlib', 'app_saved_configs', '%(apparatus_name)s'))
self.default_config_path = os.path.join(exp_config.get('DEFAULT', 'app_saved_configs'), 'runviewer')
if not os.path.exists(self.default_config_path):
os.makedirs(self.default_config_path)
self.last_opened_shots_folder = exp_config.get('paths', 'experiment_shot_storage')
# start resample thread
self._resample = False
self._thread = threading.Thread(target=self._resample_thread)
self._thread.daemon = True
self._thread.start()
# start shots_to_process_queue monitoring thread
self._shots_to_process_thread = threading.Thread(target=self._process_shots)
self._shots_to_process_thread.daemon = True
self._shots_to_process_thread.start()
self.scale_time = False
self.scalehandler = None
def _update_markers(self, index):
for line, plot in self.all_marker_items.items():
# line.blockSignals(True)
plot.removeItem(line)
self.all_marker_items = {}
for line, plot in self.movable_marker_items.items():
# line.blockSignals(True)
plot.removeItem(line)
self.movable_marker_items = {}
self.marker_times_unscaled = {}
marker_index = self.ui.markers_comboBox.currentIndex()
shot = self.ui.markers_comboBox.itemData(marker_index)
if index == 0:
self.ui.linear_time.setEnabled(False)
self.ui.equal_space_time.setEnabled(False)
self.all_markers = {}
else:
self.ui.linear_time.setEnabled(True)
self.ui.equal_space_time.setEnabled(True)
self.all_markers = shot.markers
# self._update_non_linear_time(changed_shot=True)
times = sorted(list(self.all_markers.keys()))
last_time = 0
for i, (t, m) in enumerate(sorted(self.all_markers.items())):
if i < len(times)-1:
delta_t = times[i+1] - t
# Now always have a marker at stop time
# else:
# delta_t = shot.stop_time - t
unscaled_t = t
if self.scale_time:
t = self.scalehandler.get_scaled_time(t)
color = m['color']
r, g, b = color
if (r, g, b) == (-1, -1, -1):
# Default colour, black:
r, g, b = (0, 0, 0)
color = QColor(r, g, b)
label = m['label'].decode() if isinstance( m['label'], bytes) else str(m['label'])
if i == 0:
line = self._markers_plot[0].addLine(x=t, pen=pg.mkPen(color=color, width=1.5, style=Qt.DashLine), label=label, labelOpts= {"color": color, "fill": QColor(255, 255, 255, 255), "rotateAxis":(1, 0), "anchors": [(0.5, 0),(0.5, 0)]}, movable=False )
else:
line = self._markers_plot[0].addLine(x=t, pen=pg.mkPen(color=color, width=1.5, style=Qt.DashLine), label=label, labelOpts= {"color": color, "fill": QColor(255, 255, 255, 255), "rotateAxis":(1, 0), "anchors": [(0.5, 0),(0.5, 0)]}, movable=True )
line.setBounds([last_time+1e-9 if last_time !=0 else last_time ,None])
line.sigPositionChanged.connect(self._marker_moving)
line.sigPositionChangeFinished.connect(self._marker_moved)
# self.all_marker_items[line] = self._markers_plot[0]
self.movable_marker_items[line] = self._markers_plot[0]
self.marker_times_unscaled[line] = unscaled_t
line = self._time_axis_plot[0].addLine(x=t, pen=pg.mkPen(color=color, width=1.5, style=Qt.DashLine), label=format_time(delta_t), labelOpts= {"color": color, "fill": QColor(255, 255, 255, 255), "rotateAxis":(1, 0), "anchors": [(0.5, 0),(0.5, 0)]}, movable=False )
self.all_marker_items[line] = self._time_axis_plot[0]
last_time = t
self.update_plots()
[docs]
def mouseMovedEvent(self, position, ui, name):
if self.ui.toggle_tooltip.isChecked():
v = ui.scene().views()[0]
viewP = v.mapFromScene(position)
glob_pos = ui.mapToGlobal(viewP) # convert to Screen x
glob_zero = ui.mapToGlobal(QPoint(0, 0))
self._global_start_x = glob_zero.x()
self._global_start_y = glob_zero.y()
self._global_width = ui.width()
self._global_height = ui.height()
coord_pos = ui.plotItem.vb.mapSceneToView(position)
if len(self.get_selected_shots_and_colours()) > 0:
scaled_t = float(coord_pos.x())
if self.scale_time and self.scalehandler is not None:
unscaled_t = float(self.scalehandler.get_unscaled_time(scaled_t))
else:
unscaled_t = scaled_t
if unscaled_t is not None:
pos = QPoint(glob_pos.x(), glob_pos.y())
plot_data = ui.plotItem.listDataItems()[0].getData()
if plot_data[0] is not None and scaled_t is not None:
nearest_index = numpy.abs(plot_data[0] - scaled_t).argmin() - 1
y_val = "{:.2f}".format(plot_data[1][nearest_index])
else:
y_val = '-'
text = "Plot: {} \nTime: {:.9f}s\nValue: {}".format(name, unscaled_t, y_val)
QToolTip.showText(pos, text)
def _reset_linear_time(self):
self.scale_time = False
markers_unscaled = sorted(list(self.all_markers.keys()))
marker_index = self.ui.markers_comboBox.currentIndex()
shot = self.ui.markers_comboBox.itemData(marker_index)
scalehandler = ScaleHandler(markers_unscaled, markers_unscaled, shot.stop_time)
self._update_non_linear_time(new_scalehandler=scalehandler)
self.on_x_axis_reset()
self._resample = True
def _space_markers_evenly(self):
self.scale_time = True
marker_index = self.ui.markers_comboBox.currentIndex()
shot = self.ui.markers_comboBox.itemData(marker_index)
markers_unscaled = sorted(list(self.all_markers.keys()))
target_length = shot.stop_time / float(len(markers_unscaled) - 1)
scaled_times = [target_length * i for i in range(len(markers_unscaled))]
scalehandler = ScaleHandler(markers_unscaled, scaled_times, shot.stop_time)
self._update_non_linear_time(new_scalehandler=scalehandler)
self.on_x_axis_reset()
self._resample = True
def _marker_moving(self, line):
self.scale_time = True
marker_index = self.ui.markers_comboBox.currentIndex()
shot = self.ui.markers_comboBox.itemData(marker_index)
markers_unscaled = sorted(list(self.all_markers.keys()))
# What was the unscaled time of the marker that moved, and where is it now?
moved_marker_unscaled_t = self.marker_times_unscaled[line]
moved_marker_new_pos = line.pos().x()
# Where was the marker just before it was moved? This is given by the current scalehandler
if self.scalehandler is not None:
moved_marker_last_pos = self.scalehandler.get_scaled_time(moved_marker_unscaled_t)
else:
moved_marker_last_pos = moved_marker_unscaled_t
# How far has the marker moved?
delta_marker = moved_marker_new_pos - moved_marker_last_pos
# Now we want to shift the other markers if the are at a higher position than this one
markers = list(self.marker_times_unscaled.keys())
new_scaled_times = []
for marker in markers:
if marker == line:
new_scaled_times.append(moved_marker_new_pos)
else:
x = marker.pos().x()
if x > moved_marker_last_pos:
x += delta_marker
new_scaled_times.append(x)
new_scaled_times = sorted(new_scaled_times)
scalehandler = ScaleHandler(markers_unscaled,new_scaled_times, shot.stop_time)
self._update_non_linear_time(new_scalehandler=scalehandler)
def _marker_moved(self, line):
self._resample = True
def _update_non_linear_time(self, changed_shot=False, new_scalehandler=None):
marker_index = self.ui.markers_comboBox.currentIndex()
shot = self.ui.markers_comboBox.itemData(marker_index)
if new_scalehandler is None:
# make a 1:1 scalehandler using the hidden_plot
self.scale_time = False
end_t = self._hidden_plot[1].getData()[0][-1]
new_scalehandler = ScaleHandler([0,end_t],[0,end_t],end_t)
old_scalehandler = self.scalehandler
self.scalehandler = new_scalehandler
# combine markers and shutter lines
markers = list(self.all_marker_items.keys())
for channel in self.shutter_lines:
for shot in self.shutter_lines[channel]:
for line in self.shutter_lines[channel][shot][0]:
markers.append(line)
for line in self.shutter_lines[channel][shot][1]:
markers.append(line)
# Move all Markes/Shutter Lines to new position
for marker in markers:
pos = marker.pos()
if old_scalehandler is None:
unscaled_x = pos.x()
else:
unscaled_x = old_scalehandler.get_unscaled_time(pos.x())
if self.scale_time and self.scalehandler is not None:
new_x = self.scalehandler.get_scaled_time(unscaled_x)
else:
new_x = unscaled_x
pos.setX(new_x)
marker.setPos(pos)
# Move the movable lines in the upper graph
mv_markers = list(self.movable_marker_items.keys())
new_marker_times = {}
for marker in mv_markers:
if self.scale_time and self.scalehandler is not None:
new_x = self.scalehandler.get_scaled_time(self.marker_times_unscaled[marker])
else:
new_x = self.marker_times_unscaled[marker]
new_marker_times[float(new_x)] = marker
last_time = None
for t in sorted(list(new_marker_times.keys())):
marker = new_marker_times[t]
marker.blockSignals(True)
marker.setBounds([None, None])
marker.setPos(t)
marker.setBounds([last_time+1e-9 if last_time is not None else 0.0, None])
marker.blockSignals(False)
last_time = t
if shot is not None and self.scale_time:
self._time_axis_plot[0].getAxis("bottom").setTicks([[[0, str(0)], [shot.stop_time, str(shot.stop_time)]]])
for plot in self.plot_widgets.values():
plot.getAxis("bottom").setTicks([[[0, str(0)], [shot.stop_time, str(shot.stop_time)]]])
else:
self._time_axis_plot[0].getAxis("bottom").setTicks(None)
for plot in self.plot_widgets.values():
plot.getAxis("bottom").setTicks(None)
for plot in self.plot_widgets.values():
for item in plot.getPlotItem().items:
if isinstance(item, pg.PlotDataItem):
if old_scalehandler is not None:
unscaled_t = old_scalehandler.get_unscaled_time(item.xData)
else:
unscaled_t = item.xData
if self.scalehandler is not None:
item.setData(self.scalehandler.get_scaled_time(unscaled_t), item.yData)
else:
item.setData(unscaled_t, item.yData)
def _process_shots(self):
while True:
filepath = shots_to_process_queue.get()
inmain_later(self.load_shot, filepath)
[docs]
def on_load_channel_config(self):
config_file = QFileDialog.getOpenFileName(self.ui, "Select file to load", self.default_config_path, "Config files (*.ini)")
if isinstance(config_file, tuple):
config_file, _ = config_file
if config_file:
runviewer_config = load_appconfig(config_file).get('runviewer_state', {})
channels = runviewer_config.get('channels', {})
for row, (channel, checked) in enumerate(channels):
check_items = self.channel_model.findItems(channel)
if len(check_items) == 0:
items = []
check_item = QStandardItem(channel)
check_item.setEditable(False)
check_item.setCheckable(True)
items.append(check_item)
check_item.setCheckState(Qt.Checked if checked else Qt.Unchecked)
check_item.setEnabled(False)
self.channel_model.insertRow(row, items)
else:
check_item = check_items[0]
check_item.setCheckState(Qt.Checked if checked else Qt.Unchecked)
self.channel_model.takeRow(check_item.row())
self.channel_model.insertRow(row, check_item)
[docs]
def on_save_channel_config(self):
save_file = QFileDialog.getSaveFileName(self.ui, 'Select file to save current channel configuration', self.default_config_path, "config files (*.ini)")
if type(save_file) is tuple:
save_file, _ = save_file
if save_file:
channels = []
for row in range(self.channel_model.rowCount()):
item = self.channel_model.item(row)
channels.append((item.text(), item.checkState() == Qt.Checked))
save_appconfig(save_file, {'runviewer_state': {'channels': channels}})
[docs]
def on_toggle_shutter(self, checked, current_shot):
for channel in self.shutter_lines:
for shot in self.shutter_lines[channel]:
if shot == current_shot:
for line in self.shutter_lines[channel][shot][0]:
if checked:
line.show()
else:
line.hide()
for line in self.shutter_lines[channel][shot][1]:
if checked:
line.show()
else:
line.hide()
[docs]
def on_add_shot(self):
selected_files = QFileDialog.getOpenFileNames(self.ui, "Select file to load", self.last_opened_shots_folder, "HDF5 files (*.h5 *.hdf5)")
popup_warning = False
if isinstance(selected_files, tuple):
selected_files, _ = selected_files
# Convert to standard platform specific path, otherwise Qt likes forward slashes:
selected_files = [os.path.abspath(str(shot_file)) for shot_file in selected_files]
if len(selected_files) > 0:
self.last_opened_shots_folder = os.path.dirname(selected_files[0])
for file in selected_files:
try:
filepath = str(file)
# Qt has this weird behaviour where if you type in the name of a file that exists
# but does not have the extension you have limited the dialog to, the OK button is greyed out
# but you can hit enter and the file will be selected.
# So we must check the extension of each file here!
if filepath.endswith('.h5') or filepath.endswith('.hdf5'):
self.load_shot(filepath)
else:
popup_warning = True
except:
popup_warning = True
raise
if popup_warning:
message = QMessageBox()
message.setText("Warning: Some shots were not loaded because they were not valid hdf5 files")
message.setIcon(QMessageBox.Warning)
message.setWindowTitle("Runviewer")
message.setStandardButtons(QMessageBox.Ok)
message.exec_()
[docs]
def on_remove_shots(self, confirm=True):
# Get the selection model from the treeview
selection_model = self.ui.shot_treeview.selectionModel()
# Create a list of select row indices
selected_row_list = [index.row() for index in selection_model.selectedRows()]
# sort in descending order to prevent index changes of rows to be deleted
selected_row_list.sort(reverse=True)
if confirm:
reply = QMessageBox.question(self.ui, 'Runviewer', 'Remove {} shots?'.format(len(selected_row_list)),
QMessageBox.Yes | QMessageBox.No)
if reply == QMessageBox.No:
return
for row in selected_row_list:
item = self.shot_model.item(row, SHOT_MODEL__CHECKBOX_INDEX)
colour_item = self.shot_model.item(row, SHOT_MODEL__COLOUR_INDEX)
shutter_item = self.shot_model.item(row, SHOT_MODEL__SHUTTER_INDEX)
shot = item.data()
# unselect shot
item.setCheckState(Qt.Unchecked)
shutter_item.setCheckState(Qt.Unchecked)
# remove shot from markers list
shot_combobox_index = self.ui.markers_comboBox.findText(os.path.basename(shot.path))
self.ui.markers_comboBox.removeItem(shot_combobox_index)
# remove row
self.shot_model.removeRow(row)
del shot
[docs]
def on_shot_selection_changed(self, item):
if self.shot_model.indexFromItem(item).column() == SHOT_MODEL__CHECKBOX_INDEX:
# add or remove a colour for this shot
checked = item.checkState()
row = self.shot_model.indexFromItem(item).row()
colour_item = self.shot_model.item(row, SHOT_MODEL__COLOUR_INDEX)
check_shutter = self.shot_model.item(row, SHOT_MODEL__SHUTTER_INDEX)
if checked:
colour = colour_item.data(Qt.UserRole)
if colour is not None:
colour = colour()
else:
colour = self.shot_colour_delegate.get_next_colour()
colour_item.setEditable(True)
pixmap = QPixmap(20, 20)
pixmap.fill(colour)
icon = QIcon(pixmap)
colour_item.setData(lambda clist=self.shot_colour_delegate._colours, colour=colour: int_to_enum(clist, colour), Qt.UserRole)
colour_item.setData(icon, Qt.DecorationRole)
shot_combobox_index = self.ui.markers_comboBox.findText(os.path.basename(item.data().path))
self.ui.markers_comboBox.model().item(shot_combobox_index).setEnabled(True)
if self.ui.markers_comboBox.currentIndex() == 0:
self.ui.markers_comboBox.setCurrentIndex(shot_combobox_index)
if item.data().shutter_times != {}:
check_shutter.setEnabled(True)
else:
check_shutter.setEnabled(False)
check_shutter.setToolTip("This shot doesn't contain shutter markers")
else:
# colour = None
# icon = None
shot_combobox_index = self.ui.markers_comboBox.findText(os.path.basename(item.data().path))
self.ui.markers_comboBox.model().item(shot_combobox_index).setEnabled(False)
if shot_combobox_index == self.ui.markers_comboBox.currentIndex():
self.ui.markers_comboBox.setCurrentIndex(0)
colour_item.setEditable(False)
check_shutter.setEnabled(False)
# model.setData(index, editor.itemIcon(editor.currentIndex()),
# model.setData(index, editor.itemData(editor.currentIndex()), Qt.UserRole)
self.update_channels_treeview()
elif self.shot_model.indexFromItem(item).column() == SHOT_MODEL__COLOUR_INDEX:
# update the plot colours
# get reference to the changed shot
current_shot = self.shot_model.item(self.shot_model.indexFromItem(item).row(), SHOT_MODEL__CHECKBOX_INDEX).data()
# find and update the pen of the plot items
for channel in self.plot_items.keys():
for shot in self.plot_items[channel]:
if shot == current_shot:
colour = item.data(Qt.UserRole)
self.plot_items[channel][shot].setPen(pg.mkPen(QColor(colour()), width=2))
elif self.shot_model.indexFromItem(item).column() == SHOT_MODEL__SHUTTER_INDEX:
current_shot = self.shot_model.item(self.shot_model.indexFromItem(item).row(), SHOT_MODEL__CHECKBOX_INDEX).data()
self.on_toggle_shutter(item.checkState(), current_shot)
[docs]
def load_shot(self, filepath):
logger.info(f'loading run from {filepath:s}')
shot = Shot(filepath)
# add shot to shot list
# Create Items
items = []
colour_item = QStandardItem('')
colour_item.setEditable(False)
colour_item.setToolTip('Double-click to change colour')
items.append(colour_item)
check_shutter = QStandardItem()
check_shutter.setCheckable(True)
check_shutter.setCheckState(Qt.Unchecked) # options are Qt.Checked OR Qt.Unchecked
check_shutter.setEnabled(False)
check_shutter.setToolTip("Toggle shutter markers")
items.append(check_shutter)
check_item = QStandardItem(shot.path)
check_item.setEditable(False)
check_item.setCheckable(True)
check_item.setCheckState(Qt.Unchecked) # options are Qt.Checked OR Qt.Unchecked
check_item.setData(shot)
check_item.setToolTip(filepath)
items.append(check_item)
# script name
# path_item = QStandardItem(shot.path)
# path_item.setEditable(False)
# items.append(path_item)
self.shot_model.appendRow(items)
self.ui.markers_comboBox.addItem(os.path.basename(shot.path), shot)
shot_combobox_index = self.ui.markers_comboBox.findText(os.path.basename(shot.path))
self.ui.markers_comboBox.model().item(shot_combobox_index).setEnabled(False)
# only do this if we are checking the shot we are adding
# self.update_channels_treeview()
[docs]
def get_selected_shots_and_colours(self):
# get the ticked shots
ticked_shots = {}
for i in range(self.shot_model.rowCount()):
item = self.shot_model.item(i, SHOT_MODEL__CHECKBOX_INDEX)
colour_item = self.shot_model.item(i, SHOT_MODEL__COLOUR_INDEX)
shutter_item = self.shot_model.item(i, SHOT_MODEL__SHUTTER_INDEX)
if item.checkState() == Qt.Checked:
shot = item.data()
colour_item_data = colour_item.data(Qt.UserRole)
ticked_shots[shot] = (colour_item_data(), shutter_item.checkState())
return ticked_shots
[docs]
def update_channels_treeview(self):
ticked_shots = self.get_selected_shots_and_colours()
# get set of channels
channels = {}
for shot in ticked_shots.keys():
channels[shot] = set(shot.channels)
channels_set = frozenset().union(*channels.values())
# now find channels in channels_set which are not in the treeview, and add them
# now find channels in channels set which are already in the treeview, but deactivated, and activate them
treeview_channels_dict = {}
deactivated_treeview_channels_dict = {}
for i in range(self.channel_model.rowCount()):
item = self.channel_model.item(i, CHANNEL_MODEL__CHECKBOX_INDEX)
# Sanity check
if str(item.text()) in treeview_channels_dict:
raise RuntimeError("A duplicate channel name was detected in the treeview due to an internal error. Please lodge a bugreport detailing how the channels with the same name appeared in the channel treeview. Please restart the application")
treeview_channels_dict[str(item.text())] = i
if not item.isEnabled():
deactivated_treeview_channels_dict[str(item.text())] = i
treeview_channels = set(treeview_channels_dict.keys())
deactivated_treeview_channels = set(deactivated_treeview_channels_dict.keys())
# speed up working with self.channel_model by blocking signals and later reenabeling them
self.channel_model.blockSignals(True)
# find list of channels to work with
channels_to_add = channels_set.difference(treeview_channels)
for channel in sorted(channels_to_add):
items = []
check_item = QStandardItem(channel)
check_item.setEditable(False)
check_item.setCheckable(True)
check_item.setCheckState(Qt.Unchecked)
items.append(check_item)
# channel_name_item = QStandardItem(channel)
# channel_name_item.setEditable(False)
# items.append(channel_name_item)
self.channel_model.appendRow(items)
channels_to_reactivate = deactivated_treeview_channels.intersection(channels_set)
for channel in channels_to_reactivate:
for i in range(self.channel_model.columnCount()):
item = self.channel_model.item(deactivated_treeview_channels_dict[channel], i)
item.setEnabled(True)
item.setSelectable(True)
# now find channels in the treeview which are not in the channels_set and deactivate them
channels_to_deactivate = treeview_channels.difference(channels_set)
for channel in channels_to_deactivate:
for i in range(self.channel_model.columnCount()):
item = self.channel_model.item(treeview_channels_dict[channel], i)
item.setEnabled(False)
item.setSelectable(False)
self.channel_model.blockSignals(False)
self.channel_model.layoutChanged.emit()
# TODO: Also update entries in groups
self.update_plots()
[docs]
def update_plots(self):
# get list of selected shots
ticked_shots = self.get_selected_shots_and_colours()
# SHould we rescale the x-axis?
# if self._hidden_plot[0].getViewBox.getState()['autoRange'][0]:
# self._hidden_plot[0].enableAutoRange(axis=pg.ViewBox.XAxis)
# else:
# self._hidden_plot[0].enableAutoRange(axis=pg.ViewBox.XAxis, enable=False)
# find stop time of longest ticked shot
largest_stop_time = 0
stop_time_set = False
for shot in ticked_shots.keys():
if self.scale_time:
st = self.scalehandler.get_scaled_time(shot.stop_time)
else:
st = shot.stop_time
if st > largest_stop_time:
largest_stop_time = st
stop_time_set = True
if not stop_time_set:
largest_stop_time = 1.0
# Update the range of the link plot
self._hidden_plot[1].setData([0, largest_stop_time], [0, 1e-9])
# Update plots
for i in range(self.channel_model.rowCount()):
check_item = self.channel_model.item(i, CHANNEL_MODEL__CHECKBOX_INDEX)
channel = str(check_item.text())
if check_item.checkState() == Qt.Checked and check_item.isEnabled():
# we want to show this plot
# does a plot already exist? If yes, show it
if channel in self.plot_widgets:
self.plot_widgets[channel].show()
# update the plot
# are there are plot items for this channel which are shown that should not be?
to_delete = []
for shot in self.plot_items[channel]:
if shot not in ticked_shots.keys():
self.plot_widgets[channel].removeItem(self.plot_items[channel][shot])
# Remove Shutter Markers of unticked Shots
if shot in self.shutter_lines[channel]:
for line in self.shutter_lines[channel][shot][0]:
self.plot_widgets[channel].removeItem(line)
for line in self.shutter_lines[channel][shot][1]:
self.plot_widgets[channel].removeItem(line)
self.shutter_lines[channel].pop(shot)
to_delete.append(shot)
for shot in to_delete:
del self.plot_items[channel][shot]
# do we need to add any plot items for shots that were not previously selected?
for shot, (colour, shutters_checked) in ticked_shots.items():
if shot not in self.plot_items[channel]:
# plot_item = self.plot_widgets[channel].plot(shot.traces[channel][0], shot.traces[channel][1], pen=pg.mkPen(QColor(colour), width=2))
# Add empty plot as it the custom resampling we do will happen quicker if we don't attempt to first plot all of the data
plot_item = self.plot_widgets[channel].plot([0, 0], [0], pen=pg.mkPen(QColor(colour), width=2), stepMode='center')
self.plot_items[channel][shot] = plot_item
# Add Shutter Markers of newly ticked Shots
self.add_shutter_markers(shot, channel, shutters_checked)
for t, m in self.all_markers.items():
color = m['color']
color = QColor(color[0], color[1], color[2])
if self.scale_time and self.scalehandler is not None:
t = self.scalehandler.get_scaled_time(t)
line = self.plot_widgets[channel].addLine(x=t, pen=pg.mkPen(color=color, width=1.5, style=Qt.DashLine))
self.all_marker_items[line] = self.plot_widgets[channel]
# If no, create one
else:
self.create_plot(channel, ticked_shots)
else:
if channel not in self.plot_widgets:
self.create_plot(channel, ticked_shots)
self.plot_widgets[channel].hide()
self._resample = True
[docs]
def create_plot(self, channel, ticked_shots):
self.plot_widgets[channel] = pg.PlotWidget() # name=channel)
self.plot_widgets[channel].setMinimumHeight(200)
self.plot_widgets[channel].setMaximumHeight(200)
self.plot_widgets[channel].setLabel('bottom', 'Time', units='s')
self.plot_widgets[channel].showAxis('right', True)
self.plot_widgets[channel].showAxis('bottom', True)
self.plot_widgets[channel].setXLink('runviewer - time axis link')
self.plot_widgets[channel].sigXRangeChanged.connect(self.on_x_range_changed)
self.plot_widgets[channel].scene().sigMouseMoved.connect(lambda pos: self.mouseMovedEvent(pos, self.plot_widgets[channel], channel))
self.ui.plot_layout.insertWidget(self.ui.plot_layout.count() - 1, self.plot_widgets[channel])
self.shutter_lines[channel] = {} # initialize Storage for shutter lines
self.plot_items.setdefault(channel, {})
has_units = False
units = ''
for shot, (colour, shutters_checked) in ticked_shots.items():
if channel in shot.traces:
# plot_item = self.plot_widgets[channel].plot(shot.traces[channel][0], shot.traces[channel][1], pen=pg.mkPen(QColor(colour), width=2))
# Add empty plot as it the custom resampling we do will happen quicker if we don't attempt to first plot all of the data
plot_item = self.plot_widgets[channel].plot([0, 0], [0], pen=pg.mkPen(QColor(colour), width=2), stepMode='center')
self.plot_items[channel][shot] = plot_item
if len(shot.traces[channel]) == 3:
has_units = True
units = shot.traces[channel][2]
# Add Shutter Markers of ticked Shots
self.add_shutter_markers(shot, channel, shutters_checked)
if has_units:
self.plot_widgets[channel].setLabel('left', channel, units=units)
else:
self.plot_widgets[channel].setLabel('left', channel)
[docs]
def add_shutter_markers(self, shot, channel, shutters_checked):
if shot not in self.shutter_lines[channel] and channel in shot.shutter_times:
self.shutter_lines[channel][shot] = [[], []]
open_color = QColor(0, 255, 0)
close_color = QColor(255, 0, 0)
for t, val in shot.shutter_times[channel].items():
scaled_t = t
if val: # val != 0, shutter open
line = self.plot_widgets[channel].addLine(x=scaled_t, pen=pg.mkPen(color=open_color, width=4., style=Qt.DotLine))
self.shutter_lines[channel][shot][1].append(line)
if not shutters_checked:
line.hide()
else: # else shutter close
line = self.plot_widgets[channel].addLine(x=scaled_t, pen=pg.mkPen(color=close_color, width=4., style=Qt.DotLine))
self.shutter_lines[channel][shot][0].append(line)
if not shutters_checked:
line.hide()
[docs]
def on_x_range_changed(self, *args):
# print 'x range changed'
self._resample = True
@inmain_decorator(wait_for_return=True)
def _get_resample_params(self, channel, shot):
rect = self.plot_items[channel][shot].getViewBox().viewRect()
xmin, xmax = rect.left(), rect.width() + rect.left()
dx = xmax - xmin
view_range = self.plot_widgets[channel].viewRange()
return view_range[0][0], view_range[0][1], dx
[docs]
def resample(self, data_x, data_y, xmin, xmax, stop_time, num_pixels):
"""This is a function for downsampling the data before plotting
it. Unlike using nearest neighbour interpolation, this method
preserves the features of the plot. It chooses what value to
use based on what values within a region are most different
from the values it's already chosen. This way, spikes of a short
duration won't just be skipped over as they would with any sort
of interpolation."""
# TODO: Only finely sample the currently visible region. Coarsely sample the rest
# x_out = numpy.float32(numpy.linspace(data_x[0], data_x[-1], 4000*(data_x[-1]-data_x[0])/(xmax-xmin)))
x_out = numpy.float64(numpy.linspace(xmin, xmax, 3 * 2000 + 2))
y_out = numpy.empty(len(x_out) - 1, dtype=numpy.float64)
data_x = numpy.float64(data_x)
data_y = numpy.float64(data_y)
# TODO: investigate only resampling when necessary.
# Currently pyqtgraph sometimes has trouble rendering things
# if you don't resample. If a point is far off the graph,
# and this point is the first that should be drawn for stepMode,
# because there is a long gap before the next point (which is
# visible) then there is a problem.
# Also need to explicitly handle cases where none of the data
# is visible (which resampling does by setting NaNs)
#
# x_data_slice = data_x[(data_x>=xmin)&(data_x<=xmax)]
# print len(data_x)
# if len(x_data_slice) < 3*2000+2:
# x_out = x_data_slice
# y_out = data_y[(data_x>=xmin)&(data_x<=xmax)][:-1]
# logger.info('skipping resampling')
# else:
resampling = True
if resampling:
_resample(data_x, data_y, x_out, y_out, numpy.float64(stop_time))
# self.__resample4(data_x, data_y, x_out, y_out, numpy.float32(stop_time))
else:
x_out, y_out = data_x, data_y
return x_out, y_out
def __resample4(self, x_in, y_in, x_out, y_out, stop_time):
# we want x-out to have three times the number of points as there are pixels
# Plus one at the end
# y_out = numpy.empty(len(x_out)-1, dtype=numpy.float64)
# print 'len x_out: %d'%len(x_out)
# A couple of special cases that I don't want to have to put extra checks in for:
if x_out[-1] < x_in[0] or x_out[0] > stop_time:
# We're all the way to the left of the data or all the way to the right. Fill with NaNs:
y_out.fill('NaN')
elif x_out[0] > x_in[-1]:
# We're after the final clock tick, but before stop_time
i = 0
while i < len(x_out) - 1:
if x_out[i] < stop_time:
y_out[i] = y_in[-1]
else:
y_out[i] = numpy.float('NaN')
i += 1
else:
i = 0
j = 1
# Until we get to the data, fill the output array with NaNs (which
# get ignored when plotted)
while x_out[i] < x_in[0]:
y_out[i] = numpy.float('NaN')
y_out[i + 1] = numpy.float('NaN')
y_out[i + 2] = numpy.float('NaN')
i += 3
# If we're some way into the data, we need to skip ahead to where
# we want to get the first datapoint from:
while x_in[j] < x_out[i]:
j += 1
# Get the first datapoint:
# y_out[i] = y_in[j-1]
# i += 1
# Get values until we get to the end of the data:
while j < len(x_in) and i < len(x_out) - 2: # Leave one spare for the final data point and one because stepMode='center' requires len(y)=len(x)-1
# This is 'nearest neighbour on the left' interpolation. It's
# what we want if none of the source values checked in the
# upcoming loop are used:
y_out[i] = y_in[j - 1]
i += 2
positive_jump_value = 0
positive_jump_index = j - 1
negative_jump_value = 0
negative_jump_index = j - 1
# now find the max and min values between this x_out time point and the next x_out timepoint
# print i
while j < len(x_in) and x_in[j] < x_out[i]:
jump = y_in[j] - y_out[i - 2]
# would using this source value cause a bigger positive jump?
if jump > 0 and jump > positive_jump_value:
positive_jump_value = jump
positive_jump_index = j
# would using this source value cause a bigger negative jump?
elif jump < 0 and jump < negative_jump_value:
negative_jump_value = jump
negative_jump_index = j
j += 1
if positive_jump_index < negative_jump_index:
y_out[i - 1] = y_in[positive_jump_index]
y_out[i] = y_in[negative_jump_index]
# TODO: We could override the x_out values with x_in[jump_index]
else:
y_out[i - 1] = y_in[negative_jump_index]
y_out[i] = y_in[positive_jump_index]
i += 1
# Get the last datapoint:
if j < len(x_in):
# If the sample rate of the raw data is low, then the current
# j point could be outside the current plot view range
# If so, decrease j so that we take a value that is within the
# plot view range.
if x_in[j] > x_out[-1] and j > 0:
j -= 1
y_out[i] = y_in[j]
i += 1
# if i < len(x_out):
# y_out[i] = y_in[-1]
# i += 1
# Fill the remainder of the array with the last datapoint,
# if t < stop_time, and then NaNs after that:
while i < len(x_out) - 1:
if x_out[i] < stop_time:
y_out[i] = y_in[-1]
else:
y_out[i] = numpy.float('NaN')
i += 1
# return y_out # method changed to modify y_out array in place
def __resample3(self, x_in, y_in, x_out, stop_time):
"""This is a Python implementation of the C extension. For
debugging and developing the C extension."""
y_out = numpy.empty(len(x_out))
i = 0
j = 1
# A couple of special cases that I don't want to have to put extra checks in for:
if x_out[-1] < x_in[0] or x_out[0] > stop_time:
# We're all the way to the left of the data or all the way to the right. Fill with NaNs:
while i < len(x_out):
y_out[i] = numpy.float('NaN')
i += 1
elif x_out[0] > x_in[-1]:
# We're after the final clock tick, but before stop_time
while i < len(x_out):
if x_out[i] < stop_time:
y_out[i] = y_in[-1]
else:
y_out[i] = numpy.float('NaN')
i += 1
else:
# Until we get to the data, fill the output array with NaNs (which
# get ignored when plotted)
while x_out[i] < x_in[0]:
y_out[i] = numpy.float('NaN')
i += 1
# If we're some way into the data, we need to skip ahead to where
# we want to get the first datapoint from:
while x_in[j] < x_out[i]:
j += 1
# Get the first datapoint:
y_out[i] = y_in[j - 1]
i += 1
# Get values until we get to the end of the data:
while j < len(x_in) and i < len(x_out):
# This is 'nearest neighbour on the left' interpolation. It's
# what we want if none of the source values checked in the
# upcoming loop are used:
y_out[i] = y_in[j - 1]
while j < len(x_in) and x_in[j] < x_out[i]:
# Would using this source value cause the interpolated values
# to make a bigger jump?
if numpy.abs(y_in[j] - y_out[i - 1]) > numpy.abs(y_out[i] - y_out[i - 1]):
# If so, use this source value:
y_out[i] = y_in[j]
j += 1
i += 1
# Get the last datapoint:
if i < len(x_out):
y_out[i] = y_in[-1]
i += 1
# Fill the remainder of the array with the last datapoint,
# if t < stop_time, and then NaNs after that:
while i < len(x_out):
if x_out[i] < stop_time:
y_out[i] = y_in[-1]
else:
y_out[i] = numpy.float('NaN')
i += 1
return y_out
def _resample_thread(self):
# logger = logging.getLogger('runviewer.resample_thread')
with concurrent.futures.ThreadPoolExecutor(max_workers=os.cpu_count()) as executer:
while True:
if self._resample:
self._resample = False
channel_data = self.__get_all_resample_params()
results = []
for args in channel_data:
results.append(executer.submit(self.__pool_resample, *args))
# wait for all inmain_later calls from threadpool to finish before we trigger a new resample
for future in results:
result = future.result()
if isinstance(result, Queue):
result.get()
time.sleep(0.1)
@inmain_decorator(wait_for_return=True)
def __get_all_resample_params(self):
return [(channel, shot, colour, *self._get_resample_params(channel, shot))
for shot, (colour, shutters_checked) in self.get_selected_shots_and_colours().items()
for channel in shot.traces
if self.channel_checked_and_enabled(channel)
]
def __pool_resample(self, channel, shot, colour, xmin, xmax, dx):
try:
# We go a bit outside the visible range so that scrolling
# doesn't immediately go off the edge of the data, and the
# next resampling might have time to fill in more data before
# the user sees any empty space.
if self.scale_time:
xnew, ynew = self.resample(shot.scaled_times(channel), shot.traces[channel][1], xmin, xmax, shot.stop_time, dx)
else:
xnew, ynew = self.resample(shot.traces[channel][0], shot.traces[channel][1], xmin, xmax, shot.stop_time, dx)
return inmain_later(self.plot_items[channel][shot].setData, xnew, ynew, pen=pg.mkPen(QColor(colour), width=2), stepMode='center')
except Exception:
#self._resample = True
pass
[docs]
@inmain_decorator(wait_for_return=True)
def channel_checked_and_enabled(self, channel):
# logger.info('is channel %s enabled' % channel)
index = self.channel_model.index(0, CHANNEL_MODEL__CHANNEL_INDEX)
indexes = self.channel_model.match(index, Qt.DisplayRole, channel, 1, Qt.MatchExactly)
# logger.info('number of matches %d' % len(indexes))
if len(indexes) == 1:
check_item = self.channel_model.itemFromIndex(indexes[0])
if check_item.checkState() == Qt.Checked and check_item.isEnabled():
return True
return False
[docs]
def on_x_axis_reset(self):
ticked_shots = self.get_selected_shots_and_colours()
largest_stop_time = 0
stop_time_set = False
for shot in ticked_shots.keys():
if self.scale_time:
st = self.scalehandler.get_scaled_time(shot.stop_time)
else:
st = shot.stop_time
if st > largest_stop_time:
largest_stop_time = st
stop_time_set = True
if not stop_time_set:
largest_stop_time = 1.0
# Update the range of the link plot
self._hidden_plot[1].setData([0, largest_stop_time], [0, 1e-9])
self._hidden_plot[0].enableAutoRange(axis=pg.ViewBox.XAxis)
[docs]
def on_y_axes_reset(self):
for plot_widget in self.plot_widgets.values():
plot_widget.enableAutoRange(axis=pg.ViewBox.YAxis)
def _enable_selected_shots(self):
self.update_ticks_of_selected_shots(Qt.Checked)
def _disable_selected_shots(self):
self.update_ticks_of_selected_shots(Qt.Unchecked)
[docs]
def update_ticks_of_selected_shots(self, state):
# Get the selection model from the treeview
selection_model = self.ui.shot_treeview.selectionModel()
# Create a list of select row indices
selected_row_list = [index.row() for index in sorted(selection_model.selectedRows())]
# for each row selected
for row in selected_row_list:
check_item = self.shot_model.item(row, SHOT_MODEL__CHECKBOX_INDEX)
check_item.setCheckState(state)
def _move_up(self):
# Get the selection model from the treeview
selection_model = self.ui.channel_treeview.selectionModel()
# Create a list of select row indices
selected_row_list = [index.row() for index in sorted(selection_model.selectedRows())]
# For each row selected
for i, row in enumerate(selected_row_list):
# only move the row if it is not element 0, and the row above it is not selected
# (note that while a row above may have been initially selected, it should by now, be one row higher
# since we start moving elements of the list upwards starting from the lowest index)
if row > 0 and (row - 1) not in selected_row_list:
# Remove the selected row
items = self.channel_model.takeRow(row)
# Add the selected row into a position one above
self.channel_model.insertRow(row - 1, items)
# Since it is now a newly inserted row, select it again
selection_model.select(self.channel_model.indexFromItem(items[0]), QItemSelectionModel.SelectCurrent)
# reupdate the list of selected indices to reflect this change
selected_row_list[i] -= 1
self.update_plot_positions()
def _move_down(self):
# Get the selection model from the treeview
selection_model = self.ui.channel_treeview.selectionModel()
# Create a list of select row indices
selected_row_list = [index.row() for index in reversed(sorted(selection_model.selectedRows()))]
# For each row selected
for i, row in enumerate(selected_row_list):
# only move the row if it is not the last element, and the row above it is not selected
# (note that while a row below may have been initially selected, it should by now, be one row lower
# since we start moving elements of the list upwards starting from the highest index)
if row < self.channel_model.rowCount() - 1 and (row + 1) not in selected_row_list:
# Remove the selected row
items = self.channel_model.takeRow(row)
# Add the selected row into a position one above
self.channel_model.insertRow(row + 1, items)
# Since it is now a newly inserted row, select it again
selection_model.select(self.channel_model.indexFromItem(items[0]), QItemSelectionModel.SelectCurrent)
# reupdate the list of selected indices to reflect this change
selected_row_list[i] += 1
self.update_plot_positions()
def _move_top(self):
# Get the selection model from the treeview
selection_model = self.ui.channel_treeview.selectionModel()
# Create a list of select row indices
selected_row_list = [index.row() for index in sorted(selection_model.selectedRows())]
# For each row selected
for i, row in enumerate(selected_row_list):
# only move the row while it is not element 0, and the row above it is not selected
# (note that while a row above may have been initially selected, it should by now, be one row higher
# since we start moving elements of the list upwards starting from the lowest index)
while row > 0 and (row - 1) not in selected_row_list:
# Remove the selected row
items = self.channel_model.takeRow(row)
# Add the selected row into a position one above
self.channel_model.insertRow(row - 1, items)
# Since it is now a newly inserted row, select it again
selection_model.select(self.channel_model.indexFromItem(items[0]), QItemSelectionModel.SelectCurrent)
# reupdate the list of selected indices to reflect this change
selected_row_list[i] -= 1
row -= 1
self.update_plot_positions()
def _move_bottom(self):
selection_model = self.ui.channel_treeview.selectionModel()
# Create a list of select row indices
selected_row_list = [index.row() for index in reversed(sorted(selection_model.selectedRows()))]
# For each row selected
for i, row in enumerate(selected_row_list):
# only move the row while it is not the last element, and the row above it is not selected
# (note that while a row below may have been initially selected, it should by now, be one row lower
# since we start moving elements of the list upwards starting from the highest index)
while row < self.channel_model.rowCount() - 1 and (row + 1) not in selected_row_list:
# Remove the selected row
items = self.channel_model.takeRow(row)
# Add the selected row into a position one above
self.channel_model.insertRow(row + 1, items)
# Since it is now a newly inserted row, select it again
selection_model.select(self.channel_model.indexFromItem(items[0]), QItemSelectionModel.SelectCurrent)
# reupdate the list of selected indices to reflect this change
selected_row_list[i] += 1
row += 1
self.update_plot_positions()
[docs]
def update_plot_positions(self):
# remove all widgets
layout_items = {}
for i in range(self.ui.plot_layout.count()):
if i == 0:
continue
item = self.ui.plot_layout.takeAt(i)
# add all widgets
for i in range(self.channel_model.rowCount()):
check_item = self.channel_model.item(i, CHANNEL_MODEL__CHECKBOX_INDEX)
channel = str(check_item.text())
if channel in self.plot_widgets:
self.ui.plot_layout.addWidget(self.plot_widgets[channel])
if check_item.checkState() == Qt.Checked and check_item.isEnabled():
self.plot_widgets[channel].show()
else:
self.plot_widgets[channel].hide()
self.ui.plot_layout.addWidget(self._time_axis_plot[0])
[docs]
class Shot(object):
def __init__(self, path):
self.path = path
# Store list of traces
self._traces = None
# store list of channels
self._channels = None
# store list of markers
self._markers = None
self.cached_scaler = None
self._scalehandler = None
self._scaled_x = {}
# store list of shutter changes and callibrations
self._shutter_times = None
self._shutter_calibrations = {}
# TODO: Get this dynamically
device_list = ['PulseBlaster', 'NI_PCIe_6363', 'NI_PCI_6733']
# Load connection table
self.connection_table = ConnectionTable(path)
# open h5 file
with h5py.File(path, 'r') as file:
# Get master pseudoclock
self.master_pseudoclock_name = file['connection table'].attrs['master_pseudoclock']
if isinstance(self.master_pseudoclock_name, bytes):
self.master_pseudoclock_name = self.master_pseudoclock_name.decode('utf8')
else:
self.master_pseudoclock_name = str(self.master_pseudoclock_name)
# get stop time
self.stop_time = file['devices'][self.master_pseudoclock_name].attrs['stop_time']
self.device_names = list(file['devices'].keys())
# Get Shutter Calibrations
if 'calibrations' in file and 'Shutter' in file['calibrations']:
for name, open_delay, close_delay in numpy.array(file['calibrations']['Shutter']):
name = name.decode('utf8') if isinstance(name, bytes) else str(name)
self._shutter_calibrations[name] = [open_delay, close_delay]
[docs]
def delete_cache(self):
self._channels = None
self._traces = None
def _load(self):
if self._channels is None:
self._channels = {}
if self._traces is None:
self._traces = {}
if self._markers is None:
self._markers = {}
if self._shutter_times is None:
self._shutter_times = {}
self._load_markers()
# Let's walk the connection table, starting with the master pseudoclock
master_pseudoclock_device = self.connection_table.find_by_name(self.master_pseudoclock_name)
self._load_device(master_pseudoclock_device)
# self._scalehandler = ScaleHandler(self._markers.keys(), self.stop_time)
def _load_markers(self):
with h5py.File(self.path, 'r') as file:
if "time_markers" in file:
for row in file["time_markers"]:
self._markers[row['time']] = {'color': row['color'].tolist()[0], 'label': row['label']}
elif "runviewer" in file:
for time, val in file["runviewer"]["markers"].attrs.items():
props = val.strip('{}}').rsplit(",", 1)
color = list(map(int, props[0].split(":")[1].strip(" ()").split(",")))
label = props[1].split(":")[1]
self._markers[float(time)] = {'color': color, 'label': label}
if 0 not in self._markers:
self._markers[0] = {'color': [0,0,0], 'label': 'Start'}
if self.stop_time not in self._markers:
self._markers[self.stop_time] = {'color': [0,0,0], 'label' : 'End'}
[docs]
def add_trace(self, name, trace, parent_device_name, connection):
name = str(name)
self._channels[name] = {'device_name': parent_device_name, 'port': connection}
self._traces[name] = trace
# add shutter times
con = self.connection_table.find_by_name(name)
if con.device_class == "Shutter" and 'open_state' in con.properties:
self.add_shutter_times([(name, con.properties['open_state'])])
# Temporary solution to physical shutter times
[docs]
def add_shutter_times(self, shutters):
for name, open_state in shutters:
x_values, y_values = self._traces[name]
if len(x_values) > 0:
change_indices = numpy.where(y_values[:-1] != y_values[1:])[0]
change_indices += 1 # use the index of the value that is changed to
change_values = list(zip(x_values[change_indices], y_values[change_indices]))
change_values.insert(0, (x_values[0], y_values[0])) # insert first value
self._shutter_times[name] = {x_value + (self._shutter_calibrations[name][0] if y_value == open_state else self._shutter_calibrations[name][1]): 1 if y_value == open_state else 0 for x_value, y_value in change_values}
def _load_device(self, device, clock=None):
try:
logger.info('loading %s' % device.name)
module = device.device_class
# Load the master pseudoclock class
device_class = device_registry.get_runviewer_parser(module)
device_instance = device_class(self.path, device)
clocklines_and_triggers = device_instance.get_traces(self.add_trace, clock)
for name, trace in clocklines_and_triggers.items():
child_device = self.connection_table.find_by_name(name)
for grandchild_device_name, grandchild_device in child_device.child_list.items():
self._load_device(grandchild_device, trace)
except Exception as e:
if hasattr(device, 'name'):
logger.info(f'Failed to load device {device.name:s}, exception was:\n'+
''.join(traceback.format_exception(type(e), e, e.__traceback__)))
else:
logger.info('Failed to load device (unknown name, device object does not have attribute name), exception was:\n'+
''.join(traceback.format_exception(type(e), e, e.__traceback__)))
# backwards compat
with h5py.File(self.path, 'r') as file:
if "runviewer" in file:
if "shutter_times" in file["runviewer"]:
for name, val in file["runviewer"]["shutter_times"].attrs.items():
self._shutter_times[name] = {float(key_value.split(":")[0]): int(key_value.split(":")[1]) for key_value in val.strip('{}}').split(",")}
[docs]
def scaled_times(self, channel):
if self.cached_scaler != app.scalehandler:
self.cached_scaler = app.scalehandler
self._scaled_x = {}
if channel not in self._scaled_x:
self._scaled_x[channel] = self.cached_scaler.get_scaled_time(self._traces[channel][0])
return self._scaled_x[channel]
@property
def channels(self):
if self._channels is None:
self._load()
return self._channels.keys()
[docs]
def clear_cache(self):
# clear cache variables to cut down on memory usage
pass
@property
def markers(self):
if self._markers is None:
self._load()
return self._markers
@property
def traces(self):
# if traces cached:
# return cached traces and waits
if self._traces is None:
self._load()
return self._traces
@property
def shutter_times(self):
if self._shutter_times is None:
self._load()
return self._shutter_times
# @property
# def scalehandler(self):
# if self._scalehandler is None:
# self._load()
# return self._scalehandler
[docs]
class TempShot(Shot):
def __init__(self, i):
Shot.__init__(self, 'shot %d' % i)
self._channels = ['Bx', 'By', 'Bz', 'Bq']
self.stop_time = i + 1
self.traces = {}
no_x_points = 10000
for channel in self.channels:
# self.traces[channel] = (numpy.linspace(0,10,no_x_points), numpy.random.rand(no_x_points))
x_points = numpy.linspace(0, self.stop_time, no_x_points)
self.traces[channel] = (x_points, (i + 1) * numpy.sin(x_points * numpy.pi + i / 11.0 * 2 * numpy.pi))
@property
def channels(self):
return self._channels
[docs]
def get_traces(self):
return self.traces
[docs]
class RunviewerServer(ZMQServer):
def __init__(self, *args, **kwargs):
ZMQServer.__init__(self, *args, **kwargs)
self.logger = logging.getLogger('runviewer.server')
[docs]
def handler(self, h5_filepath):
if h5_filepath == 'hello':
return 'hello'
self.logger.info('Received hdf5 file: %s' % h5_filepath)
# Convert path to local slashes and shared drive prefix:
h5_filepath = labscript_utils.shared_drive.path_to_local(h5_filepath)
logger.info('local filepath: %s' % h5_filepath)
# we add the shot to a queue so that we don't have to wait for the app to come up before
# responding to runmanager
shots_to_process_queue.put(h5_filepath)
return 'ok'
if __name__ == "__main__":
qapplication = QApplication.instance()
if qapplication is None:
qapplication = QApplication(sys.argv)
shots_to_process_queue = Queue()
exp_config = LabConfig(required_params = {"DEFAULT": ["apparatus_name"], "paths": ["shared_drive", "experiment_shot_storage"], 'ports': ['runviewer']})
port = int(exp_config.get('ports', 'runviewer'))
# Start experiment server
experiment_server = RunviewerServer(port)
app = RunViewer(exp_config)
splash.hide()
def execute_program():
qapplication.exec_()
# Let the interpreter run every 500ms so it sees Ctrl-C interrupts:
timer = QTimer()
timer.start(500)
timer.timeout.connect(lambda: None)
# Upon seeing a ctrl-c interrupt, quit the event loop
signal.signal(signal.SIGINT, lambda *args: qapplication.exit())
sys.exit(execute_program())