riaps.deplo.relay module

Created on June 14, 2024

Based on code from pyzmq

@author: riaps

class riaps.deplo.relay.Relay(context, name: str, in_type: int | None = None, out_type: int | None = None, mon_type: int | None = None, ctrl_type: int | None = None)

Bases: Thread

A variant of the 0MQ ThreadProxySteerable.

Ports:

  • in: DEALER (with identity = appName.actorName.PID). Connected to resource/fault/etc. monitors and the depm control socket for this actor.

  • out: PAIR. Actor connects to it.

  • mon: PUB. depm monitor socket connects to it.

  • ctrl: PAIR. Used to stop the relay thread.

Flows:

  • in -> (out,mon) :

  • -> ctrl: manages (stops, etc.) relay thread

  • out ->
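The documented flows can be read as a small routing rule. The sketch below is only an illustration under stated assumptions: `relay_step` is a hypothetical helper (not part of riaps), plain deques stand in for the out and mon sockets, and any message arriving on ctrl is assumed to stop the relay. The flow that starts at out is omitted because the description above does not complete it.

```python
from collections import deque


def relay_step(source, frames, out, mon):
    """Hypothetical routing step; returns False when the relay should stop."""
    if source == "in":
        # in -> (out, mon): the same frames go to both destinations.
        out.append(frames)
        mon.append(frames)
        return True
    if source == "ctrl":
        # Assumption: any ctrl message stops the relay in this sketch.
        return False
    # Other sources are not specified in the text, so they are ignored here.
    return True


out, mon = deque(), deque()
events = [
    ("in", [b"hello"]),
    ("in", [b"world"]),
    ("ctrl", [b"stop"]),
    ("in", [b"late"]),  # never relayed: the loop has already stopped
]
for source, frames in events:
    if not relay_step(source, frames, out, mon):
        break
```

In the real class the sources are sockets polled inside the relay thread; the deques only make the fan-out from in to out and mon visible.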

You do not pass Socket instances to this, but rather Socket types:

Relay(context, name, in_socket_type, out_socket_type, mon_socket_type, ctrl_socket_type)

For instance:

relay = Relay(context, name, zmq.DEALER, zmq.PAIR, zmq.PUB, zmq.PAIR)

Similar to zmq.device, but socket types are passed instead of the sockets themselves, and the sockets are created in the worker thread to avoid thread-safety issues. As a result, the additional bind_{in|out|mon|ctrl}, connect_{in|out|mon|ctrl}, and setsockopt_{in|out|mon|ctrl} methods let users specify connections and options for the sockets.

Parameters

{in|out|mon|ctrl}_type : int

ZMQ socket types, to be passed later to context.socket(), e.g. zmq.PUB, zmq.SUB, zmq.REQ. If out_type is < 0, then in_socket is used for both in_socket and out_socket.

Methods

bind_{in|out|mon|ctrl}(iface)

passthrough for {in|out|mon|ctrl}_socket.bind(iface), to be called in the thread

connect_{in|out|mon|ctrl}(iface)

passthrough for {in|out|mon|ctrl}_socket.connect(iface), to be called in the thread

setsockopt_{in|out|mon|ctrl}(opt,value)

passthrough for {in|out|mon|ctrl}_socket.setsockopt(opt, value), to be called in the thread
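The "to be called in the thread" wording suggests a record-now, apply-later pattern: the passthrough methods queue their arguments, and the worker thread replays them on sockets it creates itself. Below is a minimal stdlib-only sketch of that idea, not the riaps or pyzmq implementation. `FakeSocket` and `DeferredSocketThread` are hypothetical names, the integer socket type and option values are arbitrary placeholders, and replaying calls in the order they were made is a simplification.

```python
import threading


class FakeSocket:
    """Hypothetical stand-in for a zmq socket; it only records calls."""

    def __init__(self, sock_type):
        self.sock_type = sock_type
        self.calls = []  # (method, args, name of the calling thread)

    def _record(self, method, *args):
        self.calls.append((method, args, threading.current_thread().name))

    def bind(self, addr):
        self._record("bind", addr)

    def connect(self, addr):
        self._record("connect", addr)

    def setsockopt(self, opt, value):
        self._record("setsockopt", opt, value)


class DeferredSocketThread(threading.Thread):
    """Hypothetical sketch: takes a socket *type* and creates the socket in run()."""

    def __init__(self, sock_type, socket_factory=FakeSocket):
        super().__init__(name="relay-sketch", daemon=True)
        self.sock_type = sock_type
        self._factory = socket_factory
        self._pending = []  # queued (method, args) pairs
        self.socket = None

    def bind_in(self, addr):
        self._pending.append(("bind", (addr,)))

    def connect_in(self, addr):
        self._pending.append(("connect", (addr,)))

    def setsockopt_in(self, opt, value):
        self._pending.append(("setsockopt", (opt, value)))

    def run(self):
        # The socket is created here, inside the worker thread.
        self.socket = self._factory(self.sock_type)
        # Replay the queued configuration calls on the new socket.
        for method, args in self._pending:
            getattr(self.socket, method)(*args)


worker = DeferredSocketThread(sock_type=5)  # 5 is an arbitrary placeholder
worker.setsockopt_in(17, 0)                 # placeholder option and value
worker.bind_in("tcp://127.0.0.1:5555")      # nothing is bound yet
worker.start()
worker.join()
```

Because every recorded call carries the name of the thread that made it, inspecting `worker.socket.calls` after `join()` shows that the configuration ran in the worker thread rather than in the caller.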

Attributes

daemon : bool

Sets whether the thread should be run as a daemon. Default is true, because if it is false, the thread will not exit unless it is killed.

context_factory : callable (class attribute)

Function for creating the Context. This will be Context.instance, because the implementation uses threads.

bind_ctrl(addr)
bind_ctrl_to_random_port(addr, *args, **kwargs)
bind_in(addr: str) -> None
bind_in_to_random_port(addr: str, *args, **kwargs) -> int
bind_mon(addr)
bind_mon_to_random_port(addr, *args, **kwargs)
bind_out(addr: str) -> None
bind_out_to_random_port(addr: str, *args, **kwargs) -> int
connect_ctrl(addr)
connect_in(addr: str) -> None
connect_mon(addr)
connect_out(addr: str)
ctrl_type: int
in_type: int
mon_type: int
out_type: int
run() -> None

Wraps run_device in a try/except block that catches ETERM.

run_device()
setsockopt_ctrl(opt, value)
setsockopt_in(opt: int, value: Any) -> None
setsockopt_mon(opt, value)
setsockopt_out(opt: int, value: Any)