Source code for lyse.communication

#####################################################################
#                                                                   #
# /communication.py                                                 #
#                                                                   #
# Copyright 2013, Monash University                                 #
#                                                                   #
# This file is part of the program lyse, in the labscript suite     #
# (see http://labscriptsuite.org), and is licensed under the        #
# Simplified BSD License. See the license.txt file in the root of   #
# the project for the full license.                                 #
#                                                                   #
#####################################################################
"""Code required for interprocess communication
"""

# 3rd party imports:
import numpy as np

# Labscript imports
from labscript_utils.ls_zprocess import ZMQServer
import labscript_utils.shared_drive as shared_drive

# qt imports
from qtutils import inmain_decorator

# Lyse imports
from lyse.dataframe_utilities import rangeindex_to_multiindex

[docs] class WebServer(ZMQServer):
[docs] def __init__(self, app, *args, **kwargs): self.app = app super().__init__(*args, **kwargs)
[docs] def handler(self, request_data): self.app.logger.info('WebServer request: %s' % str(request_data)) if request_data == 'hello': return 'hello' elif isinstance(request_data, tuple) and request_data[0]=='get dataframe' and len(request_data)==3: _, n_sequences, filter_kwargs = request_data df = self._retrieve_dataframe() df = rangeindex_to_multiindex(df, inplace=True) # Return only a subset of the dataframe if instructed to do so. if n_sequences is not None: df = self._extract_n_sequences_from_df(df, n_sequences) if filter_kwargs is not None: df = df.filter(**filter_kwargs) return df elif request_data == 'get dataframe': # Ensure backwards compatability with clients using outdated # versions of lyse. return self._retrieve_dataframe() elif isinstance(request_data, dict): if 'filepath' in request_data: h5_filepath = shared_drive.path_to_local(request_data['filepath']) if isinstance(h5_filepath, bytes): h5_filepath = h5_filepath.decode('utf8') if not isinstance(h5_filepath, str): raise AssertionError(str(type(h5_filepath)) + ' is not str or bytes') self.app.filebox.incoming_queue.put(h5_filepath) return 'added successfully' elif isinstance(request_data, str): # Just assume it's a filepath: self.app.filebox.incoming_queue.put(shared_drive.path_to_local(request_data)) return "Experiment added successfully\n" return ("error: operation not supported. Recognised requests are:\n " "'get dataframe'\n 'hello'\n {'filepath': <some_h5_filepath>}")
@inmain_decorator(wait_for_return=True) def _copy_dataframe(self): df = self.app.filebox.shots_model.dataframe.copy(deep=True) return df def _retrieve_dataframe(self): # infer_objects() picks fixed datatypes for columns that are compatible with # fixed datatypes, dramatically speeding up pickling. It is called here # rather than when updating the dataframe as calling it during updating may # call it needlessly often, whereas it only needs to be called prior to # sending the dataframe to a client requesting it, as we're doing now. df = self._copy_dataframe() df.infer_objects() return df def _extract_n_sequences_from_df(self, df, n_sequences): # If the dataframe is empty, just return it, otherwise accessing columns # below will raise a KeyError. if df.empty: return df # Get a list of all unique sequences, each corresponding to one call to # engage in runmanager. Each sequence may contain multiple runs. The # below creates strings to identify sequences. To be from the same # sequence, two shots have to have the same value for 'sequence' (which # makes sure that the time when engage was called are the same to within # 1 second), 'labscript' (must have been generated from the same # labscript), and 'sequence_index' (a counter which keeps track of how # many times engage has been called and resets to 0 at the start of each # day). Typically just the value for sequence, is enough. However it # only records time down to the second, so if engage() is called twice # quickly then two different sequences can end up with the same value # there. sequences = [str(sequence) for sequence in df['sequence']] labscripts = [str(labscript) for labscript in df['labscript']] sequence_indices = [str(index) for index in df['sequence_index']] # Combine into one string. criteria = zip(sequences, labscripts, sequence_indices) indentity_strings = [seq + script + ind for seq, script, ind in criteria] # Find the distinct values, maintaining their ordering. unique_identities = np.intersect1d(indentity_strings, indentity_strings) # Slice the DataFrame so that only the last n_sequences sequences # remain. Note that slicing unique_identities just returns all of its # entries if n_sequences is greater than its length; it doesn't raise an # error. if n_sequences == 0: identities_included = [] else: identities_included = unique_identities[-n_sequences:] df_subset = df[[id in identities_included for id in indentity_strings]] return df_subset