Source code for riaps.deplo.relay

'''
Created on June 14, 2024

Based on code from pyzmq

@author: riaps

'''


import time
import sys
import os
import logging

import threading
import errno

import traceback
import time

from threading import Thread
from typing import Any, Callable, List, Optional, Tuple
import zmq

from zmq import ENOTSOCK, ETERM, PUSH, QUEUE, ZMQBindError, ZMQError, device

[docs]class Relay(threading.Thread): """A variant for the 0MQ ThreadProxySteerable. Ports: - in: DEALER (with identity = appName.actorName.PID). Connected to resource/fault/etc. monitors and depm control socket for this actor. - out: PAIR. Actor connects to it. - mon: PUB. depm monitor socket connects to it. - ctrl:PAIR. Used to stop relay thread. Flows: - in -> (out,mon) : - -> ctrl: manages (stops, etc.) relay thread - out -> You do not pass Socket instances to this, but rather Socket types:: Relay(in_socket_type, out_socket_type, mon_socket_type, ctrl_socket_type) For instance:: dev = Device(zmq.QUEUE, zmq.DEALER, zmq.ROUTER) Similar to zmq.device, but socket types instead of sockets themselves are passed, and the sockets are created in the work thread, to avoid issues with thread safety. As a result, additional bind_{in|out} and connect_{in|out} methods and setsockopt_{in|out} allow users to specify connections for the sockets. Parameters ---------- {in|out|mon|ctrl}_type : int zmq socket types, to be passed later to context.socket(). e.g. zmq.PUB, zmq.SUB, zmq.REQ. If out_type is < 0, then in_socket is used for both in_socket and out_socket. Methods ------- bind_{in|out|mon|ctrl}(iface) passthrough for ``{in|out|mon|ctrl}_socket.bind(iface)``, to be called in the thread connect_{in|out|mon|ctrl}(iface) passthrough for ``{in|out|mon|ctrl}_socket.connect(iface)``, to be called in the thread setsockopt_{in|out|mon|ctrl}(opt,value) passthrough for ``{in|out|mon|ctrl}_socket.setsockopt(opt, value)``, to be called in the thread Attributes ---------- daemon : bool sets whether the thread should be run as a daemon Default is true, because if it is false, the thread will not exit unless it is killed context_factory : callable (class attribute) Function for creating the Context. This will be Context.instance due to implementation via threads.. """ in_type: int out_type: int mon_type: int ctrl_type: int _in_binds: List[str] _in_connects: List[str] _in_sockopts: List[Tuple[int, Any]] _out_binds: List[str] _out_connects: List[str] _out_sockopts: List[Tuple[int, Any]] _mon_binds: List[str] _mon_connects: List[str] _mon_sockopts: List[Tuple[int, Any]] _ctrl_binds: List[str] _ctrl_connects: List[str] _ctrl_sockopts: List[Tuple[int, Any]] _random_addrs: List[str] _sockets: List[zmq.Socket] def __init__( self, context, name : str, in_type: Optional[int] = None, out_type: Optional[int] = None, mon_type: Optional[int]= None, ctrl_type: Optional[int] = None ) -> None: threading.Thread.__init__(self,name = f'Relay-{name}',daemon=False) self.name = f'Relay{name}' self.logger = logging.getLogger(__name__) self.context = context if in_type is None: raise TypeError("in_type must be specified") self.in_type = in_type self._in_binds = [] self._in_connects = [] self._in_sockopts = [] if out_type is None: raise TypeError("out_type must be specified") self.out_type = out_type self._out_binds = [] self._out_connects = [] self._out_sockopts = [] self.mon_type = mon_type self._mon_binds = [] self._mon_connects = [] self._mon_sockopts = [] self.ctrl_type = ctrl_type self._ctrl_binds = [] self._ctrl_connects = [] self._ctrl_sockopts = [] self._random_addrs = [] self.daemon = True self.done = False self._sockets = []
[docs] def bind_in(self, addr: str) -> None: self._in_binds.append(addr)
[docs] def bind_in_to_random_port(self, addr: str, *args, **kwargs) -> int: port = self._reserve_random_port(addr, *args, **kwargs) self.bind_in('%s:%i' % (addr, port)) return port
[docs] def connect_in(self, addr: str) -> None: self._in_connects.append(addr)
[docs] def setsockopt_in(self, opt: int, value: Any) -> None: self._in_sockopts.append((opt, value))
[docs] def bind_out(self, addr: str) -> None: self._out_binds.append(addr)
[docs] def bind_out_to_random_port(self, addr: str, *args, **kwargs) -> int: port = self._reserve_random_port(addr, *args, **kwargs) self.bind_out('%s:%i' % (addr, port)) return port
[docs] def connect_out(self, addr: str): self._out_connects.append(addr)
[docs] def setsockopt_out(self, opt: int, value: Any): self._out_sockopts.append((opt, value))
[docs] def bind_mon(self, addr): self._mon_binds.append(addr)
[docs] def bind_mon_to_random_port(self, addr, *args, **kwargs): port = self._reserve_random_port(addr, *args, **kwargs) self.bind_mon('%s:%i' % (addr, port)) return port
[docs] def connect_mon(self, addr): self._mon_connects.append(addr)
[docs] def setsockopt_mon(self, opt, value): self._mon_sockopts.append((opt, value))
[docs] def bind_ctrl(self, addr): self._ctrl_binds.append(addr)
[docs] def bind_ctrl_to_random_port(self, addr, *args, **kwargs): port = self._reserve_random_port(addr, *args, **kwargs) self.bind_ctrl('%s:%i' % (addr, port)) return port
[docs] def connect_ctrl(self, addr): self._ctrl_connects.append(addr)
[docs] def setsockopt_ctrl(self, opt, value): self._ctrl_sockopts.append((opt, value))
def _reserve_random_port(self, addr: str, *args, **kwargs) -> int: with self.context as ctx: with ctx.socket(PUSH) as binder: for i in range(5): port = binder.bind_to_random_port(addr, *args, **kwargs) new_addr = '%s:%i' % (addr, port) if new_addr in self._random_addrs: continue else: break else: raise ZMQBindError("Could not reserve random port.") self._random_addrs.append(new_addr) return port def _setup_sockets(self) -> Tuple[zmq.Socket, zmq.Socket, zmq.Socket, zmq.Socket]: ctx = self.context # create the sockets ins = ctx.socket(self.in_type) self._sockets.append(ins) if self.out_type < 0: outs = ins else: outs = ctx.socket(self.out_type) self._sockets.append(outs) # set sockopts (must be done first, in case of zmq.IDENTITY) for opt, value in self._in_sockopts: ins.setsockopt(opt, value) for opt, value in self._out_sockopts: outs.setsockopt(opt, value) for iface in self._in_binds: ins.bind(iface) for iface in self._out_binds: outs.bind(iface) for iface in self._in_connects: ins.connect(iface) for iface in self._out_connects: outs.connect(iface) if self.mon_type is not None: mons = ctx.socket(self.mon_type) self._sockets.append(mons) # set sockopts (must be done first, in case of zmq.IDENTITY) for opt, value in self._mon_sockopts: mons.setsockopt(opt, value) for iface in self._mon_binds: mons.bind(iface) for iface in self._mon_connects: mons.connect(iface) else: mons = None if self.ctrl_type is not None: ctrls = ctx.socket(self.ctrl_type) self._sockets.append(ctrls) for opt, value in self._ctrl_sockopts: ctrls.setsockopt(opt, value) for iface in self._ctrl_binds: ctrls.bind(iface) for iface in self._ctrl_connects: ctrls.connect(iface) else: ctrls = None return ins, outs, mons, ctrls def _close_sockets(self): """Cleanup sockets we created""" for s in self._sockets: if s and not s.closed: s.close()
[docs] def run(self) -> None: """wrap run_device in try/catch ETERM""" try: self.run_device() except ZMQError as e: if e.errno in {ETERM, ENOTSOCK}: # silence TERM, ENOTSOCK errors, because this should be a clean shutdown pass else: raise finally: self.done = True self._close_sockets()
[docs] def run_device(self): ins, outs, mons, ctrls = self._setup_sockets() self.poller = zmq.Poller() self.poller.register(ins,zmq.POLLIN) self.poller.register(outs,zmq.POLLIN) if ctrls: self.poller.register(ctrls,zmq.POLLIN) while True: sockets = dict(self.poller.poll()) if len(sockets) == 0: break if ins in sockets: _msg = ins.recv() outs.send(_msg) if mons: mons.send(_msg) self.logger.info(f"relay {self.name}: msg to actor") if outs in sockets: _msg = outs.recv() if mons: mons.send(_msg) ins.send(_msg) self.logger.info(f"relay {self.name}: msg from actor") if ctrls in sockets: _msg = ctrls.recv() if _msg == b'TERMINATE': break else: pass self.logger.info(f"relay {self.name}: terminated")