Source code for blacs.analysis_submission

#####################################################################
#                                                                   #
# /analysis_submission.py                                           #
#                                                                   #
# Copyright 2013, Monash University                                 #
#                                                                   #
# This file is part of the program BLACS, in the labscript suite    #
# (see http://labscriptsuite.org), and is licensed under the        #
# Simplified BSD License. See the license.txt file in the root of   #
# the project for the full license.                                 #
#                                                                   #
#####################################################################
import logging
import os
import threading
import time
import sys
import queue

from qtutils.qt.QtCore import *
from qtutils.qt.QtGui import *
from qtutils.qt.QtWidgets import *

from qtutils import *
from zprocess import TimeoutError, raise_exception_in_thread
from zprocess.security import AuthenticationFailure
from labscript_utils.ls_zprocess import zmq_get
from socket import gaierror
import labscript_utils.shared_drive
from labscript_utils.qtwidgets.elide_label import elide_label
from blacs import BLACS_DIR


[docs] class AnalysisSubmission(object):
[docs] def __init__(self, BLACS, blacs_ui): self.inqueue = queue.Queue() self.BLACS = BLACS self.port = int(self.BLACS.exp_config.get('ports', 'lyse')) self._ui = UiLoader().load(os.path.join(BLACS_DIR, 'analysis_submission.ui')) blacs_ui.analysis.addWidget(self._ui) self._ui.frame.setMinimumWidth(blacs_ui.queue_controls_frame.sizeHint().width()) elide_label(self._ui.resend_shots_label, self._ui.failed_to_send_frame.layout(), Qt.ElideRight) # connect signals self._ui.send_to_server.toggled.connect(lambda state: self._set_send_to_server(state)) self._ui.server.editingFinished.connect(lambda: self._set_server(self._ui.server.text())) self._ui.clear_unsent_shots_button.clicked.connect(lambda _: self.clear_waiting_files()) self._ui.retry_button.clicked.connect(lambda _: self.check_retry()) self._waiting_for_submission = [] self.failure_reason = None self.server_online = 'offline' self.send_to_server = False self.server = '' self.time_of_last_connectivity_check = 0 self.mainloop_thread = threading.Thread(target=self.mainloop) self.mainloop_thread.daemon = True self.mainloop_thread.start()
[docs] def restore_save_data(self,data): if "server" in data: self.server = data["server"] if "send_to_server" in data: self.send_to_server = data["send_to_server"] if "waiting_for_submission" in data: self._waiting_for_submission = list(data["waiting_for_submission"]) self.inqueue.put(['save data restored', None]) self.check_retry()
[docs] def get_save_data(self): return {"waiting_for_submission":list(self._waiting_for_submission), "server":self.server, "send_to_server":self.send_to_server }
def _set_send_to_server(self,value): self.send_to_server = value def _set_server(self,server): self.server = server self.check_retry() @property @inmain_decorator(True) def send_to_server(self): return self._send_to_server @send_to_server.setter @inmain_decorator(True) def send_to_server(self, value): self._send_to_server = bool(value) self._ui.send_to_server.setChecked(self.send_to_server) if self.send_to_server: self._ui.server.setEnabled(True) self._ui.server_online.show() self.check_retry() else: self.clear_waiting_files() self._ui.server.setEnabled(False) self._ui.server_online.hide() @property @inmain_decorator(True) def server(self): return str(self._server) @server.setter @inmain_decorator(True) def server(self,value): self._server = value self._ui.server.setText(self.server) @property @inmain_decorator(True) def server_online(self): return self._server_online @server_online.setter @inmain_decorator(True) def server_online(self,value): self._server_online = str(value) icon_names = {'checking': ':/qtutils/fugue/hourglass', 'online': ':/qtutils/fugue/tick', 'offline': ':/qtutils/fugue/exclamation', '': ':/qtutils/fugue/status-offline'} tooltips = {'checking': 'Checking...', 'online': 'Server is responding', 'offline': 'Server not responding', '': 'Disabled'} icon = QIcon(icon_names.get(self._server_online, ':/qtutils/fugue/exclamation-red')) pixmap = icon.pixmap(QSize(16, 16)) tooltip = tooltips.get(self._server_online, "Invalid server status: %s" % self._server_online) if self.failure_reason is not None: tooltip += '\n' + self.failure_reason # Update GUI: self._ui.server_online.setPixmap(pixmap) self._ui.server_online.setToolTip(tooltip) self.update_waiting_files_message()
[docs] @inmain_decorator(True) def update_waiting_files_message(self): # if there is only one shot and we haven't encountered failure yet, do # not show the error frame: if (self.server_online == 'checking') and (len(self._waiting_for_submission) == 1) and not self._ui.failed_to_send_frame.isVisible(): return if self._waiting_for_submission: self._ui.failed_to_send_frame.show() if self.server_online == 'checking': self._ui.retry_button.hide() text = 'Sending %s shot(s)...' % len(self._waiting_for_submission) else: self._ui.retry_button.show() text = '%s shot(s) to send' % len(self._waiting_for_submission) self._ui.resend_shots_label.setText(text) else: self._ui.failed_to_send_frame.hide()
[docs] def get_queue(self): return self.inqueue
[docs] @inmain_decorator(True) def clear_waiting_files(self): self._waiting_for_submission = [] self.update_waiting_files_message()
[docs] @inmain_decorator(True) def check_retry(self): self.inqueue.put(['check/retry', None])
[docs] def mainloop(self): self._mainloop_logger = logging.getLogger('BLACS.AnalysisSubmission.mainloop') # Ignore signals until save data is restored: while self.inqueue.get()[0] != 'save data restored': pass timeout = 10 while True: try: try: signal, data = self.inqueue.get(timeout=timeout) except queue.Empty: timeout = 10 # Periodic checking of connectivity and resending of files. # Don't trigger a re-check if we already failed a connectivity # check within the last second: if (time.time() - self.time_of_last_connectivity_check) > 1: signal = 'check/retry' else: continue if signal == 'check/retry': self.check_connectivity() if self.server_online == 'online': self.submit_waiting_files() elif signal == 'file': if self.send_to_server: self._waiting_for_submission.append(data) if self.server_online != 'online': # Don't stack connectivity checks if many files are # arriving. If we failed a connectivity check less # than a second ago then don't check again. if (time.time() - self.time_of_last_connectivity_check) > 1: self.check_connectivity() else: # But do queue up a check for when we have # been idle for one second: timeout = 1 if self.server_online == 'online': self.submit_waiting_files() elif signal == 'close': break elif signal == 'save data restored': continue else: raise ValueError('Invalid signal: %s'%str(signal)) self._mainloop_logger.info('Processed signal: %s'%str(signal)) except Exception: # Raise in a thread for visibility, but keep going raise_exception_in_thread(sys.exc_info()) self._mainloop_logger.exception("Exception in mainloop, continuing")
[docs] def check_connectivity(self): host = self.server send_to_server = self.send_to_server if host and send_to_server: self.server_online = 'checking' try: response = zmq_get(self.port, host, 'hello', timeout=1) self.failure_reason = None except (TimeoutError, gaierror, AuthenticationFailure) as e: success = False self.failure_reason = str(e) else: success = (response == 'hello') if not success: self.failure_reason = "unexpected reponse: %s" % str(response) # update GUI self.server_online = 'online' if success else 'offline' else: self.server_online = '' self.time_of_last_connectivity_check = time.time()
[docs] def submit_waiting_files(self): success = True while self._waiting_for_submission and success: path = self._waiting_for_submission[0] self._mainloop_logger.info('Submitting run file %s.\n'%os.path.basename(path)) data = {'filepath': labscript_utils.shared_drive.path_to_agnostic(path)} self.server_online = 'checking' try: response = zmq_get(self.port, self.server, data, timeout=1) self.failure_reason = None except (TimeoutError, gaierror, AuthenticationFailure) as e: success = False self.failure_reason = str(e) else: success = (response == 'added successfully') if not success: self.failure_reason = "unexpected reponse: %s" % str(response) try: self._waiting_for_submission.pop(0) except IndexError: # Queue has been cleared pass if not success: break # update GUI self.server_online = 'online' if success else 'offline' self.time_of_last_connectivity_check = time.time()