riaps.run.dcPorts module

Distributed coordination - Communication ports for the groups.

Created on Feb 23, 2019. Author: riaps

class riaps.run.dcPorts.GroupAnsPort(parentPart, portName, groupSpec)[source]

Bases: BindPort, GroupDuplexPort

The group answer port lets the leader receive messages from members. It is based on a DEALER socket. It is a group-internal communication port for messaging with the leader; it carries no message type, but it can be timed.

closeSocket()[source]

Close down the port. Abstract, subclasses must implement this method.

getInfo()[source]

Retrieve configuration information about the port. Abstract, subclasses must implement this method.

Returns:

a tuple containing the name of the port’s type (req, rep, clt, srv, qry, ans, pub, sub, ins, or tim); the kind of the port (global, local, internal); the name of the port object; the name of the message type; the host; and the port number.

Return type:

PortInfo
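The shape of the returned tuple can be sketched as a named tuple. This is a minimal illustration that assumes PortInfo behaves like a six-field named tuple; the type name PortInfoSketch, the field names, and the sample values below are hypothetical and are not taken from the riaps source.

```python
from collections import namedtuple

# Hypothetical field names; the real PortInfo definition in riaps may differ.
PortInfoSketch = namedtuple(
    "PortInfoSketch",
    ["port_type", "kind", "name", "msg_type", "host", "port_num"],
)

# Made-up sample values for a group answer port.
info = PortInfoSketch(
    port_type="ans",
    kind="internal",
    name="groupAns",
    msg_type="",
    host="192.168.1.10",
    port_num=5555,
)

# Fields can be read by name or unpacked positionally like any tuple.
port_type, kind, name, msg_type, host, port_num = info
print(info.port_type, info.host, info.port_num)
```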

getPortNumber()[source]

getSocket()[source]

Return the socket(s) used by the port object. Abstract, subclasses must implement this method.

Returns:

a low-level socket

Return type:

zmq.Socket

get_identity()[source]

inSocket()[source]

Return True if the socket can be used for input. Abstract, subclasses must implement this method.

Returns:

logical value indicating whether the socket is for input.

Return type:

bool

recv()[source]

Receive a byte array (if possible) through the port.

Used for receiving a message that is subsequently deserialized.

Returns:

a message packed into a bytes object.

Return type:

bytes

recvFromMember()[source]

recv_pyobj()[source]

Receive a Python data object (if possible) through the port. Abstract, subclasses must implement this method.

The raw message received is deserialized using pickle and returned. Messages received this way must have been sent using the send_pyobj method.

Returns:

a Python data object

Return type:

any Python data type

reset()[source]

Reset the port object. Subclasses can override this method.

Reset is to be used when a send or receive operation fails and the port needs to be re-initialized.
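The reset-on-failure pattern described above might look like the following sketch. FlakyPort is a hypothetical stand-in rather than a riaps class, and the use of OSError as the failure signal is an assumption; the exception actually raised by a riaps port may be different.

```python
class FlakyPort:
    """Hypothetical stand-in for a port; not the riaps class."""

    def __init__(self):
        self.healthy = False
        self.resets = 0

    def send(self, msg: bytes) -> bool:
        # Fail until the port has been reset at least once.
        if not self.healthy:
            raise OSError("socket in unusable state")
        return True

    def reset(self):
        # Re-initialize: here we only flip a flag and count the call.
        self.healthy = True
        self.resets += 1


def send_with_reset(port, msg: bytes, retries: int = 1) -> bool:
    """Try to send; on failure reset the port and retry up to `retries` times."""
    for attempt in range(retries + 1):
        try:
            return port.send(msg)
        except OSError:
            if attempt == retries:
                raise  # out of retries, let the caller see the error
            port.reset()
    return False  # only reached if `retries` is negative


flaky = FlakyPort()
ok = send_with_reset(flaky, b"hello")
print(ok, flaky.resets)
```

Bounding the number of retries keeps a permanently broken port from looping forever.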

send(_msg)[source]

Send a byte array (if possible) out through the port.

Used for sending a message that has been serialized into bytes previously.

Parameters:

msg – the message packed into a bytes object

Return type:

bytes

sendToMember(msgType, msg)[source]

send_pyobj(msg)[source]

Send a Python data object (if possible) out through the port. Abstract, subclasses must implement this method.

The object is serialized using pickle and sent. Messages sent using this method are received using the recv_pyobj method.

Parameters:

msg (any Python data type) – the message to be sent.
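The pickle round trip that send_pyobj and recv_pyobj rely on can be illustrated without any socket. This sketch uses only the standard pickle module; the sample message is made up, and how riaps frames the bytes on the wire is not shown here.

```python
import pickle

# Made-up sample payload; any picklable Python object works.
message = {"kind": "vote", "value": 42, "tags": ["a", "b"]}

wire_bytes = pickle.dumps(message)   # what a send_pyobj-style call would transmit
restored = pickle.loads(wire_bytes)  # what a recv_pyobj-style call would return

print(type(wire_bytes).__name__, restored == message)
```

Because unpickling can execute arbitrary code, this path is only appropriate between peers that trust each other.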

set_identity(identity)[source]

setup()[source]

Initialize the port object (after construction but before socket creation). Abstract, subclasses must implement this method.

setupSocket(owner)[source]

Set up the socket. Abstract, subclasses must implement this method.

Parameters:

owner (Component) – The Component the port belongs to. This operation must be called from the component thread only.
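The ordering implied by setup() and setupSocket() (construction, then setup, then socket creation) can be sketched with a stand-in class. SketchPort is hypothetical and only records the call order; the method names mirror this page, but the bodies are assumptions, not the riaps implementation.

```python
class SketchPort:
    """Hypothetical port that records the order of lifecycle calls."""

    def __init__(self, name):
        self.name = name
        self.owner = None
        self.socket = None
        self.calls = ["__init__"]

    def setup(self):
        # Runs after construction and before any socket exists.
        if self.socket is not None:
            raise RuntimeError("setup() must precede setupSocket()")
        self.calls.append("setup")

    def setupSocket(self, owner):
        # Socket creation; a plain object stands in for a real socket.
        self.owner = owner
        self.socket = object()
        self.calls.append("setupSocket")

    def closeSocket(self):
        # Release the stand-in socket.
        self.socket = None
        self.calls.append("closeSocket")


sketch = SketchPort("groupAns")
sketch.setup()
sketch.setupSocket(owner="component-placeholder")
sketch.closeSocket()
print(sketch.calls)
```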

update()[source]

Update the socket with information from the discovery service. Abstract, subclasses must implement this method.

Called when the discovery service notifies the actor about a new service provider (e.g. server, publisher, etc.) that the port needs to connect to. The operation will perform the connection.

Parameters:
  • host (str) – IP address of the service provider

  • port (int) – port number of the service provider

updatePoller(poller)[source]

class riaps.run.dcPorts.GroupDuplexPort(parentComponent, portName, groupSpec)[source]

Bases: Port

class riaps.run.dcPorts.GroupPubPort(parentPart, portName, groupSpec)[source]

Bases: BindPort, GroupSimplexPort

Group Publisher port is for publishing application and housekeeping messages for all group members.

closeSocket()[source]

Close down the port. Abstract, subclasses must implement this method.

getInfo()[source]

Retrieve configuration information about the port. Abstract, subclasses must implement this method.

Returns:

a tuple containing the name of the port’s type (req, rep, clt, srv, qry, ans, pub, sub, ins, or tim); the kind of the port (global, local, internal); the name of the port object; the name of the message type; the host; and the port number.

Return type:

PortInfo

getSocket()[source]

Return the socket(s) used by the port object. Abstract, subclasses must implement this method.

Returns:

a low-level socket

Return type:

zmq.Socket

inSocket()[source]

Return True if the socket can be used for input. Abstract, subclasses must implement this method.

Returns:

logical value indicating whether the socket is for input.

Return type:

bool

recv()[source]

Receive a byte array (if possible) through the port.

Used for receiving a message that is subsequently deserialized.

Returns:

a message packed into a bytes object.

Return type:

bytes

recv_pyobj()[source]

Receive a Python data object (if possible) through the port. Abstract, subclasses must implement this method.

The raw message received is deserialized using pickle and returned. Messages received this way must have been sent using the send_pyobj method.

Returns:

a Python data object

Return type:

any Python data type

reset()[source]

Reset the port object. Subclasses can override this method.

Reset is to be used when a send or receive operation fails and the port needs to be re-initialized.

send(msg)[source]

Send a byte array (if possible) out through the port.

Used for sending a message that has been serialized into bytes previously.

Parameters:

msg – the message packed into a bytes object

Return type:

bytes
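Since send() expects a message that is already serialized, the caller does the encoding first. The sketch below uses JSON purely for illustration; the serialization format an application actually uses with riaps is not specified on this page, and the sample reading is made up.

```python
import json

# Made-up sample reading.
reading = {"sensor": "temp1", "value": 21.5}

# Sender side: serialize to bytes before handing the payload to a send()-style call.
payload = json.dumps(reading).encode("utf-8")

# Receiver side: decode the bytes obtained from a recv()-style call.
decoded = json.loads(payload.decode("utf-8"))

print(type(payload).__name__, decoded == reading)
```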

sendGroup(msgType, msg)[source]

send_pyobj(msg)[source]

Send a Python data object (if possible) out through the port. Abstract, subclasses must implement this method.

The object is serialized using pickle and sent. Messages sent using this method are received using the recv_pyobj method.

Parameters:

msg (any Python data type) – the message to be sent.

setup()[source]

Initialize the port object (after construction but before socket creation). Abstract, subclasses must implement this method.

setupSocket(owner)[source]

Set up the socket. Abstract, subclasses must implement this method.

Parameters:

owner (Component) – The Component the port belongs to. This operation must be called from the component thread only.

update(host, port)[source]

Update the socket with information from the discovery service. Abstract, subclasses must implement this method.

Called when the discovery service notifies the actor about a new service provider (e.g. server, publisher, etc.) that the port needs to connect to. The operation will perform the connection.

Parameters:
  • host (str) – IP address of the service provider

  • port (int) – port number of the service provider
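ZeroMQ sockets connect to TCP peers through endpoint strings of the form tcp://host:port. As a rough sketch of how the two parameters above could be combined, the helper below builds such a string; tcp_endpoint is a hypothetical name, and how riaps forms the address internally is not shown on this page.

```python
def tcp_endpoint(host: str, port: int) -> str:
    """Build a ZeroMQ-style TCP endpoint string from a host and a port number."""
    if not 0 < port < 65536:
        raise ValueError(f"port out of range: {port}")
    return f"tcp://{host}:{port}"


# Made-up address of a service provider reported by discovery.
endpoint = tcp_endpoint("192.168.1.10", 5555)
print(endpoint)  # tcp://192.168.1.10:5555
```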

class riaps.run.dcPorts.GroupQryPort(parentPart, portName, groupSpec)[source]

Bases: ConnPort, GroupDuplexPort

The group query port lets members access the leader. It is based on a DEALER socket. It is a group-internal communication port for messaging with the leader; it carries no message type, but it can be timed.

closeSocket()[source]

Close down the port. Abstract, subclasses must implement this method.

getInfo()[source]

Retrieve relevant information about this port.

getSocket()[source]

Return the socket of the port.

inSocket()[source]

Return True because the socket is used for input.

recv()[source]

Receive a byte array (if possible) through the port.

Used for receiving a message that is subsequently deserialized.

Returns:

a message packed into a bytes object.

Return type:

bytes

recvFromLeader()[source]

recv_pyobj()[source]

Receive a Python data object (if possible) through the port. Abstract, subclasses must implement this method.

The raw message received is deserialized using pickle and returned. Messages received this way must have been sent using the send_pyobj method.

Returns:

a Python data object

Return type:

any Python data type

reset()[source]

Reset the port object. Subclasses can override this method.

Reset is to be used when a send or receive operation fails and the port needs to be re-initialized.

send(_msg)[source]

Send a byte array (if possible) out through the port.

Used for sending a message that has been serialized into bytes previously.

Parameters:

msg – the message packed into a bytes object

Return type:

bytes

sendToLeader(msgType, msg)[source]

send_pyobj(msg)[source]

Send a Python data object (if possible) out through the port. Abstract, subclasses must implement this method.

The object is serialized using pickle and sent. Messages sent using this method are received using the recv_pyobj method.

Parameters:

msg (any Python data type) – the message to be sent.

setup()[source]

Set up the port.

setupSocket(owner)[source]

Set up the socket of the port. Return a tuple suitable for querying the discovery service for the servers (not used currently).

update(host, port)[source]

Update the query port: connect its socket to a server.

class riaps.run.dcPorts.GroupSimplexPort(parentComponent, portName, groupSpec)[source]

Bases: Port

class riaps.run.dcPorts.GroupSubPort(parentPart, portName, groupSpec)[source]

Bases: ConnPort, GroupSimplexPort

Group subscriber port is for receiving application and housekeeping messages from all group members.

closeSocket()[source]

Close down the port. Abstract, subclasses must implement this method.

getInfo()[source]

Retrieve configuration information about the port. Abstract, subclasses must implement this method.

Returns:

a tuple containing the name of the port’s type (req, rep, clt, srv, qry, ans, pub, sub, ins, or tim); the kind of the port (global, local, internal); the name of the port object; the name of the message type; the host; and the port number.

Return type:

PortInfo

getSocket()[source]

Return the socket(s) used by the port object. Abstract, subclasses must implement this method.

Returns:

a low-level socket

Return type:

zmq.Socket

inSocket()[source]

Return True if the socket can be used for input. Abstract, subclasses must implement this method.

Returns:

logical value indicating whether the socket is for input.

Return type:

bool

recv()[source]

Receive a byte array (if possible) through the port.

Used for receiving a message that is subsequently deserialized.

Returns:

a message packed into a bytes object.

Return type:

bytes

recvGroup()[source]

recv_pyobj()[source]

Receive a Python data object (if possible) through the port. Abstract, subclasses must implement this method.

The raw message received is deserialized using pickle and returned. Messages received this way must have been sent using the send_pyobj method.

Returns:

a Python data object

Return type:

any Python data type

reset()[source]

Reset the port object. Subclasses can override this method.

Reset is to be used when a send or receive operation fails and the port needs to be re-initialized.

send(_msg)[source]

Send a byte array (if possible) out through the port.

Used for sending a message that has been serialized into bytes previously.

Parameters:

msg – the message packed into a bytes object

Return type:

bytes

send_pyobj(msg)[source]

Send a Python data object (if possible) out through the port. Abstract, subclasses must implement this method.

The object is serialized using pickle and sent. Messages sent using this method are received using the recv_pyobj method.

Parameters:

msg (any Python data type) – the message to be sent.

setup()[source]

Initialize the port object (after construction but before socket creation). Abstract, subclasses must implement this method.

setupSocket(owner)[source]

Set up the socket. Abstract, subclasses must implement this method.

Parameters:

owner (Component) – The Component the port belongs to. This operation must be called from the component thread only.