riaps.run.port module

Base class for all Port objects

class riaps.run.port.BindPort(parentComponent, portName, portSpec)[source]

Bases: Port

closeBindSocket()[source]
setupBindSocket(owner, zmqType, portKind, sockopts=[])[source]

Set up a bind socket

class riaps.run.port.ConnPort(parentComponent, portName, portSpec)[source]

Bases: Port

closeConnSocket()[source]
connected()[source]

Return the number of servers this port is connected to.

resetConnSocket(zmqType, sockopts=[])[source]

Reset a conn socket: remove and recreate

setupConnSocket(owner, zmqType, portKind, sockopts=[])[source]

Set up a conn socket

update(host, port)[source]

Update the client – connect its socket to a server

class riaps.run.port.DuplexBindPort(parentComponent, portName, portSpec)[source]

Bases: BindPort, DuplexPort

class riaps.run.port.DuplexConnPort(parentComponent, portName, portSpec)[source]

Bases: ConnPort, DuplexPort

class riaps.run.port.DuplexPort(parentComponent, portName, portSpec)[source]

Bases: Port

class riaps.run.port.Port(parentPart, portName, portSpec=None)[source]

Bases: object

Base class for all Port objects.

Port objects are used by a component to communicate with other components, in the same process, on the same host, or on the same network. Ports encapsulate low-level communication objects (zeromq sockets).

Parameters:
  • parentPart (Part) – the Part object that owns this port.

  • portName (str) – the name of the port (from the model)

activate()[source]

Activate the port object. Subclasses can override this method.

closeSocket()[source]

Close down the port. Abstract, subclasses must implement this method.

deactivate()[source]

Deactivate the port object. Subclasses can override this method.

getDeadline()[source]

Return the deadline parameter associated with the port’s operation

Returns:

Deadline for the operation associated with the port, in seconds.

Return type:

float

getGlobalIface()[source]

Return the IP address of the global network interface

The operation retrieves the result from the parent actor and caches it.

Returns:

Global IP address of the form xxx.xxx.xxx.xxx

Return type:

str

getIndex()[source]

Return the index of the port.

For input ports, the index is a small integer indicating the port's position in the port list of the component; for non-input ports it is None. The index determines the priority order of the port among all the ports; only the relative order matters, not the concrete value.

Returns:

Index value for the port among all input ports.

Return type:

None or int

getInfo()[source]

Retrieve configuration information about the port. Abstract, subclasses must implement this method.

Returns:

a tuple containing the name of the port’s type (req, rep, clt, srv, qry, ans, pub, sub, ins, or tim); the kind of the port (global, local, or internal); the name of the port object; the name of the message type; the host; and the port number.

Return type:

PortInfo

getLocalIface()[source]

Return the IP address of the local network interface (typically 127.0.0.1)

The operation retrieves the result from the parent actor and caches it.

Returns:

Local IP address of the form xxx.xxx.xxx.xxx

Return type:

str

getSocket()[source]

Return the socket(s) used by the port object. Abstract, subclasses must implement this method.

Returns:

a low-level socket

Return type:

zmq.Socket

get_hwm()[source]

Retrieve the high-water mark for the socket.

get_recvTime()[source]

Return the timestamp taken at the last receive operation.

Returns:

time of the last message receive operation

Return type:

float

get_recv_timeout()[source]

Retrieve the receive timeout parameter for the port.

Receive timeout determines how long a receive operation will block before throwing a PortError.EAGAIN exception. None means infinite timeout.

Returns:

None (if no timeout is set) or the timeout value in seconds.

Return type:

None or float

get_sendTime()[source]

Return the time at which the last received message was sent.

Returns:

time when the last message received was sent

Return type:

float

get_send_timeout()[source]

Retrieve the send timeout parameter for the port.

Send timeout determines how long a send operation will block before throwing a PortError.EAGAIN exception. None means infinite timeout.

Returns:

None (if no timeout is set) or the timeout value in seconds.

Return type:

None or float

inSocket()[source]

Return True if the socket can be used for input. Abstract, subclasses must implement this method.

Returns:

logical value indicating whether the socket is for input.

Type:

bool

port_recv(is_pyobj)[source]

Lowest level message receiving operation. Subclasses can override this operation.

The message is received as a multi-part message. If the receiving port is timed, the current timestamp is saved as the time of message reception. If the message is to be received as a Python object, it is unpickled; otherwise the message is returned as is (as a bytes object). If the message included a timestamp, it is retrieved and saved as the time of message sending.

Parameters:

is_pyobj (bool) – flag to indicate if the expected message is a Python data object.

Returns:

the message received

Except:

Throws a PortError exception when the underlying network operation fails.

port_send(msg, is_pyobj)[source]

Lowest level message sending operation. Subclasses can override this operation.

If the message is to be sent as a Python object, it is pickled; otherwise it is assumed to be a bytes object. The message is packed into a frame and, if the port is ‘timed’, the current timestamp is appended as another frame. The message is sent as a multi-part message.

Parameters:
  • msg (bytes or any Python data object) – the message to be sent

  • is_pyobj (bool) – flag to indicate if the message is a Python object.

Returns:

True

Except:

Throws a PortError exception when the underlying network operation fails.
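The framing described for port_send and port_recv can be sketched with the standard library alone. This is a minimal illustration, not the library's implementation: the helper names pack_frames and unpack_frames are hypothetical, and the timestamp encoding (an 8-byte big-endian double) is an assumption, since the documentation above does not specify how the timestamp frame is encoded.

```python
import pickle
import struct
import time


def pack_frames(msg, is_pyobj, timed):
    """Build the frame list for one multi-part message (hypothetical helper)."""
    # Pickle Python objects; otherwise the message is assumed to be bytes already.
    payload = pickle.dumps(msg) if is_pyobj else msg
    frames = [payload]
    if timed:
        # Assumption: the timestamp travels as an 8-byte big-endian double.
        frames.append(struct.pack("!d", time.time()))
    return frames


def unpack_frames(frames, is_pyobj):
    """Return (message, send_time); send_time is None without a timestamp frame."""
    payload = frames[0]
    send_time = struct.unpack("!d", frames[1])[0] if len(frames) > 1 else None
    msg = pickle.loads(payload) if is_pyobj else payload
    return msg, send_time


# Round trip of a timed Python-object message.
frames = pack_frames({"temp": 21.5}, is_pyobj=True, timed=True)
msg, sent_at = unpack_frames(frames, is_pyobj=True)

# Round trip of an untimed raw bytes message.
raw_msg, raw_sent_at = unpack_frames(pack_frames(b"raw", False, False), False)
```

In the real port the frame list would be handed to the underlying socket as a multi-part message; here the frames are simply passed from one helper to the other.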

recv()[source]

Receive a byte array (if possible) through the port

Used for receiving a message that is subsequently deserialized.

Returns:

a message packed into a bytes object.

Return type:

bytes

recv_capnp()[source]

DEPRECATED. Receive an object (if possible) through the port

recv_pyobj()[source]

Receive a Python data object (if possible) through the port. Abstract, subclasses must implement this method.

The raw message received is deserialized using pickle and returned. Messages received this way must have been sent using the send_pyobj method.

Returns:

a Python data object

Type:

any Python data type

reset()[source]

Reset the port object. Subclasses can override this method.

Reset is to be used when a send or receive operation fails and the port needs to be re-initialized.

send(msg)[source]

Send a byte array (if possible) out through the port.

Used for sending a message that has been serialized into bytes previously.

Parameters:

msg – the message packed into a bytes object

Return type:

bytes

send_capnp(msg)[source]

DEPRECATED. Send a byte array (if possible) out through the port

send_pyobj(msg)[source]

Send a Python data object (if possible) out through the port. Abstract, subclasses must implement this method.

The object is serialized using pickle and sent. Messages sent using this method are received using the recv_pyobj method.

Parameters:

msg (any Python data type) – the message to be sent.

setOwner(owner)[source]

Save owner thread into a data member.

Parameters:

owner (ComponentThread) – The ComponentThread the port is handled in.

set_recv_timeout(rto)[source]

Set the receive timeout for the port.

Receive timeout determines how long a receive operation will block before throwing a PortError.EAGAIN exception. None means infinite timeout.

Parameters:

rto (None or float) – None (if no timeout is set) or the timeout value in seconds.

set_send_timeout(sto)[source]

Set the send timeout for the port.

Send timeout determines how long a send operation will block before throwing a PortError.EAGAIN exception. None means infinite timeout.

Parameters:

sto (None or float) – None (if no timeout is set) or the timeout value in seconds.
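The timeout semantics described above (a value in seconds, None meaning block indefinitely, and an EAGAIN error on expiry) can be illustrated with a stand-in. Everything in this sketch is hypothetical: StandInPort, its deliver helper, and the local PortError class only mimic the documented behaviour with a queue and do not use the real riaps classes. In particular, the numeric value of EAGAIN and the errno attribute are assumptions.

```python
import queue


class PortError(Exception):
    """Stand-in for the library's PortError; the EAGAIN value is an assumption."""
    EAGAIN = 1

    def __init__(self, errno):
        super().__init__(f"port error {errno}")
        self.errno = errno


class StandInPort:
    """Queue-backed stand-in that mimics the documented timeout behaviour."""

    def __init__(self):
        self._inbox = queue.Queue()
        self._rto = None  # None means wait forever

    def set_recv_timeout(self, rto):
        self._rto = rto

    def get_recv_timeout(self):
        return self._rto

    def deliver(self, msg):
        # Hypothetical helper: simulates a message arriving from a peer.
        self._inbox.put(msg)

    def recv_pyobj(self):
        try:
            return self._inbox.get(timeout=self._rto)
        except queue.Empty:
            raise PortError(PortError.EAGAIN) from None


port = StandInPort()
port.set_recv_timeout(0.05)  # seconds

try:
    port.recv_pyobj()  # nothing has been delivered yet
    timed_out = False
except PortError as e:
    timed_out = e.errno == PortError.EAGAIN

port.deliver("hello")
received = port.recv_pyobj()
```

A component handler would typically wrap its receive call in a similar try/except and decide whether to retry or call reset() when the operation fails.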

set_sockoptions(sockopts)[source]
setup()[source]

Initialize the port object (after construction but before socket creation). Abstract, subclasses must implement this method.

setupCurve(server)[source]
setupSocket(owner)[source]

Set up the socket. Abstract, subclasses must implement this method.

Parameters:

owner (Component) – The Component the port belongs to. This operation must be called from the component thread only.

terminate()[source]

Terminate all activities of the port. Subclasses can override this method.

update(host, port)[source]

Update the socket with information from the discovery service. Abstract, subclasses must implement this method.

Called when the discovery service notifies the actor about a new service provider (e.g. server, publisher, etc.) the port needs to connect to. The operation will perform the connection.

Parameters:
  • host (str) – IP address of the service provider

  • port (int) – port number of the service provider
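As a rough illustration of what an update implementation has to do with its two arguments, the sketch below builds a ZeroMQ-style endpoint string from a host and a port number. The helper name make_endpoint is hypothetical, and the use of the tcp transport is an assumption; the documentation above only states that the port connects to the service provider.

```python
def make_endpoint(host: str, port: int) -> str:
    """Build a ZeroMQ-style endpoint string (hypothetical helper, tcp assumed)."""
    return f"tcp://{host}:{port}"


# Placeholder address and port number, not taken from any real deployment.
endpoint = make_endpoint("192.168.1.10", 5555)
```

A concrete subclass would pass such an endpoint to the connect call of its underlying socket.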

class riaps.run.port.PortInfo(portKind, portScope, portName, msgType, portHost, portNum)

Bases: tuple

msgType

Alias for field number 3

portHost

Alias for field number 4

portKind

Alias for field number 0

portName

Alias for field number 2

portNum

Alias for field number 5

portScope

Alias for field number 1
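The field numbering above corresponds to a named tuple with six fields in the order given by the class signature. The sketch below rebuilds an equivalent tuple with collections.namedtuple so the mapping between names and positions can be checked; it is a local reconstruction rather than an import of the real class, and all field values are placeholders chosen for illustration.

```python
from collections import namedtuple

# Local reconstruction of the documented field order (not the library's class).
PortInfo = namedtuple(
    "PortInfo",
    ["portKind", "portScope", "portName", "msgType", "portHost", "portNum"],
)

# Placeholder values; the real value types (e.g. of portScope) may differ.
info = PortInfo(
    portKind="pub",
    portScope="global",
    portName="sensorReady",
    msgType="SensorReady",
    portHost="192.168.1.10",
    portNum=5555,
)

# Fields are reachable by name or by the index listed in the documentation.
kind_by_index = info[0]
msg_type_by_index = info[3]
```

Because the result is a plain tuple, it can also be unpacked positionally, for example `kind, scope, name, msg_type, host, num = info`.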

class riaps.run.port.PortScope(value)[source]

Bases: IntEnum

An enumeration.

GLOBAL = 1
INTERNAL = 3
LOCAL = 2
scope()[source]

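Since PortScope derives from IntEnum, its members behave like integers and can be looked up by value or by name. The sketch below reconstructs the enumeration locally from the three documented values to show this. The scope() method is left out because its behaviour is not documented above.

```python
from enum import IntEnum


class PortScope(IntEnum):
    """Local reconstruction of the documented values (scope() omitted)."""
    GLOBAL = 1
    LOCAL = 2
    INTERNAL = 3


by_value = PortScope(2)           # lookup by integer value
by_name = PortScope["INTERNAL"]   # lookup by member name
as_int = int(PortScope.GLOBAL)    # members convert cleanly to int
```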
class riaps.run.port.SimplexBindPort(parentComponent, portName, portSpec)[source]

Bases: BindPort, SimplexPort

class riaps.run.port.SimplexConnPort(parentComponent, portName, portSpec)[source]

Bases: ConnPort, SimplexPort

class riaps.run.port.SimplexPort(parentComponent, portName, portSpec)[source]

Bases: Port
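The combined classes (SimplexBindPort, SimplexConnPort, DuplexBindPort, DuplexConnPort) each inherit from two bases that share Port as a common ancestor. The sketch below uses empty stand-in classes with the same names and bases as listed above, purely to show the method resolution order that Python derives from such a diamond. It does not import or reproduce the real classes.

```python
# Empty stand-ins mirroring the documented "Bases:" entries.
class Port:
    pass


class BindPort(Port):
    pass


class SimplexPort(Port):
    pass


class SimplexBindPort(BindPort, SimplexPort):
    pass


# Names in method resolution order, from most to least specific.
mro_names = [cls.__name__ for cls in SimplexBindPort.__mro__]
```

With this ordering, a method defined in both BindPort and SimplexPort resolves to the BindPort version, and Port is consulted only after both.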