from __future__ import division, unicode_literals, print_function, absolute_import
import sys
PY2 = sys.version_info.major == 2
import os
import threading
import time
import signal
import weakref
import json
import ctypes
import atexit
from ctypes.util import find_library
import logging
from binascii import hexlify
import zmq
_path, _cwd = os.path.split(os.getcwd())
if _cwd == 'zprocess' and _path not in sys.path:
# Running from within zprocess dir? Add to sys.path for testing during
# development:
sys.path.insert(0, _path)
import ipaddress
import zprocess
from zprocess.security import SecureContext
from zprocess.utils import (
gethostbyname,
Interruptor,
Interrupted,
TimeoutError,
get_venv_executable_and_env,
_get_fileno
)
from zprocess.remote import (
RemoteProcessClient,
RemoteOutputReceiver,
DEFAULT_PORT as REMOTE_DEFAULT_PORT,
)
from zprocess.locking import ZLockClient, DEFAULT_PORT as ZLOCK_DEFAULT_PORT
from zprocess.zlog import ZLogClient, DEFAULT_PORT as ZLOG_DEFAULT_PORT
PROCESS_CLASS_WRAPPER = 'zprocess.process_class_wrapper'
PY2 = sys.version_info[0] == 2
if PY2:
import cPickle as pickle
from Queue import Queue, Empty
import subprocess32 as subprocess
from time import time as monotonic
str = unicode
from thread import get_ident as thread_ident
else:
import pickle
from queue import Queue, Empty
import subprocess
from time import monotonic
from threading import get_ident as thread_ident
class KillLock(object):
"""A lock which, when held, prevents the process from being terminated. If SIGTERM
is received while held, the process will terminate only once the lock is released.
Any number of threads may hold the lock simultaneously, any number of times. The lock
will be released and SIGTERM handled only when release() has been called once for
every time acquire() was called. This object is a singleton, instantiating it will
return the existing instance, if any. The first time it is instantiated, connect()
will be called. When SIGTERM is handled by this object's handler, it will call any
SIGTERM handler that was already configured when connect() was called. Therefore if
you wish to configure your own SIGTERM handler, either ensure it is configured
before the kill lock singleton is created (keeping in mind that zprocess creates the
singleton when connecting to parent processes), or call disconnect(), then connect
your signal handler, then call connect() again. """
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = self = object.__new__(cls)
self._n_acquires = 0
self._prev_handler = None
self._sigterm_received = False
self._lock = threading.RLock()
self.connected = False
self.connect()
return cls._instance
def connect(self):
with self._lock:
if self.connected:
raise RuntimeError("Already connected")
self._prev_handler = signal.signal(signal.SIGTERM, self._handler)
self.connected = True
def _check_connected(self):
with self._lock:
if not self.connected:
raise RuntimeError("Not connected")
if signal.getsignal(signal.SIGTERM) is not self._handler:
msg = """Another SIGTERM handler has been installed. To work properly
with KillLock, other SIGTERM handler must be installed whilst kill lock
is not connected. Either call KillLock.disconnect() before installing
other signal handlers and calling KillLock.connect() again, or install
other signal handlers before calling KillLock.connect() the first
time."""
raise RuntimeError(' '.join(msg.split()))
def disconnect(self):
with self._lock:
self._check_connected()
if self._n_acquires:
raise RuntimeError("Cannot disconnect() whilst kill lock held")
signal.signal(signal.SIGTERM, self._prev_handler)
self._prev_handler = None
self.connected = False
def acquire(self):
self._check_connected()
with self._lock:
self._n_acquires += 1
__enter__ = acquire
def _term(self):
self.disconnect()
os.kill(os.getpid(), signal.SIGTERM)
# Set things back to how they were in case the previous SIGTERM handler
# doesn't termiante the process
self.connect()
# staticmethod so that the method has a fixed id() for the identity check in
# _check_connected:
@staticmethod
def _handler(sig, frame):
self = KillLock._instance
with self._lock:
if self._sigterm_received and not self._n_acquires:
# We are being triggered to respond to SIGTERM upon release of the kill
# lock:
self._sigterm_received = False
self._term()
return
if self._n_acquires:
# Kill lock is held. Store a flag to respond to it when the kill lock is
# released:
self._sigterm_received = True
msg = "SIGTERM ignored: will be handled when kill lock released."
if not zprocess._silent:
print(msg, file=sys.stderr)
else:
# Kill lock not held by anyone. Call previous SIGTERM handler.
self._term()
def release(self):
with self._lock:
if not self._n_acquires:
raise RuntimeError("Not held")
self._n_acquires -= 1
if self._sigterm_received and not self._n_acquires:
# If sigterm received while lock held, terminate:
os.kill(os.getpid(), signal.SIGTERM)
def __exit__(self, t, v, tb):
self.release()
class HeartbeatServer(object):
"""A server which receives messages from clients and echoes them back. Each
process has a HeartbeatServer to provide heartbeating to its subprocesses -
there is not only one in the top process.
"""
def __init__(self, bind_address='tcp://*',
shared_secret=None):
context = SecureContext.instance(shared_secret=shared_secret)
self.sock = context.socket(zmq.REP)
self.port = self.sock.bind_to_random_port(bind_address)
self.mainloop_thread = threading.Thread(target=self.mainloop)
self.mainloop_thread.daemon = True
self.mainloop_thread.start()
def mainloop(self):
try:
zmq.proxy(self.sock, self.sock)
except zmq.ContextTerminated:
# Shutting down:
self.sock.close(linger=0)
return
class HeartbeatClient(object):
# How long between heartbeats?
HEARTBEAT_INTERVAL = 1
# How long without a heartbeat until we kill the process? Wait longer when the
# heartbeat server is remote to be more forgiving of the network:
LOCALHOST_TIMEOUT = 1
REMOTE_TIMEOUT = 10
"""A heartbeating thread that terminates the process if it doesn't get the
heartbeats back within one second, unless a lock is held."""
def __init__(self, server_host, server_port,
shared_secret=None, allow_insecure=False):
self.lock = KillLock()
context = SecureContext.instance(shared_secret=shared_secret)
self.sock = context.socket(zmq.REQ, allow_insecure=allow_insecure)
self.sock.setsockopt(zmq.LINGER, 0)
server_ip = gethostbyname(server_host)
self.sock.connect('tcp://{}:{}'.format(server_ip, server_port))
self.mainloop_thread = threading.Thread(target=self.mainloop)
self.mainloop_thread.daemon = True
self.mainloop_thread.start()
if isinstance(server_ip, bytes):
server_ip = server_ip.decode()
if ipaddress.ip_address(server_ip).is_loopback:
self.timeout = self.LOCALHOST_TIMEOUT
else:
self.timeout = self.REMOTE_TIMEOUT
def mainloop(self):
try:
pid = str(os.getpid()).encode('utf8')
while True:
time.sleep(self.HEARTBEAT_INTERVAL)
self.sock.send(pid, zmq.NOBLOCK)
if not self.sock.poll(self.timeout * 1000):
break
msg = self.sock.recv()
if not msg == pid:
break
if not zprocess._silent:
print('Heartbeat failure', file=sys.stderr)
os.kill(os.getpid(), signal.SIGTERM)
except zmq.ContextTerminated:
# Shutting down:
self.sock.close(linger=0)
return
class EventBroker(object):
"""A broker to collect Event.post() messages from anywhere in the process tree
and broadcast them to subscribers calling event.wait(). There is only one of
these, at the top level process in the ProcessTree."""
# A message subscribers can use to confirm their connection
# (and any subscriptions) have been processed.
WELCOME_MESSAGE = b'_zprocess_broker_hello\0'
def __init__(self, bind_address='tcp://*', shared_secret=None,
allow_insecure=False):
context = SecureContext.instance(shared_secret=shared_secret)
self.frontend = context.socket(zmq.PULL, allow_insecure=allow_insecure)
self.backend = context.socket(zmq.XPUB, allow_insecure=allow_insecure)
self.poller = zmq.Poller()
self.poller.register(self.frontend, zmq.POLLIN)
self.poller.register(self.backend, zmq.POLLIN)
self.in_port = self.frontend.bind_to_random_port(bind_address)
self.out_port = self.backend.bind_to_random_port(bind_address)
self.mainloop_thread = threading.Thread(target=self.mainloop)
self.mainloop_thread.daemon = True
self.mainloop_thread.start()
def mainloop(self):
while True:
try:
events = dict(self.poller.poll())
if self.backend in events:
msg = self.backend.recv()
is_subscription, topic = ord(msg[0:1]), msg[1:]
if is_subscription and topic.startswith(self.WELCOME_MESSAGE):
# A new subscriber asking for a welcome message to confirm
# that we have received all subscriptions made prior to
# this request. Send the topic back (it includes a unique
# random number to ensure only the recipient gets it)
self.backend.send(topic)
if self.frontend in events:
# Forward messages to subscribers:
self.backend.send_multipart(self.frontend.recv_multipart())
except zmq.ContextTerminated:
# Shutting down:
self.frontend.close(linger=0)
self.backend.close(linger=0)
return
class ExternalBroker(object):
"""Container object for the details of an event broker that is external to
the ProcessTree that this process belongs to.
"""
def __init__(self, host, in_port, out_port):
self.host = host
self.in_port = in_port
self.out_port = out_port
class Event(object):
def __init__(self, process_tree, event_name, role='wait', external_broker=None):
self.event_name = event_name
# We null terminate the event name otherwise any subscriber subscribing to
# an event *starting* with our event name will also receive it, which
# we do not want:
self._encoded_event_name = self.event_name.encode('utf8') + b'\0'
self.role = role
if not role in ['wait', 'post', 'both']:
raise ValueError("role must be 'wait', 'post', or 'both'")
self.can_wait = self.role in ['wait', 'both']
self.can_post = self.role in ['post', 'both']
context = SecureContext.instance(shared_secret=process_tree.shared_secret)
if external_broker is not None:
broker_host = external_broker.host
broker_in_port = external_broker.in_port
broker_out_port = external_broker.out_port
else:
process_tree.check_broker()
broker_host = process_tree.broker_host
broker_in_port = process_tree.broker_in_port
broker_out_port = process_tree.broker_out_port
broker_ip = gethostbyname(broker_host)
if self.can_wait:
self.sub = context.socket(zmq.SUB,
allow_insecure=process_tree.allow_insecure)
self.sub.set_hwm(1000)
self.sub.setsockopt(zmq.SUBSCRIBE, self._encoded_event_name)
self.sub.connect('tcp://{}:{}'.format(broker_ip, broker_out_port))
# Request a welcome message from the broker confirming it receives this
# subscription request. This is important so that by the time this
# __init__ method returns, the caller can know for sure that if the
# broker receives a message, it will definitely be forwarded to the
# subscribers and not discarded. It is important that this come after
# connect() and after the other setsockopt for our event subscription,
# otherwise the two subscription requests may be sent in the opposite
# order, preventing us from relying on the receipt of a welcome message
# as confirmation that the other subscription was received. We use a
# unique random number to prevent treating *other* subscribers' welcome
# messages as our own. This is a lot of hoops to jump through when it
# would be really nice if zeroMQ could just have a way of saying "block
# until all subscription messages processed", which is all we're really
# doing.
unique_id = os.urandom(32)
self.sub.setsockopt(zmq.SUBSCRIBE,
EventBroker.WELCOME_MESSAGE + unique_id)
# Wait until we receive the welcome message. Throw out any messages
# prior to it. Timeout if we get nothing for 5 seconds:
while True:
events = self.sub.poll(flags=zmq.POLLIN, timeout=5000)
if not events:
raise TimeoutError("Could not connect to event broker")
if self.sub.recv() == EventBroker.WELCOME_MESSAGE + unique_id:
break
# Great, we're definitely connected to the broker now, and it has
# processed our subscription. Remove the welcome event subscription
# and proceed:
self.sub.setsockopt(zmq.UNSUBSCRIBE,
EventBroker.WELCOME_MESSAGE + unique_id)
self.sublock = threading.Lock()
if self.can_post:
self.push = context.socket(zmq.PUSH,
allow_insecure=process_tree.allow_insecure)
self.push.connect('tcp://{}:{}'.format(broker_ip, broker_in_port))
self.pushlock = threading.Lock()
def post(self, identifier, data=None):
if not self.can_post:
msg = ("Instantiate Event with role='post' " +
"or 'both' to be able to post events")
raise ValueError(msg)
with self.pushlock:
self.push.send_multipart([self._encoded_event_name,
str(identifier).encode('utf8'),
pickle.dumps(data,
protocol=zprocess.PICKLE_PROTOCOL)])
def wait(self, identifier, timeout=None):
identifier = str(identifier)
if not self.can_wait:
msg = ("Instantiate Event with role='wait' " +
"or 'both' to be able to wait for events")
raise ValueError(msg)
# First check through events that are already in the buffer:
while True:
with self.sublock:
events = self.sub.poll(0, flags=zmq.POLLIN)
if not events:
break
encoded_event_name, event_id, data = self.sub.recv_multipart()
event_id = event_id.decode('utf8')
data = pickle.loads(data)
assert encoded_event_name == self._encoded_event_name
if event_id == identifier:
return data
# Since we might have to make several recv() calls before we get the
# right identifier, we must implement our own timeout:
start_time = monotonic()
while timeout is None or (monotonic() < start_time + timeout):
with self.sublock:
if timeout is not None:
# How long left before the elapsed time is greater than
# timeout?
remaining = (start_time + timeout - monotonic())
poll_timeout = max(0, remaining)
events = self.sub.poll(1000 * poll_timeout, flags=zmq.POLLIN)
if not events:
break
encoded_event_name, event_id, data = self.sub.recv_multipart()
event_id = event_id.decode('utf8')
data = pickle.loads(data)
assert encoded_event_name == self._encoded_event_name
if event_id == identifier:
return data
raise TimeoutError('No event received: timed out')
class WriteQueue(object):
"""Provides writing of python objects to the underlying zmq socket, with added
locking and interruptibility of blocking sends. No reading is supported, once you
put an object, you can't check what was put or whether the items have been gotten"""
def __init__(self, sock):
self.sock = sock
self.lock = threading.Lock()
self.poller = zmq.Poller()
self.poller.register(self.sock)
self.interruptor = Interruptor()
def put(self, obj, timeout=None, interruptor=None):
"""Send an object to ourself, with optional timeout and optional
zprocess.Interruptor instance for interrupting from another thread"""
if timeout is not None:
deadline = monotonic() + timeout
if interruptor is None:
interruptor = self.interruptor
with self.lock:
try:
interruption_sock = interruptor.subscribe()
self.poller.register(interruption_sock)
while True:
if timeout is not None:
timeout = max(0, (deadline - monotonic()) * 1000)
events = dict(self.poller.poll(timeout))
if not events:
raise TimeoutError('put() timed out')
if interruption_sock in events:
raise Interrupted(interruption_sock.recv().decode('utf8'))
try:
return self.sock.send_pyobj(
obj, protocol=zprocess.PICKLE_PROTOCOL, flags=zmq.NOBLOCK
)
except zmq.ZMQError:
# Queue became full or we disconnected or something, keep
# polling:
continue
finally:
self.poller.unregister(interruption_sock)
interruptor.unsubscribe()
def interrupt(self, reason=None):
"""Interrupt any current and future put() calls, causing them to raise
Interrupted(reason) until clear_interrupt() is called. Note that if put() was
called with an externally created Interruptor object, then this method will not
interrupt that call, and Interruptor.set() will need to be called on the given
interruptor object instead."""
self.interruptor.set(reason=reason)
def clear_interrupt(self):
"""Clear our internal Interruptor object so that future put() calls can proceed
as normal."""
self.interruptor.clear()
class ReadQueue(object):
"""Provides reading and writing methods to the underlying zmq socket(s), with added
locking and interruptibility of blocking recv()s. Items put() to the queue are
readable by get(), as are items sent from the connected peer. This can be useful to
send commands to a loop from within the same application. The original use case was
for breaking out of blocking recv()s, though that use case is now better served by
interrupt()"""
def __init__(self, sock):
self.sock = sock
self.to_self = sock.context.socket(zmq.PUSH)
self.from_self = sock.context.socket(zmq.PULL)
self_endpoint = 'inproc://zpself' + hexlify(os.urandom(8)).decode()
self.from_self.bind(self_endpoint)
self.to_self.connect(self_endpoint)
self.lock = threading.Lock()
self.to_self_lock = threading.Lock()
self.in_poller = zmq.Poller()
self.in_poller.register(self.sock)
self.in_poller.register(self.from_self)
self.out_poller = zmq.Poller()
self.out_poller.register(self.to_self)
self.interruptor = Interruptor()
def get(self, timeout=None, interruptor=None):
"""Get an object sent from the child, with optional timeout and optional
zprocess.Interruptor instance for interrupting from another thread. If
interruptor is not provided, a default interruptor is used, and so get() can be
interrupted from another thread with ReadQueue.interruptor.set() (remember to
call .clear() to reset before calling get() again"""
if timeout is not None:
timeout *= 1000 # convert to ms
if interruptor is None:
interruptor = self.interruptor
with self.lock:
try:
interruption_sock = interruptor.subscribe()
self.in_poller.register(interruption_sock)
events = dict(self.in_poller.poll(timeout))
if not events:
raise TimeoutError('get() timed out')
if interruption_sock in events:
raise Interrupted(interruption_sock.recv().decode('utf8'))
if self.from_self in events:
sock = self.from_self
else:
assert self.sock in events
sock = self.sock
return sock.recv_pyobj()
finally:
self.in_poller.unregister(interruption_sock)
interruptor.unsubscribe()
def put(self, obj, timeout=None, interruptor=None):
"""Send an object to ourself, with optional timeout and optional
zprocess.Interruptor instance for interrupting from another thread"""
if timeout is not None:
deadline = monotonic() + timeout
if interruptor is None:
interruptor = self.interruptor
with self.to_self_lock:
try:
interruption_sock = interruptor.subscribe()
self.out_poller.register(interruption_sock)
while True:
if timeout is not None:
timeout = max(0, (deadline - monotonic()) * 1000)
events = dict(self.out_poller.poll(timeout))
if not events:
raise TimeoutError('put() timed out')
if interruption_sock in events:
raise Interrupted(interruption_sock.recv().decode('utf8'))
try:
return self.to_self.send_pyobj(
obj, protocol=zprocess.PICKLE_PROTOCOL, flags=zmq.NOBLOCK
)
except zmq.ZMQError:
# Queue became full or we disconnected or something, keep
# polling:
continue
finally:
self.out_poller.unregister(interruption_sock)
interruptor.unsubscribe()
def interrupt(self, reason=None):
"""Interrupt any current and future get()/put() calls, causing them to raise
Interrupted(reason) until clear_interrupt() is called. Note that if put()/get()
was called with an externally created Interruptor object, then this method will
not interrupt that call, and Interruptor.set() will need to be called on the
given interruptor object instead."""
self.interruptor.set(reason=reason)
def clear_interrupt(self):
"""Clear our internal Interruptor object so that future put()/get() calls can
proceed as normal."""
self.interruptor.clear()
class _StreamProxyBuffer(object):
def __init__(self, streamproxy):
self.streamproxy = streamproxy
def close(self):
return self.streamproxy.close()
def fileno(self):
return self.streamproxy.fileno()
def isatty(self):
return self.streamproxy.isatty()
def flush(self):
return self.streamproxy.flush()
def write(self, data):
"""Compatibility for code calling stdout.buffer.write etc"""
if not isinstance(data, bytes):
raise TypeError("Can only write bytes to buffer")
return self.streamproxy.write(data)
class StreamProxy(object):
# Declare that our write() method accepts a 'charformat' kwarg for specifying
# formatting
supports_rich_write = True
def __init__(self, fd, sock, socklock, streamname):
self.fd = fd
self.sock = sock
self.socklock = socklock
self.streamname_bytes = streamname.encode('utf8')
# Hopefully this covers all our bases for ensuring the mainloop gets
# a chance to run after C output before Python output is produced:
if os.name == 'posix':
libpthread = ctypes.CDLL(find_library('pthread'))
self.sched_yield = libpthread.sched_yield
else:
self.sched_yield = lambda: time.sleep(0)
self.buffer = _StreamProxyBuffer(self)
def write(self, s, charformat=None):
if isinstance(s, str):
if PY2:
s = s.encode('utf8')
else:
# Allow binary data embedded in the unicode string via surrogate escapes
# to turn back into the bytes it originally represented:
s = s.encode('utf8', errors='surrogateescape')
if charformat is None:
charformat = self.streamname_bytes
elif isinstance(charformat, str):
charformat = charformat.encode('utf8')
# Release the GIL momentarily to increase the odds that previous output
# from C or a subprocess will have been processed by
# OutputInterceptor._mainloop(), preserving the order of output. This is no
# guarantee, but is the best we can do if we want to be able to capture
# output from C and subprocesses whilst distinguishing between stdout and
# stderr without doing LD_PRELOAD tricks. Sleeping zero or calling
# sched_yield releases the GIL momentarily, giving other threads a chance
# to run. The mainloop is higher priority on Windows, so it will get the
# GIL until it is done or until the next checkinterval(). It is not higher
# priority on linux, but this is the best we can do.
self.sched_yield()
with self.socklock:
# We write directly to the zmq socket for Python output to guarantee
# that sys.stdout and sys.stderr will come out in the correct order
# when coming from Python code, which we cannot similarly guarantee for
# C output. The downside of this is that Python stdout and C stdout
# output may be in the incorrect order, though we do our best to
# decrease the odds of this with thread yielding. This seemed like
# the better compromise than sending everything through the pipes and
# possibly reordering stdout and stderr for ordinary Python output.
self.sock.send_multipart([charformat, s])
# os.write(self.fd, s)
def close(self):
os.close(self.fd)
def fileno(self):
return self.fd
def isatty(self):
return os.isatty(self.fd)
def flush(self):
pass
class OutputInterceptor(object):
# Two OutputInterceptors talking to the same server must share a sock and
# corresponding lock:
socks_by_connection = weakref.WeakValueDictionary()
locks_by_connection = weakref.WeakValueDictionary()
# A lock to serialise calls to connect() and disconnect()
connect_disconnect_lock = threading.Lock()
# Only one stdout or stderr can be conencted at a time,
# so we keep track with this class attribute dict:
streams_connected = {'stdout': None, 'stderr': None}
# Ensure output is flushed at interpreter shutdown:
def _close_socks():
cls = OutputInterceptor
for connection in list(cls.socks_by_connection.keys()):
try:
sock = cls.socks_by_connection[connection]
lock = cls.locks_by_connection[connection]
except KeyError:
continue
with lock:
sock.close()
atexit.register(_close_socks)
"""Redirect stderr and stdout to a zmq PUSH socket"""
def __init__(self, host, port, streamname='stdout',
shared_secret=None, allow_insecure=False):
if streamname not in ['stdout', 'stderr']:
msg = "streamname must be 'stdout' or 'stderr'"
raise ValueError(msg)
self.streamname = streamname
self.stream_fd = None
self.backup_fd = None
self.read_pipe_fd = None
self.mainloop_thread = None
self.shutting_down = False
ip = gethostbyname(host)
connection_details = (ip, port, shared_secret, allow_insecure)
if connection_details not in self.socks_by_connection:
context = SecureContext.instance(shared_secret=shared_secret)
sock = context.socket(zmq.PUSH, allow_insecure=allow_insecure)
# At socket close, allow up to 1 second to send all unsent messages:
sock.setsockopt(zmq.LINGER, 1000)
sock.connect('tcp://%s:%d' % (ip, port))
socklock = threading.Lock()
self.socks_by_connection[connection_details] = sock
self.locks_by_connection[connection_details] = socklock
self.sock = self.socks_by_connection[connection_details]
self.socklock = self.locks_by_connection[connection_details]
if os.name == 'nt':
self._libc = ctypes.cdll.msvcrt
else:
self._libc = ctypes.CDLL(None)
self._c_stream_ptr = self._get_c_stream()
def _get_c_stream(self):
"""Get file pointer for C stream"""
if os.name == 'nt':
# Windows:
class FILE(ctypes.Structure):
_fields_ = [
("_ptr", ctypes.c_char_p),
("_cnt", ctypes.c_int),
("_base", ctypes.c_char_p),
("_flag", ctypes.c_int),
("_file", ctypes.c_int),
("_charbuf", ctypes.c_int),
("_bufsize", ctypes.c_int),
("_tmpfname", ctypes.c_char_p),
]
iob_func = getattr(self._libc, '__iob_func')
iob_func.restype = ctypes.POINTER(FILE)
iob_func.argtypes = []
array = iob_func()
if self.streamname == 'stdout':
return ctypes.addressof(array[1])
else:
return ctypes.addressof(array[2])
else:
try:
# Linux:
return ctypes.c_void_p.in_dll(self._libc, self.streamname)
except ValueError:
# MacOS:
return ctypes.c_void_p.in_dll(self._libc, '__%sp' % self.streamname)
def _flush(self):
"""Flush the C level file pointer for the stream. This should be
done before closing their file descriptors"""
if os.name == 'nt':
# In windows we flush all output streams by calling flush on a null
# pointer:
file_ptr = ctypes.c_void_p()
else:
file_ptr = self._c_stream_ptr
self._libc.fflush(file_ptr)
def _unbuffer(self):
"""Set C output streams to unbuffered"""
if os.name == 'nt':
_IONBF = 4
else:
_IONBF = 2
self._libc.setvbuf.argtypes = [
ctypes.c_void_p,
ctypes.c_char_p,
ctypes.c_int,
ctypes.c_size_t,
]
self._libc.setvbuf(self._c_stream_ptr, None, _IONBF, 0)
def connect(self):
"""Begin output redirection"""
with self.connect_disconnect_lock:
if self.streams_connected[self.streamname] is not None:
msg = ("An OutputInterceptor is already connected for stream " +
"'%s'" % self.streamname)
raise RuntimeError(msg)
self.streams_connected[self.streamname] = self
stream = getattr(sys, self.streamname)
if stream is not None and _get_fileno(stream) > 0:
self.stream_fd = stream.fileno()
else:
# On Windows with pythonw, sys.stdout and sys.stderr are None or
# have invalid (negative) file descriptors. We still want to
# redirect any C code or subprocesses writing to stdout or stderr
# so we use the standard file descriptor numbers, which, assuming
# no other tricks played by other code, will be 1 and 2:
if self.streamname == 'stdout':
self.stream_fd = 1
elif self.streamname == 'stderr':
self.stream_fd = 2
else:
raise ValueError(self.streamname)
# os.dup() lets us take a sort of backup of the current file
# descriptor for the stream, so that we can restore it later:
self.backup_fd = os.dup(self.stream_fd)
# We set up a pipe and set the write end of it to be the output file
# descriptor. C code and subprocesses will see this as the stream and
# write to it, and we will read from the read end of the pipe in a
# thread to pass their output to the zmq socket.
self.read_pipe_fd, write_pipe_fd = os.pipe()
self.mainloop_thread = threading.Thread(target=self._mainloop)
self.mainloop_thread.daemon = True
self.mainloop_thread.start()
# Before doing the redirection, flush the current streams:
self._flush()
self._unbuffer()
# Redirect the stream to our write pipe, closing the original stream
# file descriptor:
os.dup2(write_pipe_fd, self.stream_fd)
# Replace sys.<streamname> with a proxy object. Any Python code writing
# to the stream will have the output passed directly to the zmq socket,
# and any other code inspecting sys.stdout/sderr's fileno() will see
# the write end of our redirection pipe.
proxy = StreamProxy(write_pipe_fd,
self.sock, self.socklock, self.streamname)
setattr(sys, self.streamname, proxy)
def disconnect(self):
"""Stop output redirection"""
with self.connect_disconnect_lock:
if self.streams_connected[self.streamname] is not self:
msg = ("This OutputInterceptor not connected for stream " +
"'%s'" % self.streamname)
raise RuntimeError(msg)
self.streams_connected[self.streamname] = None
orig_stream = getattr(sys, '__%s__' % self.streamname)
self._flush()
os.dup2(self.backup_fd, self.stream_fd)
self.stream_fd = None
os.close(self.backup_fd)
self.backup_fd = None
proxy_stream = getattr(sys, self.streamname)
# self-pipe trick to break out of the blocking read and see the
# shutting_down flag. We do this rather than close the file descriptor
# for the write end of the pipe because other processes may still have
# an open file descriptor for the write end of the pipe, so the
# mainloop would not get a signal that the pipe was closed. Any
# remaining subprocesses will get a broken pipe error if they try to
# write output.
self.shutting_down = True
os.write(proxy_stream.fileno(), b'<dummy message>')
proxy_stream.close()
setattr(sys, self.streamname, orig_stream)
self.mainloop_thread.join()
self.mainloop_thread = None
self.shutting_down = False
def _mainloop(self):
streamname_bytes = self.streamname.encode('utf8')
# Set the priority of this thread a bit higher so that when C code or
# subprocesses write to stdout, and multiple threads are waiting on the
# GIL, this thread will get it and process the output before Python
# output is produced from other threads. This is only reliable if we
# can guarantee that the main thread releases and re-acquires the GIL
# before producing output, which we do in StreamProxy.write().
if os.name == 'nt':
w32 = ctypes.windll.kernel32
THREAD_SET_INFORMATION = 0x20
THREAD_PRIORITY_ABOVE_NORMAL = 1
handle = w32.OpenThread(THREAD_SET_INFORMATION, False,
threading.current_thread().ident)
result = w32.SetThreadPriority(handle, THREAD_PRIORITY_ABOVE_NORMAL)
w32.CloseHandle(handle)
if not result:
print('Failed to set priority of thread', w32.GetLastError())
else:
# In linux/mac, we cannot set a higher priority without root
# privileges. Best we can do it to call sched_yield from the other
# thread.
pass
while True:
s = os.read(self.read_pipe_fd, 4096)
with self.socklock:
if not s or self.shutting_down:
os.close(self.read_pipe_fd)
self.read_pipe_fd = None
break
self.sock.send_multipart([streamname_bytes, s])
class RichStreamHandler(logging.StreamHandler):
"""Logging hander that forwards the log level name as the 'charformat' keyword
argument, if it exists, to the write() method of the underlying stream object. If
connected to a qtutils.OutputBox, The OutputBox will format different log levels
depending on severity. This is designed both to work with an OutputBox as the
stream, or with a zprocess.StreamProxy. Thus zprocess subprocesses using a logger
with a RichStreamHandler set to sys.stdout or sys.stderr will have colourised log
output, as will any loggers in the same process as the OutputBox with the stream set
to the OutputBox."""
def emit(self, record):
if not getattr(self.stream, 'supports_rich_write', False):
return logging.StreamHandler.emit(self, record)
try:
msg = self.format(record) + '\n'
self.stream.write(msg, charformat=record.levelname)
except Exception:
self.handleError(record)
def rich_print(*values, **kwargs):
"""A print function allowing bold, italics, and colour, if stdout.write or
stderr.write supports a 'charformat' keyword argument and is connected to a
qtutils.OutputBox. This method accepts the same arguments as the Python print
function, as well as keyword args: 'color', a string containing either a named color
or hex value of a color; 'bold' and 'italic': booleans as to whether the text should
be bold or italic. If file=sys.stderr, the output will be red. Otherwise, if color
is not specified, output will be white. The 'color' and 'bold' keyword arguments if
provided will override the settings inferred from the file keyword argument. If the
stream does not support the 'charformat' keyword argument, then formatting will be
ignored."""
file = kwargs.get('file', sys.stdout)
if file is sys.stderr:
color = 'red'
bold = False
else:
color = 'white'
bold = False
bold = kwargs.pop('bold', bold)
color = kwargs.pop('color', color)
italic = kwargs.pop('italic', False)
if not getattr(file, 'supports_rich_write', False):
return print(*values, **kwargs)
sep = kwargs.pop('sep', ' ')
end = kwargs.pop('end', '\n')
charformat = repr((color, bold, italic)).encode('utf8')
file.write(sep.join(str(s) for s in values) + end, charformat=charformat)
class Process(object):
"""A class providing similar functionality to multiprocessing.Process, but
using zmq for communication and creating processes in a fresh environment
rather than by forking (or imitation forking as in Windows). Do not override
its methods other than run()."""
def __init__(
self,
process_tree,
output_redirection_host=None,
output_redirection_port=None,
remote_process_client=None,
subclass_fullname=None,
startup_timeout=5,
):
self._redirection_port = output_redirection_port
self._redirection_host = output_redirection_host
self.process_tree = process_tree
self.to_child = None
self.from_child = None
self.child = None
self.parent_host = None
self.to_parent = None
self.from_parent = None
self.kill_lock = None
self.remote_process_client = remote_process_client
self.subclass_fullname = subclass_fullname
self.startup_timeout = startup_timeout
self.startup_event = None
self.startup_interruptor = Interruptor()
self.startup_lock = threading.Lock()
if subclass_fullname is not None:
if self.__class__ is not Process:
msg = (
"Can only pass subclass_fullname to Process directly, "
+ "not to a subclass"
)
raise ValueError(msg)
if (
self.__module__ == '__main__'
and self.subclass_fullname is None
and self.remote_process_client is not None
):
msg = (
"Cannot start a remote process for a class defined in __main__. "
+ "The remote process will not be able to import the required class "
+ "as it will not know the import path. Either define the class in a "
+ "different module importable on both systems, or use "
+ "zprocess.Process directly, passing in subclass_fullname to specify "
+ "the full import path."
)
raise RuntimeError(msg)
def start(self, *args, **kwargs):
"""Call in the parent process to start a subprocess. Passes args and
kwargs to the run() method"""
# Allow Process.terminate() to be called at any time, either before or after
# start(), and catch it in a race-free way. Process.terminate() will acquire the
# startup lock and check for the existence of startup_event, and will only block
# on it if it exists. Otherwise it will call self.interrupt_startup(), and the
# below code will know not to start the child process.
with self.startup_lock:
if self.startup_interruptor.is_set:
raise Interrupted(self.startup_interruptor.reason)
else:
self.startup_event = threading.Event()
startup_queue = Queue()
try:
child_details = self.process_tree.subprocess(
PROCESS_CLASS_WRAPPER,
output_redirection_port=self._redirection_port,
output_redirection_host=self._redirection_host,
remote_process_client=self.remote_process_client,
startup_timeout=self.startup_timeout,
pymodule=True,
# This argument is not used by the child, but it makes it visible in
# process lists which process is which:
args=[self.subclass_fullname or self.__class__.__name__],
startup_queue=startup_queue,
startup_interruptor=self.startup_interruptor
)
self.to_child, self.from_child, self.child = child_details
finally:
with self.startup_lock:
# If the above was interrupted, but the child object existed before
# interruption, it will have been put to this queue. If so, set it as
# self.child
try:
self.child = startup_queue.get(block=False)
except Empty:
pass
# Inform waiting code that self.child should exist now, and that if it
# is None it means startup failed before it was created:
self.startup_event.set()
# Get the file that the class definition is in (not this file you're
# reading now, rather that of the subclass):
if self.subclass_fullname is None:
module_file = os.path.abspath(sys.modules[self.__module__].__file__)
basepath, extension = os.path.splitext(module_file)
if extension == '.pyc':
module_file = basepath + '.py'
if not os.path.exists(module_file):
# Nope? How about this extension then?
module_file = basepath + '.pyw'
if not os.path.exists(module_file):
# Still no? Well we can't really work out what the extension is then,
# can we?
msg = ("Can't find module file, what's going on, does " +
"it have an unusual extension?")
raise NotImplementedError(msg)
else:
module_file = None # Will be found with import machinery in the subprocess
if module_file is not None and self.remote_process_client is None:
# Send the module filepath to the child process so it can execute it in
# __main__, otherwise class definitions from the users __main__ module will
# not be unpickleable. Note that though executed in __main__, the code's
# __name__ will not be __main__, and so any main block won't execute, which
# is good! Also send sys.path the ensure the child's environment is the same
# as ours. Do not do this if remote, since the environment will not be
# meaningful on the other host.
self.to_child.put(
[self.__module__, module_file, sys.path],
timeout=self.startup_timeout,
interruptor=self.startup_interruptor,
)
else:
self.to_child.put(
[None, None, None],
timeout=self.startup_timeout,
interruptor=self.startup_interruptor,
)
# Send the class to the child, either as a pickled class or as the specified
# full path:
if self.subclass_fullname is not None:
self.to_child.put(
self.subclass_fullname,
timeout=self.startup_timeout,
interruptor=self.startup_interruptor,
)
else:
self.to_child.put(
self.__class__,
timeout=self.startup_timeout,
interruptor=self.startup_interruptor,
)
response = self.from_child.get(
timeout=self.startup_timeout, interruptor=self.startup_interruptor
)
if response != 'ok':
msg = "Error in child process importing specified Process subclass:\n\n%s"
raise Exception(msg % str(response))
self.to_child.put(
[args, kwargs],
timeout=self.startup_timeout,
interruptor=self.startup_interruptor,
)
return self.to_child, self.from_child
def _run(self):
"""Called in the child process to set up the connection with the
parent"""
self.to_parent = self.process_tree.to_parent
self.from_parent = self.process_tree.from_parent
self.parent_host = self.process_tree.parent_host
self.kill_lock = self.process_tree.kill_lock
args, kwargs = self.from_parent.get()
self.run(*args, **kwargs)
def interrupt_startup(self, reason='Process.interrupt_startup() called'):
"""Called from the parent process. Interrupt all blocking operations on starting
the child process, causing Process.start() to raise Interrupted(reason). After
interruption, self.child may be None if startup was interrupted before the child
was started, otherwise self.child will be the child Popen object, which could be
at any stage of setting up its connection with the parent. This method may be
called multiple times without raising an exception, it will simply do nothing
if startup has previously been interrupted"""
with self.startup_lock:
if not self.startup_interruptor.is_set:
self.startup_interruptor.set(reason=reason)
if self.startup_event is not None:
# start() has been called. Block until self.child exists:
self.startup_event.wait()
def terminate(self, wait_timeout=None, **kwargs):
"""Interrupt process startup if not already done, ensuring self.child exists or
is None if startup was interrupted before the process was created. Then if the
child is not None, call Popen.terminate() and Popen.wait() on it."""
self.interrupt_startup(reason='Process.terminate() called')
if self.child is not None:
try:
self.child.terminate(**kwargs)
self.child.wait(timeout=wait_timeout, **kwargs)
except (OSError, TimeoutError):
# process is already dead, or cannot contact remote server
pass
def run(self, *args, **kwargs):
"""The method that gets called in the subprocess. To be overridden by
subclasses"""
print("This is the run() method of the default zprocess Process class.")
print("Subclass Process and override this method with your own code "
"to be run in a subprocess")
import time
time.sleep(1)
class ProcessTree(object):
# __instance will be set to an an instance of ProcessTree after a subprocess sets up
# its connection with the parent, configured with the details inherited from the
# parent process. It will not be set in the top-level process. It can be accessed
# with ProcessTree.instance() and is name-mangled to be class-private.
__instance = None
[docs]
def __init__(
self,
shared_secret=None,
allow_insecure=False,
zlock_host=None,
zlock_port=ZLOCK_DEFAULT_PORT,
zlog_host=None,
zlog_port=ZLOG_DEFAULT_PORT,
):
self.shared_secret = shared_secret
self.allow_insecure = allow_insecure
self.zlock_host = zlock_host
self.zlock_port = zlock_port
self.zlog_host = zlog_host
self.zlog_port = zlog_port
self.parent_host = None
self.broker = None
self.broker_host = None
self.broker_in_port = None
self.broker_out_port = None
self.heartbeat_server = None
self.heartbeat_client = None
self.output_redirection_host = None
self.output_redirection_port = None
self.remote_output_receiver = None
self.to_parent = None
self.from_parent = None
self.kill_lock = None
self.log_paths = {}
self.startup_timeout = None
self.zlock_client = None
if self.zlock_host is not None:
self.zlock_client = ZLockClient(
self.zlock_host,
self.zlock_port,
shared_secret=self.shared_secret,
allow_insecure=self.allow_insecure,
)
self.zlog_client = None
if self.zlog_host is not None:
self.zlog_client = ZLogClient(
self.zlog_host,
self.zlog_port,
shared_secret=self.shared_secret,
allow_insecure=self.allow_insecure,
)
@classmethod
def instance(cls):
return ProcessTree.__instance
def check_broker(self):
if self.broker_in_port is None:
# We don't have a parent with a broker: it is our responsibility to
# make a broker:
self.broker = EventBroker(shared_secret=self.shared_secret,
allow_insecure=self.allow_insecure)
self.broker_host = 'localhost'
self.broker_in_port = self.broker.in_port
self.broker_out_port = self.broker.out_port
def event(self, event_name, role='wait', external_broker=None):
return Event(self, event_name, role=role, external_broker=external_broker)
def remote_process_client(self, host, port=REMOTE_DEFAULT_PORT):
"""Return a RemoteProcessClient configured with this ProcessTree's security
settings"""
return RemoteProcessClient(
host,
port=port,
shared_secret=self.shared_secret,
allow_insecure=self.allow_insecure,
)
def lock(self, key, read_only=False):
"""Return a zprocess.locking.Lock for a resource identified by the given key.
This lock is exclusive among all processes configured to use the same key and
same zlock server as this ProcessTree."""
if self.zlock_client is None:
msg = "ProcessTree not configured to connect to a zlock server"
raise RuntimeError(msg)
return self.zlock_client.lock(key, read_only=read_only)
def logging_handler(self, filepath, name=None):
"""Return a zprocess.zlog.ZMQLoggingHandler for the given filepath. All loggers
using the same file and zlog server as this ProcessTree will log to the same
file in a race-free way. If name is not None and matches the name passed in when
creating a logging handler in a parent process, the filepath that was used in
the parent process will be used instead of the one passed to this method. In
this way, if parent and child processes are running on different computers, the
filepath passed in by the parent will be used. This way one can prevent the zlog
server creating additional log files on the computer that the toplevel process
is running on, with paths that may be unrelated to other paths on that
computer."""
if name is not None:
filepath = self.log_paths.setdefault(name, filepath)
if self.zlog_host is None:
msg = "ProcessTree not configured to connect to a zlog server"
raise RuntimeError(msg)
return self.zlog_client.handler(filepath)
def subprocess(
self,
path,
output_redirection_host=None,
output_redirection_port=None,
remote_process_client=None,
startup_timeout=5,
pymodule=False,
args=None,
startup_queue=None,
startup_interruptor=None,
):
"""Start a subprocess and set up communication with it. Path can be either a
path to a Python script to be executed with 'python some_path.py, or a fully
qualified module path to be executed as 'python -m some.path'. Path will be
interpreted as the latter only if pymodule=True. If output_redirection_port is
not None, then all stdout and stderr of the child process will be sent on a
zmq.PUSH socket to the given port. If startup_queue is provided, it should be a
queue.Queue(). Once the child process exists, the child Popen object will be
put() to this queue. This allows Process.interrupt_startup() to obtain and set
Process.child to the child Popen object, if it exists at the time of
interruption, even if this function does not return due to raising the
Intertupted exception. This can be important as code calling
Process.interrupt_startup() (such as Process.terminate()) may wish to terminate
the child process. TODO finish this and other docstrings."""
context = SecureContext.instance(shared_secret=self.shared_secret)
to_child = context.socket(zmq.PUSH, allow_insecure=self.allow_insecure)
from_child = context.socket(zmq.PULL, allow_insecure=self.allow_insecure)
from_child_port = from_child.bind_to_random_port('tcp://*')
to_child_port = to_child.bind_to_random_port('tcp://*')
self.check_broker()
if self.heartbeat_server is None:
# First child process, we need a heartbeat server:
self.heartbeat_server = HeartbeatServer(shared_secret=self.shared_secret)
if self.zlock_client is not None:
zlock_process_name = self.zlock_client.process_name
else:
zlock_process_name = ''
if output_redirection_host is None:
if output_redirection_port is not None:
# Port provided but no host. Assume this host.
output_redirection_host = 'localhost'
elif self.output_redirection_port is not None:
# No redirection specified. Use existing redirection.
output_redirection_host = self.output_redirection_host
output_redirection_port = self.output_redirection_port
elif remote_process_client is not None:
# No existing redirection, and process is remote. Redirect its output to
# our own stdout using a RemoteOutputReceiver.
if self.remote_output_receiver is None:
self.remote_output_receiver = RemoteOutputReceiver(
shared_secret=self.shared_secret,
allow_insecure=self.allow_insecure,
)
output_redirection_host = 'localhost'
output_redirection_port = self.remote_output_receiver.port
else:
# No existing redirection, process is local. Do not do any redirection.
output_redirection_host = None
output_redirection_port = None
# Note. Any entries in the below dict containing a hostname or IP address must
# have a key ending in '_host', in order for the below code to translate them
# into externally valid IP addresses.
parentinfo = {
'parent_host': 'localhost',
'to_parent_port': from_child_port,
'from_parent_port': to_child_port,
'heartbeat_server_host': 'localhost',
'heartbeat_server_port': self.heartbeat_server.port,
'broker_host': self.broker_host,
'broker_in_port': self.broker_in_port,
'broker_out_port': self.broker_out_port,
'output_redirection_host': output_redirection_host,
'output_redirection_port': output_redirection_port,
'shared_secret': self.shared_secret,
'allow_insecure': self.allow_insecure,
'zlock_host': self.zlock_host,
'zlock_port': self.zlock_port,
'zlock_process_name': zlock_process_name,
'zlog_host': self.zlog_host,
'zlog_port': self.zlog_port,
'log_paths': self.log_paths,
'startup_timeout': startup_timeout,
}
if remote_process_client is not None:
# Translate any internal hostnames or IP addresses in parentinfo into our
# external IP address as seen from the remote process server:
external_ip = remote_process_client.get_external_IP(
get_interruptor=startup_interruptor
)
for key, value in list(parentinfo.items()):
if key.endswith('_host') and value is not None:
ip = gethostbyname(value)
if isinstance(ip, bytes):
ip = ip.decode()
if ipaddress.ip_address(ip).is_loopback:
parentinfo[key] = external_ip
if args is None:
args = []
# Build command line args:
if pymodule:
cmd = ['-m', path] + args
else:
cmd = [os.path.abspath(path)] + args
# Add environment variable for parent connection details:
extra_env = {'ZPROCESS_PARENTINFO': json.dumps(parentinfo)}
if PY2:
# Windows Python 2, only bytestrings allowed:
extra_env = {k.encode(): v.encode() for k, v in extra_env.items()}
if remote_process_client is None:
executable, env = get_venv_executable_and_env(os.environ.copy())
env.update(extra_env)
child = subprocess.Popen([executable] + cmd, env=env)
else:
# The remote server will prefix the path to its own Python interpreter.
# Also, it will pass to Popen an env consisting of its own env updated with
# this extra_env dict we are passing in.
child = remote_process_client.Popen(
cmd,
prepend_sys_executable=True,
extra_env=extra_env,
get_interruptor=startup_interruptor,
)
to_child = WriteQueue(to_child)
from_child = ReadQueue(from_child)
if startup_queue is not None:
startup_queue.put(child)
try:
msg = from_child.get(startup_timeout, interruptor=startup_interruptor)
except TimeoutError:
raise RuntimeError('child process did not connect within the timeout.')
assert msg == 'hello'
return to_child, from_child, child
def _connect_to_parent(self, parentinfo):
self.parent_host = gethostbyname(parentinfo['parent_host'])
if self.zlock_client is not None:
name = parentinfo['zlock_process_name']
# Append '-sub' to indicate we're a subprocess of the other process.
if not name.endswith('-sub'):
name += '-sub'
self.zlock_client.set_process_name(name)
context = SecureContext.instance(shared_secret=self.shared_secret)
to_parent = context.socket(zmq.PUSH, allow_insecure=self.allow_insecure)
from_parent = context.socket(zmq.PULL, allow_insecure=self.allow_insecure)
from_parent.connect(
'tcp://%s:%d' % (self.parent_host, parentinfo['from_parent_port'])
)
to_parent.connect(
"tcp://%s:%s" % (self.parent_host, parentinfo['to_parent_port'])
)
self.startup_timeout = parentinfo.get('startup_timeout', None)
self.from_parent = ReadQueue(from_parent)
self.to_parent = WriteQueue(to_parent)
self.to_parent.put('hello', timeout=self.startup_timeout)
self.output_redirection_host = parentinfo.get('output_redirection_host', None)
self.output_redirection_port = parentinfo.get('output_redirection_port', None)
if self.output_redirection_port is not None:
stdout = OutputInterceptor(self.output_redirection_host,
self.output_redirection_port,
shared_secret=self.shared_secret,
allow_insecure=self.allow_insecure)
stderr = OutputInterceptor(self.output_redirection_host,
self.output_redirection_port,
streamname='stderr',
shared_secret=self.shared_secret,
allow_insecure=self.allow_insecure)
stdout.connect()
stderr.connect()
heartbeat_server_host = parentinfo['heartbeat_server_host']
heartbeat_server_port = parentinfo['heartbeat_server_port']
self.heartbeat_client = HeartbeatClient(
heartbeat_server_host,
heartbeat_server_port,
shared_secret=self.shared_secret,
allow_insecure=self.allow_insecure,
)
self.broker_host = parentinfo['broker_host']
self.broker_in_port = parentinfo['broker_in_port']
self.broker_out_port = parentinfo['broker_out_port']
self.kill_lock = self.heartbeat_client.lock
self.log_paths = parentinfo['log_paths']
@classmethod
def connect_to_parent(cls):
if ProcessTree.__instance is not None:
msg = "Cannot connect_to_parent() twice"
raise ValueError(msg)
try:
parentinfo = json.loads(os.environ['ZPROCESS_PARENTINFO'])
# Ensure environment variable not inherited by child processes:
del os.environ['ZPROCESS_PARENTINFO']
except KeyError:
msg = "No ZPROCESS_PARENTINFO environment variable found"
raise RuntimeError(msg)
process_tree = cls(
shared_secret=parentinfo['shared_secret'],
allow_insecure=parentinfo['allow_insecure'],
zlock_host=parentinfo['zlock_host'],
zlock_port=parentinfo['zlock_port'],
zlog_host=parentinfo['zlog_host'],
zlog_port=parentinfo['zlog_port'],
)
process_tree._connect_to_parent(parentinfo)
ProcessTree.__instance = process_tree
return process_tree
# Backwards compatability follows:
# A default process tree for all out backward compat functions to work with:
_default_process_tree = ProcessTree(allow_insecure=True)
# Allow instantiating an Event without a ProcessTree as the first argument,
# insert a default ProcessTree:
_Event = Event
class Event(_Event):
def __init__(self, *args, **kwargs):
# Convert the keyword argument renaming:
if 'type' in kwargs:
kwargs['role'] = kwargs['type']
del kwargs['type']
if not args or not isinstance(args[0], ProcessTree):
args = (_default_process_tree,) + args
_Event.__init__(self, *args, **kwargs)
# Allow instantiating a Process() without a ProcessTree as the first argument,
# insert a default ProcessTree:
_Process = Process
class Process(_Process):
def __init__(self, *args, **kwargs):
# Backward compat for redirection port as sole positional arg:
if len(args) == 1 and not isinstance(args[0], ProcessTree) and not kwargs:
kwargs['output_redirection_port'] = args[0]
args = ()
if 'process_tree' not in kwargs and (
not args or not isinstance(args[0], ProcessTree)
):
args = (_default_process_tree,) + args
_Process.__init__(self, *args, **kwargs)
def _run(self):
# Set the process tree as the default for this process:
global _default_process_tree
_default_process_tree = self.process_tree
_Process._run(self)
# New way is to call ProcessTree.connect_to_parent(lock) and get back a
# ProcessTree. This is the old way, returning queues and (optionally) a lock
# instead:
def setup_connection_with_parent(lock=False):
process_tree = ProcessTree.connect_to_parent()
# Set as the default for this process:
global _default_process_tree
_default_process_tree = process_tree
if lock:
return (process_tree.to_parent,
process_tree.from_parent,
process_tree.kill_lock)
else:
return process_tree.to_parent, process_tree.from_parent
# New way is to instantiate a ProcessTree and call
# process_tree.subprocess(). Old way is:
def subprocess_with_queues(path, output_redirection_port=None):
if output_redirection_port == 0: # This used to mean no redirection
output_redirection_port = None
return _default_process_tree.subprocess(
path, output_redirection_port=output_redirection_port
)
__all__ = ['Process', 'ProcessTree', 'setup_connection_with_parent',
'subprocess_with_queues', 'Event']
# if __name__ == '__main__':
# def foo():
# remote_client = RemoteProcessClient('localhost', allow_insecure=True)
# to_child, from_child, child = _default_process_tree.subprocess(
# 'test_remote.py', remote_process_client=remote_client)
# foo()