Welcome to riaps-pycom documentation!

This documents the APIs for RIAPS application management (riaps_ctrl and riaps_deplo), Python component framework (riaps_actor and riaps_device), node management tool (riaps_fab using fabric), code generation tool (riaps_gen) and an alternate discovery service (redis-based).

riaps package

Subpackages

riaps.consts package

Submodules
riaps.consts.const module

Const class to emulate constants.

Created on Oct 23, 2016

@author: riaps

class riaps.consts.const.const[source]

Bases: object

exception ConstError[source]

Bases: TypeError

riaps.consts.defs module

Constants for the run-time system Created on Oct 20, 2016

@author: riaps

Module contents

riaps.ctrl package

Submodules
riaps.ctrl.ctrl module
riaps.ctrl.ctrlcli module
riaps.ctrl.ctrlgui module
riaps.ctrl.ctrlsrv module
riaps.ctrl.main module
Module contents

riaps.deplo package

Submodules
riaps.deplo.appdb module

Application database Created on Apr 2, 2018

@author: riaps

class riaps.deplo.appdb.AppDbase[source]

Bases: object

Application database. The database is a collection of key -> value pairs, where values are pickled Python objects (i.e. bytearrays). Structure: RIAPSAPPS -> [ appNames* ] appName -> [ actorRecords* ]

RIAPSAPPS = 'RIAPSAPPS'
RIAPSDISCO = 'RIAPSDISCO'
RIAPSDISCOCMD = 'RIAPSDISCOCMD'
addApp(appName)[source]

Add a new app to the database

addAppActor(appName, actorRecord)[source]
closeDbase()[source]
delApp(appName)[source]
delAppActor(appName, actorName)[source]
delDiscoCommand()[source]
delKey(key)[source]
getAppActor(appName, actorName)[source]
getAppActors(appName)[source]
getApps()[source]
getDisco()[source]
getDiscoCommand()[source]
getKeyValue(key, default=None)[source]
putKeyValue(key, value)[source]
replaceKeyValue(key, value)[source]
setDisco(disco)[source]
setDiscoCommand(discoCmd)[source]
riaps.deplo.cpumon module

Resource monitors

Created on Nov 23, 2017

@author: riaps

class riaps.deplo.cpumon.CPUMonitorThread(parent, interval, usage)[source]

Bases: Thread

Thread for monitoring an actor and enforcing resource limits.

addClientDevice(appName, actorName, device)[source]
is_running()[source]
restart()[source]
run()[source]

Method representing the thread’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

setup(proc)[source]
stop()[source]
terminate()[source]
riaps.deplo.deplo module
riaps.deplo.depm module
riaps.deplo.fm module
riaps.deplo.main module
riaps.deplo.memmon module

Resource monitors

Created on Nov 23, 2017

@author: riaps

class riaps.deplo.memmon.MemMonitorThread(parent, efd, usage)[source]

Bases: Thread

addClientDevice(appName, actorName, device)[source]
is_running()[source]
restart()[source]
run()[source]

Method representing the thread’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

setup(proc)[source]
stop()[source]
terminate()[source]
riaps.deplo.netmon module

Resource monitors

Created on Nov 23, 2017

@author: riaps

class riaps.deplo.netmon.NHAction[source]

Bases: object

MAP = {1: 'SET', 2: 'REMOVE'}
REMOVE = 2
SET = 1
class riaps.deplo.netmon.NHLoopStatus[source]

Bases: object

Return codes from nethogsmonitor_loop()

FAILURE = 1
MAP = {0: 'OK', 1: 'FAILURE', 2: 'NO_DEVICE'}
NO_DEVICE = 2
OK = 0
class riaps.deplo.netmon.NHMonitorRecord[source]

Bases: Structure

ctypes version of the struct of the same name from libnethogs.h

device_name

Structure/Union member

name

Structure/Union member

pid

Structure/Union member

record_id

Structure/Union member

recv_bytes

Structure/Union member

recv_kbs

Structure/Union member

sent_bytes

Structure/Union member

sent_kbs

Structure/Union member

uid

Structure/Union member

class riaps.deplo.netmon.NetMonitorThread(parent)[source]

Bases: Thread

addClientDevice(appName, actorName, device, proc, rate)[source]
addProc(appName, actName, proc)[source]
delProc(appName, actName, proc)[source]
dev_args(devnames)[source]

Return the appropriate ctypes arguments for a device name list, to pass to libnethogs nethogsmonitor_loop_devices. The return value is a 2-tuple of devc (ctypes.c_int) and devicenames (ctypes.POINTER) to an array of ctypes.c_char).

Parameters:

devnames (list) – list of device names to monitor

Returns:

2-tuple of devc, devicenames ctypes arguments

Return type:

tuple

is_running()[source]
network_activity_callback(_action, data)[source]
restart()[source]
run()[source]

Method representing the thread’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

stop()[source]
terminate()[source]
riaps.deplo.procm module

Created on Apr 7, 2018

@author: riaps

class riaps.deplo.procm.ProcessManager(parent)[source]

Bases: object

Manages processes: service(s) and actors started by deplo

monitor(qualName, proc)[source]
release(qualName)[source]
class riaps.deplo.procm.ProcessMonitor(parent, qualName)[source]

Bases: Thread

Thread for monitoring a process

error()[source]
release()[source]
run()[source]

Method representing the thread’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

setup(record)[source]
terminate()[source]
class riaps.deplo.procm.ProcessMonitorRecord(name, proc, thread)

Bases: tuple

name

Alias for field number 0

proc

Alias for field number 1

thread

Alias for field number 2

riaps.deplo.resm module

Resource manager

Created on Nov 23, 2017

@author: riaps

class riaps.deplo.resm.ActorResourceManager(parent, actorName, actorDef)[source]

Bases: object

Resource manager for an actor

addClientDevice(device)[source]
cleanupActor()[source]
setupCPU()[source]
setupMem()[source]
setupNet()[source]
setupSpace()[source]
startActor(proc)[source]
stopActor(proc)[source]
class riaps.deplo.resm.AppResourceManager(parent, appName, appFolder, userName)[source]

Bases: object

Resource manager for an app

addActor(actorName, actorDef)[source]
addClientDevice(actorName, device)[source]
addQuota(usage)[source]
claimApp()[source]
cleanupApp()[source]
delQuota()[source]
getUserName()[source]
nextActorID()[source]
reclaimApp()[source]
startActor(actorName, proc)[source]
stopActor(actorName, proc)[source]
class riaps.deplo.resm.ResourceManager(context)[source]

Bases: object

Resource manager

addActor(appName, actorName, actorDef)[source]
addClientDevice(appName, actorName, device)[source]
cleanupApp(appName)[source]
cleanupApps()[source]

Cleanup all apps: remove the cgroup of the apps

cleanupNet()[source]
getUserName(appName)[source]
reclaimApp(appName)[source]
setupApp(appName, appFolder, userName)[source]

Start an app: create an app resource manager for this app if it has not been created yet

startActor(appName, actorName, proc)[source]
stopActor(appName, actorName, proc)[source]
terminate()[source]
riaps.deplo.spcmon module

Resource monitors

Created on Nov 23, 2017

@author: riaps

class riaps.deplo.spcmon.SpcMonitorThread(parent)[source]

Bases: Thread

addClientDevice(appName, actorName, device, proc)[source]
addProc(appName, actName, proc)[source]
delProc(appName, actName, proc)[source]
is_running()[source]
restart()[source]
run()[source]

Method representing the thread’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

stop()[source]
terminate()[source]
Module contents

riaps.discd package

Submodules
riaps.discd.dbase module

Discovery server database interface Created on Oct 19, 2016

@author: riaps

class riaps.discd.dbase.DiscoDbase(context_, dbaseLoc)[source]

Bases: object

Discovery service database base class

detach(key: str, target: str)[source]

Detach client (for updates) from keys

fetch(key: str, client: str) [<class 'str'>][source]

Fetch value(s) under key. Add client to list of clients interested in the value

fetchUpdates() [<class 'str'>][source]

Check and fetch the updated values of the subscribed keys if any

insert(key: str, value: str) [<class 'str'>][source]

Insert value under key and return list of clients of value (if any). A key may have multiple values associated with it, hence the new value is added to the set of values that belong to the key

remove(key: str, value: str) [<class 'str'>][source]

Remove value from value under key.

start()[source]

Start the database: connect to the database process

terminate()[source]
riaps.discd.dbase_dht module
riaps.discd.dbase_redis module

Discovery server database interface Created on Oct 19, 2016

@author: riaps

class riaps.discd.dbase_redis.RedisDbase(context_, dbaseLoc)[source]

Bases: DiscoDbase

Discovery service database implemented using redis

addSub(newKey)[source]

Update the list of subscribed keys with the new key

delSub(key)[source]

Delete subscription to key

detach(key: str, target: str)[source]

Detach update client from keys

fetch(key: str, client: str) [<class 'str'>][source]

Fetch value(s) under key. Add client to list of clients interested in the value

fetchUpdates()[source]

Check and fetch the updated values of the subscribed keys if any

insert(key: str, value: str) [<class 'str'>][source]

Insert value under key and return list of clients of value (if any). A key may have multiple values associated with it, hence the new value is added to the set of values that belong to the key

remove(key: str, value: str) [<class 'str'>][source]

Remove value from values under key.

start()[source]

Start the database: connect to the database process

riaps.discd.discs module
riaps.discd.main module
Module contents

riaps.fabfile package

Submodules
riaps.fabfile.deplo module
riaps.fabfile.fabfile module
riaps.fabfile.riaps module
riaps.fabfile.sys module
riaps.fabfile.time module
Module contents

riaps.gen package

Subpackages
riaps.gen.target package
Subpackages
riaps.gen.target.capnp package
Submodules
riaps.gen.target.capnp.capnpfilters module
riaps.gen.target.capnp.capnpfilters.generate_capnp_id(value)[source]
riaps.gen.target.capnp.capnpgen module
class riaps.gen.target.capnp.capnpgen.CapnpGenerator(cppmodel, output_dir)[source]

Bases: JinjaGenerator

create_environment(**kwargs)[source]

Return a new Jinja environment.

Derived classes may override method to pass additional parameters or to change the template loader type.

tasks = [<riaps.gen.target.capnp.capnpgen.CapnpTask object>]
templates_path = '/home/docs/checkouts/readthedocs.org/user_builds/riaps-pycom/checkouts/latest/src/riaps/gen/target/capnp/tpl'
class riaps.gen.target.capnp.capnpgen.CapnpTask(formatter=None, **kwargs)[source]

Bases: JinjaTask

filtered_elements(model)[source]

Iterator over model elements to execute this task for.

relative_path_for_element(element)[source]

Returns relative file path receiving the generator output for given element.

template_name = 'capnp.tpl'
riaps.gen.target.capnp.sync_capnp module
class riaps.gen.target.capnp.sync_capnp.FileSync(model)[source]

Bases: object

apply_capnp_rules(orig_filepath, new_filepath)[source]
sync_capnp(output_dir)[source]
Module contents
riaps.gen.target.cpp package
Submodules
riaps.gen.target.cpp.ccfilters module
riaps.gen.target.cpp.ccfilters.cpp_port_type(port_type)[source]
riaps.gen.target.cpp.ccfilters.handler_name(value)[source]
riaps.gen.target.cpp.ccfilters.port_macro(value, port_type)[source]
riaps.gen.target.cpp.ccfilters.recv_message_type(value, port_type)[source]
riaps.gen.target.cpp.ccfilters.recv_return_type(value, port_type)[source]
riaps.gen.target.cpp.ccfilters.sender_message_type(value, port_type)[source]
riaps.gen.target.cpp.ccfilters.sender_name(value)[source]
riaps.gen.target.cpp.cppgen module
class riaps.gen.target.cpp.cppgen.CmakeTask(formatter=None, **kwargs)[source]

Bases: JinjaTask

filtered_elements(model)[source]

Iterator over model elements to execute this task for.

relative_path_for_element(element)[source]

Returns relative file path receiving the generator output for given element.

template_name = 'cmake.tpl'
class riaps.gen.target.cpp.cppgen.CompCppBaseTask(part)[source]

Bases: JinjaTask

filtered_elements(model)[source]

Iterator over model elements to execute this task for.

relative_path_for_element(element)[source]

Returns relative file path receiving the generator output for given element.

template_name = 'comp.base.cc.tpl'
class riaps.gen.target.cpp.cppgen.CompCppTask(part)[source]

Bases: JinjaTask

filtered_elements(model)[source]

Iterator over model elements to execute this task for.

relative_path_for_element(element)[source]

Returns relative file path receiving the generator output for given element.

template_name = 'comp.cc.tpl'
class riaps.gen.target.cpp.cppgen.CompGenerator(environment=None, **kwargs)[source]

Bases: JinjaGenerator

create_environment(**kwargs)[source]

Return a new Jinja environment.

Derived classes may override method to pass additional parameters or to change the template loader type.

tasks = [<riaps.gen.target.cpp.cppgen.CompHppBaseTask object>, <riaps.gen.target.cpp.cppgen.CompCppBaseTask object>, <riaps.gen.target.cpp.cppgen.CompHppTask object>, <riaps.gen.target.cpp.cppgen.CompCppTask object>, <riaps.gen.target.cpp.cppgen.CompHppBaseTask object>, <riaps.gen.target.cpp.cppgen.CompCppBaseTask object>, <riaps.gen.target.cpp.cppgen.CompHppTask object>, <riaps.gen.target.cpp.cppgen.CompCppTask object>, <riaps.gen.target.cpp.cppgen.CmakeTask object>]
templates_path = '/home/docs/checkouts/readthedocs.org/user_builds/riaps-pycom/checkouts/latest/src/riaps/gen/target/cpp/tpl'
class riaps.gen.target.cpp.cppgen.CompHppBaseTask(part)[source]

Bases: JinjaTask

filtered_elements(model)[source]

Iterator over model elements to execute this task for.

relative_path_for_element(element)[source]

Returns relative file path receiving the generator output for given element.

template_name = 'comp.base.h.tpl'
class riaps.gen.target.cpp.cppgen.CompHppTask(part)[source]

Bases: JinjaTask

filtered_elements(model)[source]

Iterator over model elements to execute this task for.

relative_path_for_element(element)[source]

Returns relative file path receiving the generator output for given element.

template_name = 'comp.h.tpl'
riaps.gen.target.cpp.sync_cpp module
class riaps.gen.target.cpp.sync_cpp.FileSync(model)[source]

Bases: object

apply_cmake_rules(orig_filepath, new_filepath)[source]
apply_cpp_rules(orig_filepath, new_filepath)[source]
sync_all(output_dir)[source]
sync_cmake(output_dir)[source]
sync_code(output_dir)[source]
Module contents
riaps.gen.target.python package
Submodules
riaps.gen.target.python.pygen module
class riaps.gen.target.python.pygen.CompGenerator[source]

Bases: JinjaGenerator

create_environment(**kwargs)[source]

Return a new Jinja environment.

Derived classes may override method to pass additional parameters or to change the template loader type.

class riaps.gen.target.python.pygen.CompPyTask(part)[source]

Bases: JinjaTask

filtered_elements(model)[source]

Iterator over model elements to execute this task for.

relative_path_for_element(element)[source]

Returns relative file path receiving the generator output for given element.

template_name = 'comp.py.tpl'
riaps.gen.target.python.sync_python module
class riaps.gen.target.python.sync_python.FileSync(model)[source]

Bases: object

apply_py_rules(orig_filepath, new_filepath)[source]
sync_code(output_dir)[source]
Module contents
Module contents
Submodules
riaps.gen.gen module
riaps.gen.gen.main()[source]
riaps.gen.gen.preprocess(model)[source]
Module contents

riaps.lang package

Submodules
riaps.lang.depl module

Deployment language processor Created on Nov 7, 2016

@author: riaps

exception riaps.lang.depl.DeplError(message)[source]

Bases: Exception

class riaps.lang.depl.DeploymentModel(fileName, debug=False, verbose=False)[source]

Bases: object

Deployment model loader/parser

getActuals(actuals)[source]
getAppName()[source]
getDeployments()[source]
getNetwork()[source]
riaps.lang.depl.main(standalone=False)[source]
riaps.lang.depll module

Top-level script to start the deployment language processor (‘depl’) Created on Oct 15, 2016

Arguments:

model : Name of deployment model file to be processed

@author: riaps

riaps.lang.gviz module

Created on Mar 17, 2018

@author: riaps

riaps.lang.gviz.cleanupMessages(G, msgMap, msgMapUsed)[source]
riaps.lang.gviz.findMsgNode(msgType, actorLocals, actorLocalMessageNodes, actorInternals, actorInternalMessageNodes, msgMap, msgMapUsed, localMsgGraph, internalMsgGraph)[source]
riaps.lang.gviz.findMsgNodePair(msgType, actorLocals, actorLocalMessageNodes, actorInternals, actorInternalMessageNodes, msgMap, localMsgGraph, internalMsgGraph, globalMsgGraph)[source]
riaps.lang.gviz.gviz(model, deplo, verbose=False)[source]
riaps.lang.gviz.main(debug=False)[source]
riaps.lang.gviz.unique(name)[source]
riaps.lang.gviz.visualize(deplo, models)[source]
riaps.lang.gviz.visualize_actors(G, appModel, hostName, hostLabel, actors, msgMap, msgMapUsed, globalMsgSubgraph)[source]
riaps.lang.gviz.visualize_messages(G, appModel, msgMap, msgMapUsed)[source]
riaps.lang.lang module

DSL for RIAPS software models Created on Oct 9, 2016 Uses the textX parser @author: riaps

exception riaps.lang.lang.LangError(message)[source]

Bases: Exception

class riaps.lang.lang.RiapsModel2JSON(model)[source]

Bases: object

Class to convert the RIAPS model (constructed by the parser) into a data structure suitable for generating JSON output.Dependent on the DSL syntax and the object structure built by the parser.

static convertMem(value, unit)[source]

Convert all memory size values to kilobytes

convertRate(value, unit)[source]

Convert all rate values to bytes/sec

static convertTime(value, unit)[source]

Convert all time values to msec

getActorScheduler(sched)[source]
getActors(actors)[source]
getActuals(actuals)[source]
getComponentScheduler(sched)[source]
getComponents(components)[source]
getFormals(formals)[source]
getGroups(groups)[source]
getIOComponents(components)[source]
getImpl(comp)[source]
getInstances(instances)[source]
getInternals(internals)[source]
getLibraries(libraries)[source]
getLocals(locals_)[source]
getMessages(messages)[source]
getPorts(ports)[source]
getUsage(usage)[source]
riaps.lang.lang.actor_obj_processor(actor)[source]
riaps.lang.lang.compileModel(modelFileName, verbose=False, debug=False, generate=True)[source]
riaps.lang.lang.insport_obj_processor(insport)[source]
riaps.lang.lang.instance_obj_processor(instance)[source]
riaps.lang.lang.main(debug=False)[source]
riaps.lang.lang.op_port_obj_processor(port)[source]
riaps.lang.lang.timed_port_obj_processor(port)[source]
riaps.lang.lang.timport_obj_processor(timport)[source]
Module contents

riaps.logger package

Subpackages
riaps.logger.drivers package
Submodules
riaps.logger.drivers.base_driver module
class riaps.logger.drivers.base_driver.BaseDriver(driver_type)[source]

Bases: ABC

close()[source]
abstract handle(msg)[source]
riaps.logger.drivers.console_driver module
class riaps.logger.drivers.console_driver.ServerLogDriver(driver_type, session_name)[source]

Bases: BaseDriver

close()[source]
handle(msg)[source]
riaps.logger.drivers.factory module
riaps.logger.drivers.file_driver module
class riaps.logger.drivers.file_driver.ServerLogDriver(driver_type, session_name)[source]

Bases: BaseDriver

close()[source]
handle(msg)[source]
riaps.logger.drivers.tmux_driver module
Module contents
Submodules
riaps.logger.main module
riaps.logger.riaps_log_config_test module

Script to test app log config file

Created on Oct 20, 2022

Arguments -f (or –file) FILE : Path to the file that will be used to construct the loggers @author: riaps

riaps.logger.riaps_log_config_test.main()[source]
riaps.logger.riaps_log_config_test.test_loggers(loggers, msg)[source]
riaps.logger.server module
Module contents

riaps.proto package

Module contents

riaps.run package

Submodules
riaps.run.actor module
riaps.run.ansPort module

Created on Oct 10, 2016

@author: riaps

class riaps.run.ansPort.AnsPort(parentComponent, portName, portSpec)[source]

Bases: DuplexBindPort

classdocs

ans_port_recv(is_pyobj)[source]
ans_port_send(msg, is_pyobj)[source]
closeSocket()[source]

Close down the port. Abstract, subclasses must implement this method.

getInfo()[source]

Retrieve configuration information about the port. Abstract, subclasses must implement this method.

Returns:

a tuple containing the name of the port’s type: req,rep,clt,srv,qry,ans,pub,sub,ins,or tim; the kind of the port (global, local, internal); the name of the port object; the name of the message type; the host and the port number.

Return type:

PortInfo

getSocket()[source]

Return the socket(s) used by the port object. Abstract, subclasses must implement this method.

Returns:

a low-level socket

Return type:

zmq.Socket

get_identity()[source]
inSocket()[source]

Return True if the socket can be used for input. Abstract, subclasses must implement this method.

Returns:

logical value indicating whether the socket is for input.

Type:

bool

recv()[source]

Receive a byte array (if possible) through the port

Used for receiving a message that is subsequently deserialized.

Returns:

a message packed into a bytes.

Return type:

bytes

recv_pyobj()[source]

Receive a Python data object (if possible) through the port. Abstract, subclasses must implement this method.

The raw message received is deserialized using pickle and returned. Messages received this way had to be sent using the send_pyobj method.

Returns:

a Python data object

Type:

any Python data type

reset()[source]

Reset the port object. Subclasses can override this method.

Reset is to be used when a send or receive operation fails and the port needs to be re-initialized.

send(msg)[source]

Send a byte array (if possible) out through the port.

Used for sending a message that has been serialized into bytes previously.

Parameters:

msg – the message packed into a bytes

Return type:

bytes

send_pyobj(msg)[source]

Send a Python data object (if possible) out through the port. Abstract, subclasses must implement this method.

The object is serialized using pickle and sent. Messages sent using this method are received using the recv_pyobj method.

Parameters:

msg (any Python data type) – the message to be sent.

set_identity(identity)[source]
setup()[source]

Initialize the port object (after construction but before socket creation). Abstract, subclasses must implement this method.

setupSocket(owner)[source]

Setup the socket. Abstract, subclasses must implement this method.

Parameters:

owner (Component) – The Component the port belongs to. This operation must be called from the component thread only.

update(host, port)[source]

Update the socket with information from the discovery service. Abstract, subclasses must implement this method.

Called when the discovery service notifies the actor about a new service provide (e.g. server, publisher, etc.) the port needs to connect to. The operation will perform the connection.

Parameters:
  • host (str) – IP address of the service provider

  • port (int) – port number of the service provider

riaps.run.cltPort module

Client port class Created on Oct 10, 2016

@author: riaps

class riaps.run.cltPort.CltPort(parentComponent, portName, portSpec)[source]

Bases: DuplexConnPort

Client port is to access a server. Has a request and a response message type, and uses a REQ socket.

closeSocket()[source]

Close down the port. Abstract, subclasses must implement this method.

getInfo()[source]

Retrieve relevant information about this port

getSocket()[source]

Return the socket of port

inSocket()[source]

Return False because the socket is not used as direct input (client has to recv explicitly)

recv()[source]

Receive a byte array (if possible) through the port

Used for receiving a message that is subsequently deserialized.

Returns:

a message packed into a bytes.

Return type:

bytes

recv_pyobj()[source]

Receive a Python data object (if possible) through the port. Abstract, subclasses must implement this method.

The raw message received is deserialized using pickle and returned. Messages received this way had to be sent using the send_pyobj method.

Returns:

a Python data object

Type:

any Python data type

reset()[source]

Reset the port object. Subclasses can override this method.

Reset is to be used when a send or receive operation fails and the port needs to be re-initialized.

send(msg)[source]

Send a byte array (if possible) out through the port.

Used for sending a message that has been serialized into bytes previously.

Parameters:

msg – the message packed into a bytes

Return type:

bytes

send_pyobj(msg)[source]

Send a Python data object (if possible) out through the port. Abstract, subclasses must implement this method.

The object is serialized using pickle and sent. Messages sent using this method are received using the recv_pyobj method.

Parameters:

msg (any Python data type) – the message to be sent.

setup()[source]

Set up the port

setupSocket(owner)[source]

Set up the socket of the port. Return a tuple suitable for querying the discovery service for the publishers

riaps.run.comp module

Component class Created on Oct 15, 2016

@author: riaps

class riaps.run.comp.Component[source]

Bases: object

Base class for RIAPS application components

GROUP_PRIORITY_MAX = 0
GROUP_PRIORITY_MIN = 256
getActorID()[source]

Return a globally unique ID (8 bytes) for the parent actor.

getActorName()[source]

Return the name of the parent actor (as in model)

getAppName()[source]

Return the name of the parent application (as in model)

getLocalID()[source]

Return a locally unique ID (int) of the component. The ID is unique within the actor.

getName()[source]

Return the name of the component (as in model)

getTypeName()[source]

Return the name of the type of the component (as in model)

getUUID()[source]

Return the deployment-unique ID for the parent actor

handleActionVoteRequest(group, rfvId, when)[source]

Default handler for request to vote an action in the future (in member) Implementation must recv/recv_pyobj to obtain the action topic.

handleActivate()[source]

Default activation handler

handleCPULimit()[source]

Default handler for CPU limit exceed

handleDeactivate()[source]

Default deactivation handler

handleDeadline(_funcName)[source]

Default handler for deadline violation

handleGroupMessage(_group)[source]

Default handler for group messages. Implementation must immediately call recv/recv_pyobj on the group to obtain message.

handleLeaderElected(group, leaderId)[source]

Default handler for ‘leader elected’ events

handleLeaderExited(group, leaderId)[source]

Default handler for ‘leader exited’ events

handleMemLimit()[source]

Default handler for memory limit exceed

handleMemberJoined(group, memberId)[source]

Default handler for ‘member join’ events

handleMemberLeft(group, memberId)[source]

Default handler for ‘member leave’ events

handleMessageFromLeader(group)[source]

Default handler for messages received from the leader (in member) Member implementation must immediately call recv/recv_pyobj on the group to obtain message.

handleMessageToLeader(group)[source]

Default handler for messages sent to the leader (in leader) Leader implementation must immediately call recv/recv_pyobj on the group to obtain message.

handleNICStateChange(state)[source]

Default handler for NIC state change

handleNetLimit()[source]

Default handler for space limit exceed

handlePassivate()[source]

Default activation handler

handlePeerStateChange(state, uuid)[source]

Default handler for peer state change

handleSpcLimit()[source]

Default handler for space limit exceed

handleVoteRequest(group, rfvId)[source]

Default handler for vote requests (in member) Implementation must recv/recv_pyobj to obtain the topic.

handleVoteResult(group, rfvId, vote)[source]

Default handler for the result of a vote (in member)

joinGroup(groupName, instName, groupMinSize=1, groupPriority=256)[source]
leaveGroup(group)[source]
class riaps.run.comp.ComponentThread(parent)[source]

Bases: Thread

Component execution thread. Runs the component’s code, and communicates with the parent actor.

addGroupSocket(group, groupPriority)[source]
batchScheduler(sockets)[source]

Batch scheduler for the component message processing.

The dictionary containing the active sockets is scanned and the associated handler is invoked.

delGroupSocket(group)[source]
executeHandlerFor(socket)[source]

Execute the handler for the socket

The handler is always allowed to run to completion, the operation is never preempted.

getInfo()[source]
logEvent(msg)[source]
priorityScheduler(sockets)[source]

priority scheduler for the component message processing.

The priority order is determined by the order of component ports. The dictionary of active sockets is scanned, and the they are inserted into a priority queue (according to their priority value). The queue is processed (in order of priority). After each invocation, the inputs are polled (in a no-wait operation) and the priority queue is updated.

replaceSocket(portObj, newSocket)[source]
rrScheduler(sockets)[source]

Round-robin scheduler for the component message processing.

The round-robin order is determined by the order of component ports. The dictionary of active sockets is scanned, and the associated handlers are invoked in round-robin order. After each invocation, the inputs are polled (in a no-wait operation) and the round-robin queue is updated.

run()[source]

Method representing the thread’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

runCommand()[source]
sendControl(msg)[source]
setupControl()[source]

Create the control socket and connect it to the socket in the parent part

setupPoller()[source]
setupScheduler()[source]

Select the message scheduler algorithm based on the model.

setupSockets()[source]
riaps.run.dc module

Distributed coordination

Python implementation of group formation, communications, leader election, consensus and action coordination.

Created on Feb 23, 2019 Author: riaps

class riaps.run.dc.Coordinator(parent)[source]

Bases: object

Coordinator object. Each component has a coordinator object that creates the group objects for a component. A group instance can be created only once in a component, subsequent creations return the same group object

getGroup(groupType, groupInstance)[source]

Returns a group instance based on its name

groupName(groupType, groupInstance)[source]

Form a name for a group instance

joinGroup(thread, groupType, groupInstance, componentId, groupMinSize)[source]

Operation to create a group instance (with capacity) in a component. Returns the instance

leaveGroup(group)[source]

Operation to ‘leave’ a group by a component. The group will be deactivated / its threads stopped, and deleted..

class riaps.run.dc.Group(parent, thread, groupType, groupInstance, componentId, groupSpec, groupMinSize)[source]

Bases: object

Group object, represents one group instance that belongs to a component. Acts as the front-end for the GroupThread, channels messages to/from. Some of its methods are run in the GroupThread

GROUP_ACK = b'ack'
GROUP_ANN = b'ann'
GROUP_ERR = b'err'
GROUP_LEL = b'lel'
GROUP_LEX = b'lex'
GROUP_MFL = b'mfl'
GROUP_MJD = b'mjd'
GROUP_MLT = b'mlt'
GROUP_MSG = b'msg'
GROUP_MTL = b'mtl'
GROUP_NLD = b'nld'
GROUP_PARAMETERS = {'consensusTimeout': 1500, 'electionMax': 2000, 'electionMin': 1500, 'heartbeat': 1000, 'peerTimeout': 3000}
GROUP_RCM = b'rcm'
GROUP_RFV = b'rfv'
GROUP_RTC = b'rtc'
GROUP_UPD = b'upd'
getGroupId()[source]
getGroupName()[source]

Group unique name

getLeaderId()[source]

Return the leader’s id (or None if no leader)

getSocket()[source]

Returns inproc socket used to communicate with GroupThread

groupSize()[source]

Return the size of the group (>= 1).

groupSocketName(groupType, groupName, componentId)[source]

Forms the unique name for the inproc socket.

handleMessage(msgFrames=None)[source]

Receives message from the worker thread and handles it Runs in component thread (called from component polling loop, or from send_port)

hasLeader()[source]

True if the group has a leader.

isLeader()[source]

Return True if the group member IS the leader.

leave()[source]
recv()[source]

Receive a bytes object message from the worker thread Runs in component thread, called from a component

recv_msg(is_pyobj)[source]

Receive a message from the worker thread. Message are coming through a message queue. Runs in component thread, called from a component

recv_pyobj()[source]

Receive a Python object message from the worker thread Runs in component thread, called from a component

requestActionVote(action, when, kind='consensus', timeout=None)[source]
requestActionVote_pyobj(action, when, kind='consensus', timeout=None)[source]
requestVote(topic, kind='majority', timeout=None)[source]

Request a vote on a topic (with timeout). Topic is a bytes. A message is sent to the leader (if any) that starts a voting process. Returns None if there is no leader, otherwise a generated id string for the request.

requestVote_pyobj(topic, kind='majority', timeout=None)[source]

Request a vote on a topic (with timeout). Topic is a Python object. A message is sent to the leader (if any) that starts a voting process. Returns None if there is no leader, otherwise a generated id string for the request.

send(msg)[source]

Sends a bytes object as a message to all members of the group Runs in component thread

sendActionVote(rfvId, vote)[source]
sendToLeader(msg)[source]

Send message to group leader from a member Raise an exception if the group is not coordinated Return False (indicating operation failure) if no leader Runs in component thread.

sendToLeader_pyobj(msg)[source]

Send PyObject message to group leader from a member Raise an exception if the group is not coordinated Return False (indicating operation failure) if no leader Runs in component thread.

sendToMember(msg, identity=None)[source]

Send message to group member (with identity) from the leader If identity is not supplied, last value of identity is used. Raise an exception if the group is not coordinated Return False (indicating operation failure) if no leader Runs in component thread.

sendToMember_pyobj(msg, identity=None)[source]

Send PyObject message to group member (with identity) from the leader If identity is not supplied, last value of identity is used. Raise an exception if the group is not coordinated Return False (indicating operation failure) if no leader Runs in component thread.

sendVote(rfvId, vote)[source]

Send a vote (True/False) to the leader on a requested topic identified by the id of the request for vote.

send_port(msgType, msg, has_identity=False)[source]

Send a message to the worker thread. Used by all messages - the messages have multiple frames Runs in component thread

send_pyobj(msg)[source]

Sends a Python object as a message to all members of the group Runs in component thread

setup(groupThread)[source]

Set up all the sockets for the worker thread Runs in group thread

setupParams()[source]
unsetup(groupThread)[source]

Discard all the sockets used in worker thread Runs in group thread

update(host, port)[source]

Ask the worker thread to update its sockets. Called when the disco responds with the server (pub) host/port pair for the socket(s) to connect to by the client (sub) Runs in component thread

class riaps.run.dc.GroupThread(group)[source]

Bases: Thread

Worker thread for DC behavior

AUTHORITY = b'ldr'
CANDIDATE = 2
FOLLOWER = 1
HEARTBEAT = b'tic'
LEADER = 3
NO_LEADER = 0
REQVOTE = b'req'
RSPVOTE = b'vot'
announceConsensus(rfvId, vote)[source]

Announce consensus vote result (yes/no/timeout)

checkAllPolls(now)[source]

Check the results of all polls

checkPoll(poll, now)[source]

Check the result of of one poll

electionTimeout()[source]

Produce a random election timeout

getOwnId()[source]
handleCompMessage()[source]

Handle a message coming from the component

handleMessageForLeader()[source]

Handle message sent to the leader (in leader)

handleMessageForMember()[source]

Handle simple message from leader (in member)

handleNetMessage(now=None)[source]

Handle (broadcast) messages coming from the group. Messages could be data messages (to be handed over to the component), peer heartbeat messages, or election-related messages

handleTimeout(now)[source]

Handle the timeout on communications within the group

heartbeat(now)[source]

Send out a group heartbeat (so that group members can maintain an accurate membership list)

isLeader()[source]
run()[source]

Main loop for GroupThread - polls all sources and calls handlers

sendChangeMessage(change, someId)[source]

Send message about a change to the component

setLeaderDeadline()[source]

Set deadline for heartbeat from leader

setTimeout(value, now)[source]

Set the next timeout value, update lastWait (with ‘now’)

setup()[source]
startPoll(msg, member)[source]

Start a poll for a member based on message

threshold()[source]

Calculate the threshold for leader election, based on membership list

updatePeers(now)[source]

Update membership list (with the current time as the last time a peer was heard from)

updatePoll(msg)[source]

Update poll with the vote in msg

updateTimeout(now)[source]

Update the timeout value - used when a data message was received from the group; the timeout is updated to reflect elapsed time.

class riaps.run.dc.Poll(parent, rfv, member, timeout, deadline, numPeers)[source]

Bases: object

Poll object to represent an active poll in the group leader.

ACTION = 'action'
CONSENSUS = 'consensus'
MAJORITY = 'majority'
VALUE = 'value'
allVoted()[source]

Return True if all peers voted

expired(now)[source]

Return True if the voting has expired

result()[source]

Return True if the majority/all voted yes

vote(vote)[source]

Count one vote (yes/no)

riaps.run.dcPorts module

Distributed coordination - Communication ports for the groups.

Created on Feb 23, 2019 Author: riaps

class riaps.run.dcPorts.GroupAnsPort(parentPart, portName, groupSpec)[source]

Bases: BindPort, GroupDuplexPort

Group answer port is for the leader to receive messages from members. Based on a DEALER socket. Group-internal communication port for messaging with the leader, no message type, but can be timed.

closeSocket()[source]

Close down the port. Abstract, subclasses must implement this method.

getInfo()[source]

Retrieve configuration information about the port. Abstract, subclasses must implement this method.

Returns:

a tuple containing the name of the port’s type: req,rep,clt,srv,qry,ans,pub,sub,ins,or tim; the kind of the port (global, local, internal); the name of the port object; the name of the message type; the host and the port number.

Return type:

PortInfo

getPortNumber()[source]
getSocket()[source]

Return the socket(s) used by the port object. Abstract, subclasses must implement this method.

Returns:

a low-level socket

Return type:

zmq.Socket

get_identity()[source]
inSocket()[source]

Return True if the socket can be used for input. Abstract, subclasses must implement this method.

Returns:

logical value indicating whether the socket is for input.

Type:

bool

recv()[source]

Receive a byte array (if possible) through the port

Used for receiving a message that is subsequently deserialized.

Returns:

a message packed into a bytes.

Return type:

bytes

recvFromMember()[source]
recv_pyobj()[source]

Receive a Python data object (if possible) through the port. Abstract, subclasses must implement this method.

The raw message received is deserialized using pickle and returned. Messages received this way had to be sent using the send_pyobj method.

Returns:

a Python data object

Type:

any Python data type

reset()[source]

Reset the port object. Subclasses can override this method.

Reset is to be used when a send or receive operation fails and the port needs to be re-initialized.

send(_msg)[source]

Send a byte array (if possible) out through the port.

Used for sending a message that has been serialized into bytes previously.

Parameters:

msg – the message packed into a bytes

Return type:

bytes

sendToMember(msgType, msg)[source]
send_pyobj(msg)[source]

Send a Python data object (if possible) out through the port. Abstract, subclasses must implement this method.

The object is serialized using pickle and sent. Messages sent using this method are received using the recv_pyobj method.

Parameters:

msg (any Python data type) – the message to be sent.

set_identity(identity)[source]
setup()[source]

Initialize the port object (after construction but before socket creation). Abstract, subclasses must implement this method.

setupSocket(owner)[source]

Setup the socket. Abstract, subclasses must implement this method.

Parameters:

owner (Component) – The Component the port belongs to. This operation must be called from the component thread only.

update()[source]

Update the socket with information from the discovery service. Abstract, subclasses must implement this method.

Called when the discovery service notifies the actor about a new service provide (e.g. server, publisher, etc.) the port needs to connect to. The operation will perform the connection.

Parameters:
  • host (str) – IP address of the service provider

  • port (int) – port number of the service provider

updatePoller(poller)[source]
class riaps.run.dcPorts.GroupDuplexPort(parentComponent, portName, groupSpec)[source]

Bases: Port

class riaps.run.dcPorts.GroupPubPort(parentPart, portName, groupSpec)[source]

Bases: BindPort, GroupSimplexPort

Group Publisher port is for publishing application and housekeeping messages for all group members.

closeSocket()[source]

Close down the port. Abstract, subclasses must implement this method.

getInfo()[source]

Retrieve configuration information about the port. Abstract, subclasses must implement this method.

Returns:

a tuple containing the name of the port’s type: req,rep,clt,srv,qry,ans,pub,sub,ins,or tim; the kind of the port (global, local, internal); the name of the port object; the name of the message type; the host and the port number.

Return type:

PortInfo

getSocket()[source]

Return the socket(s) used by the port object. Abstract, subclasses must implement this method.

Returns:

a low-level socket

Return type:

zmq.Socket

inSocket()[source]

Return True if the socket can be used for input. Abstract, subclasses must implement this method.

Returns:

logical value indicating whether the socket is for input.

Type:

bool

recv()[source]

Receive a byte array (if possible) through the port

Used for receiving a message that is subsequently deserialized.

Returns:

a message packed into a bytes.

Return type:

bytes

recv_pyobj()[source]

Receive a Python data object (if possible) through the port. Abstract, subclasses must implement this method.

The raw message received is deserialized using pickle and returned. Messages received this way had to be sent using the send_pyobj method.

Returns:

a Python data object

Type:

any Python data type

reset()[source]

Reset the port object. Subclasses can override this method.

Reset is to be used when a send or receive operation fails and the port needs to be re-initialized.

send(msg)[source]

Send a byte array (if possible) out through the port.

Used for sending a message that has been serialized into bytes previously.

Parameters:

msg – the message packed into a bytes

Return type:

bytes

sendGroup(msgType, msg)[source]
send_pyobj(msg)[source]

Send a Python data object (if possible) out through the port. Abstract, subclasses must implement this method.

The object is serialized using pickle and sent. Messages sent using this method are received using the recv_pyobj method.

Parameters:

msg (any Python data type) – the message to be sent.

setup()[source]

Initialize the port object (after construction but before socket creation). Abstract, subclasses must implement this method.

setupSocket(owner)[source]

Setup the socket. Abstract, subclasses must implement this method.

Parameters:

owner (Component) – The Component the port belongs to. This operation must be called from the component thread only.

update(host, port)[source]

Update the socket with information from the discovery service. Abstract, subclasses must implement this method.

Called when the discovery service notifies the actor about a new service provide (e.g. server, publisher, etc.) the port needs to connect to. The operation will perform the connection.

Parameters:
  • host (str) – IP address of the service provider

  • port (int) – port number of the service provider

class riaps.run.dcPorts.GroupQryPort(parentPart, portName, groupSpec)[source]

Bases: ConnPort, GroupDuplexPort

Group query port is for accessing the leader from members. Based on a DEALER socket. Group-internal communication port for messaging with the leader, no message type, but can be timed.

closeSocket()[source]

Close down the port. Abstract, subclasses must implement this method.

getInfo()[source]

Retrieve relevant information about this port

getSocket()[source]

Return the socket of port

inSocket()[source]

Return True because the socket is used of input

recv()[source]

Receive a byte array (if possible) through the port

Used for receiving a message that is subsequently deserialized.

Returns:

a message packed into a bytes.

Return type:

bytes

recvFromLeader()[source]
recv_pyobj()[source]

Receive a Python data object (if possible) through the port. Abstract, subclasses must implement this method.

The raw message received is deserialized using pickle and returned. Messages received this way had to be sent using the send_pyobj method.

Returns:

a Python data object

Type:

any Python data type

reset()[source]

Reset the port object. Subclasses can override this method.

Reset is to be used when a send or receive operation fails and the port needs to be re-initialized.

send(_msg)[source]

Send a byte array (if possible) out through the port.

Used for sending a message that has been serialized into bytes previously.

Parameters:

msg – the message packed into a bytes

Return type:

bytes

sendToLeader(msgType, msg)[source]
send_pyobj(msg)[source]

Send a Python data object (if possible) out through the port. Abstract, subclasses must implement this method.

The object is serialized using pickle and sent. Messages sent using this method are received using the recv_pyobj method.

Parameters:

msg (any Python data type) – the message to be sent.

setup()[source]

Set up the port

setupSocket(owner)[source]

Set up the socket of the port. Return a tuple suitable for querying the discovery service for the servers (not used currently).

update(host, port)[source]

Update the query port – connect its socket to a server

class riaps.run.dcPorts.GroupSimplexPort(parentComponent, portName, groupSpec)[source]

Bases: Port

class riaps.run.dcPorts.GroupSubPort(parentPart, portName, groupSpec)[source]

Bases: ConnPort, GroupSimplexPort

Group subscriber port is for receiving application and housekeeping messages from all group members.

closeSocket()[source]

Close down the port. Abstract, subclasses must implement this method.

getInfo()[source]

Retrieve configuration information about the port. Abstract, subclasses must implement this method.

Returns:

a tuple containing the name of the port’s type: req,rep,clt,srv,qry,ans,pub,sub,ins,or tim; the kind of the port (global, local, internal); the name of the port object; the name of the message type; the host and the port number.

Return type:

PortInfo

getSocket()[source]

Return the socket(s) used by the port object. Abstract, subclasses must implement this method.

Returns:

a low-level socket

Return type:

zmq.Socket

inSocket()[source]

Return True if the socket can be used for input. Abstract, subclasses must implement this method.

Returns:

logical value indicating whether the socket is for input.

Type:

bool

recv()[source]

Receive a byte array (if possible) through the port

Used for receiving a message that is subsequently deserialized.

Returns:

a message packed into a bytes.

Return type:

bytes

recvGroup()[source]
recv_pyobj()[source]

Receive a Python data object (if possible) through the port. Abstract, subclasses must implement this method.

The raw message received is deserialized using pickle and returned. Messages received this way had to be sent using the send_pyobj method.

Returns:

a Python data object

Type:

any Python data type

reset()[source]

Reset the port object. Subclasses can override this method.

Reset is to be used when a send or receive operation fails and the port needs to be re-initialized.

send(_msg)[source]

Send a byte array (if possible) out through the port.

Used for sending a message that has been serialized into bytes previously.

Parameters:

msg – the message packed into a bytes

Return type:

bytes

send_pyobj(msg)[source]

Send a Python data object (if possible) out through the port. Abstract, subclasses must implement this method.

The object is serialized using pickle and sent. Messages sent using this method are received using the recv_pyobj method.

Parameters:

msg (any Python data type) – the message to be sent.

setup()[source]

Initialize the port object (after construction but before socket creation). Abstract, subclasses must implement this method.

setupSocket(owner)[source]

Setup the socket. Abstract, subclasses must implement this method.

Parameters:

owner (Component) – The Component the port belongs to. This operation must be called from the component thread only.

riaps.run.deplc module

Deployment manager client Created on Jan 3, 2017

@author: riaps

class riaps.run.deplc.DeplClient(parentActor, suffix)[source]

Bases: object

Deployment service client of an actor

registerActor()[source]
releaseDevice(bundle)[source]
reportEvent(bundle)[source]
requestDevice(bundle)[source]
start()[source]
terminate()[source]
riaps.run.device module
riaps.run.disco module

Created on Oct 19, 2016

@author: riaps

class riaps.run.disco.DiscoClient(parentActor, suffix)[source]

Bases: object

Discovery service client of an actor

handleLookupReq(bundle)[source]
handleRegReq(bundle)[source]
handleUnlookupReq(bundle)[source]
handleUnregReq(bundle)[source]
reconnect()[source]
recvFromDisco(loc, shut)[source]
registerActor()[source]
registerEndpoint(bundle)[source]
registerGroup(bundle)[source]
rpcDisco(msgBytes, loc, shut)[source]
sendToDisco(msgBytes, loc, shut)[source]
start()[source]
terminate()[source]
unregisterGroup(bundle)[source]
riaps.run.dmain module
riaps.run.exc module

Created on Oct 10, 2016

@author: riaps

exception riaps.run.exc.BuildError(message)[source]

Bases: RIAPSError

exception riaps.run.exc.ControlError(message)[source]

Bases: RIAPSError

exception riaps.run.exc.DatabaseError(message)[source]

Bases: RIAPSError

exception riaps.run.exc.OperationError(message)[source]

Bases: RIAPSError

exception riaps.run.exc.PortError(message, errno)[source]

Bases: RIAPSError

EAGAIN = 11
EFAULT = 14
EPROTO = 156384763
EROUTE = 113
ETERM = 156384765
exception riaps.run.exc.RIAPSError(message)[source]

Bases: Exception

exception riaps.run.exc.SetupError(message)[source]

Bases: RIAPSError

exception riaps.run.exc.StateError(message)[source]

Bases: RIAPSError

riaps.run.fsm module

Created on Apr 23, 2020

@author: riaps

class riaps.run.fsm.FSM(initial=None)[source]

Bases: Component

Finite-State Machine component base class.

class entry(state)[source]

Bases: object

class exit(state)[source]

Bases: object

fsmLock = <unlocked _thread.RLock object owner=0 count=0>
handleNoTransition(event)[source]
handleNondeterminism(event, state)[source]
handleUnhandledEvent(event, state)[source]
class on(event, state, guard=None, then=None)[source]

Bases: object

property state
riaps.run.insPort module

Created on Jan 9, 2017

@author: riaps

class riaps.run.insPort.InsPort(parentPart, portName, portSpec)[source]

Bases: Port

classdocs

activate()[source]

Activate the port object. Subclasses can override this method.

deactivate()[source]

Deactivate the port object. Subclasses can override this method.

getContext()[source]
getInfo()[source]

Retrieve configuration information about the port. Abstract, subclasses must implement this method.

Returns:

a tuple containing the name of the port’s type: req,rep,clt,srv,qry,ans,pub,sub,ins,or tim; the kind of the port (global, local, internal); the name of the port object; the name of the message type; the host and the port number.

Return type:

PortInfo

getSocket()[source]

Return the socket(s) used by the port object. Abstract, subclasses must implement this method.

Returns:

a low-level socket

Return type:

zmq.Socket

get_identity()[source]
get_plug_identity(plug)[source]
inSocket()[source]

Return True if the socket can be used for input. Abstract, subclasses must implement this method.

Returns:

logical value indicating whether the socket is for input.

Type:

bool

ins_port_recv(is_pyobj)[source]
ins_port_send(msg, is_pyobj)[source]
recv()[source]

Receive a byte array (if possible) through the port

Used for receiving a message that is subsequently deserialized.

Returns:

a message packed into a bytes.

Return type:

bytes

recv_pyobj()[source]

Receive a Python data object (if possible) through the port. Abstract, subclasses must implement this method.

The raw message received is deserialized using pickle and returned. Messages received this way had to be sent using the send_pyobj method.

Returns:

a Python data object

Type:

any Python data type

reset()[source]

Reset the port object. Subclasses can override this method.

Reset is to be used when a send or receive operation fails and the port needs to be re-initialized.

send(msg)[source]

Send a byte array (if possible) out through the port.

Used for sending a message that has been serialized into bytes previously.

Parameters:

msg – the message packed into a bytes

Return type:

bytes

send_pyobj(msg)[source]

Send a Python data object (if possible) out through the port. Abstract, subclasses must implement this method.

The object is serialized using pickle and sent. Messages sent using this method are received using the recv_pyobj method.

Parameters:

msg (any Python data type) – the message to be sent.

set_identity(identity)[source]
setup()[source]

Initialize the port object (after construction but before socket creation). Abstract, subclasses must implement this method.

setupPlug(thread)[source]
setupSocket(owner)[source]

Setup the socket. Abstract, subclasses must implement this method.

Parameters:

owner (Component) – The Component the port belongs to. This operation must be called from the component thread only.

terminate()[source]

Terminate all activities of the port. Subclasses can override this method.

riaps.run.main module
riaps.run.part module

Part class Created on Oct 9, 2016

@author: run

class riaps.run.part.Part(parentActor, iTypeDef, iName, iTypeName, iArgs)[source]

Bases: object

Part class to encapsulate and manage component (and its thread)

class State(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: Enum

Active = 3
Checkpointing = 4
Destroyed = 7
Inactive = 5
Initial = 1
Passive = 6
Ready = 2
Starting = 0
activate()[source]

Activate this part

buildAllPorts(portSpecs)[source]

Build all the ports of the part

buildPorts(res, key, ports, class_)[source]

Build the port objects of a kind of this part

checkpoint()[source]
deactivate()[source]
destroy()[source]
getActorID()[source]
getActorName()[source]
getAppName()[source]
getName()[source]
getTypeName()[source]
getUUID()[source]
handleCPULimit()[source]
handleMemLimit()[source]
handleNICStateChange(state)[source]
handleNetLimit()[source]
handlePeerStateChange(state, uuid)[source]
handlePortUpdate(portName, host, port)[source]

Handle a port update message coming from the discovery service

handleReinstate()[source]

Reinstate providers with a restarted disco

handleSpcLimit()[source]
load()[source]

Load the component implementation code

property mods
passivate()[source]
reactivate()[source]
sendControl(cmd, timeOut)[source]

Send a control message to component thread

setup(control_)[source]

Set up the part and change its state to Ready

setupPorts(ports)[source]

Set up all the ports of this part

terminate()[source]
riaps.run.peripheral module

Peripheral class - encapsulates a device, used in an app actor Created on Jan 6, 2017

@author: riaps

class riaps.run.peripheral.Peripheral(parentActor, iTypeDef, iName, iTypeName, iArgs)[source]

Bases: object

Peripheral class to encapsulate access to a device component Note: implements a public interface compatible with a part

class State(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: Enum

Active = 3
Checkpointing = 4
Destroyed = 7
Inactive = 5
Initial = 1
Passive = 6
Ready = 2
Starting = 0
activate()[source]

Activate this peripheral

checkpoint()[source]
deactivate()[source]
destroy()[source]
getControl()[source]
handleCPULimit()[source]
handleMemLimit()[source]
handleNICStateChange(state)[source]
handleNetLimit()[source]
handlePeerStateChange(state, uuid)[source]
handlePortUpdate(_portName, _host, _port)[source]

Handle an update message coming from the devm service

handleReinstate()[source]
handleSpcLimit()[source]
property mods
passivate()[source]
reactivate()[source]
setup()[source]

Set up the peripheral and change its state to Ready

terminate()[source]
riaps.run.port module

Base class for all Port objects

class riaps.run.port.BindPort(parentComponent, portName, portSpec)[source]

Bases: Port

closeBindSocket()[source]
setupBindSocket(owner, zmqType, portKind, sockopts=[])[source]

Set up a bind socket

class riaps.run.port.ConnPort(parentComponent, portName, portSpec)[source]

Bases: Port

closeConnSocket()[source]
connected()[source]

Return the number of servers this port is connected to.

resetConnSocket(zmqType, sockopts=[])[source]

Reset a conn socket: remove and recreate

setupConnSocket(owner, zmqType, portKind, sockopts=[])[source]

Setup a conn socket

update(host, port)[source]

Update the client – connect its socket to a server

class riaps.run.port.DuplexBindPort(parentComponent, portName, portSpec)[source]

Bases: BindPort, DuplexPort

class riaps.run.port.DuplexConnPort(parentComponent, portName, portSpec)[source]

Bases: ConnPort, DuplexPort

class riaps.run.port.DuplexPort(parentComponent, portName, portSpec)[source]

Bases: Port

class riaps.run.port.Port(parentPart, portName, portSpec=None)[source]

Bases: object

Base class for all Port objects.

Port objects are used by a component to communicate with other components, in the same process, on the same host, or on the same network. Ports encapsulate low-level communication objects (zeromq sockets).

Parameters:
  • parentPart (Part) – the Part object that owns this port.

  • portName (str) – the name of the port (from the model)

activate()[source]

Activate the port object. Subclasses can override this method.

closeSocket()[source]

Close down the port. Abstract, subclasses must implement this method.

deactivate()[source]

Deactivate the port object. Subclasses can override this method.

getDeadline()[source]

Return the deadline parameter associated with the port’s operation

Returns:

Deadline for the operation associated with the port, in seconds.

Return type:

float

getGlobalIface()[source]

Return the IP address of the global network interface

The operation retrieves the result from the parent actor and caches it.

Returns:

Global IP address of the form xxx.xxx.xxx.xxx

Return type:

str

getIndex()[source]

Return the index of the port.

For input ports the index is a small integer indicating its position in the port list of the component, for non-input ports it is None. The index is used to determine the priority order for the port among all the ports, the concrete value is irrelevant.

Returns:

Index value for the port among all input ports.

Return type:

int

getInfo()[source]

Retrieve configuration information about the port. Abstract, subclasses must implement this method.

Returns:

a tuple containing the name of the port’s type: req,rep,clt,srv,qry,ans,pub,sub,ins,or tim; the kind of the port (global, local, internal); the name of the port object; the name of the message type; the host and the port number.

Return type:

PortInfo

getLocalIface()[source]

Return the IP address of the local network interface (typically 127.0.0.1)

The operation retrieves the result from the parent actor and caches it.

Returns:

Local IP address of the form xxx.xxx.xxx.xxx

Return type:

str

getSocket()[source]

Return the socket(s) used by the port object. Abstract, subclasses must implement this method.

Returns:

a low-level socket

Return type:

zmq.Socket

get_hwm()[source]

Retrieve the high-water mark for the socket.

get_recvTime()[source]

Return the timestamp taken at the last receive operation.

Returns:

time of the last message receive operation

Return type:

float

get_recv_timeout()[source]

Retrieve the receive timeout parameter for the port.

Receive timeout determines how long a receive operation will block before throwing a PortError.EAGAIN exception. None means infinite timeout.

Returns:

None (if no timeout is set) or the timeout value in seconds.

Return type:

None or float

get_sendTime()[source]

Return the timestamp of the sending time of the last message receive.

Returns:

time when the last message received was sent

Return type:

float

get_send_timeout()[source]

Retrieve the send timeout parameter for the port.

Send timeout determines how long a send operation will block before throwing a PortError.EAGAIN exception. None means infinite timeout.

Returns:

None (if no timeout is set) or the timeout value in seconds.

Return type:

None or float

inSocket()[source]

Return True if the socket can be used for input. Abstract, subclasses must implement this method.

Returns:

logical value indicating whether the socket is for input.

Type:

bool

port_recv(is_pyobj)[source]

Lowest level message receiving operation. Subclasses can override this operation.

The message is received as a multi-part message.If the receiving port is timed, the current timestamp is saved as the time of message reception. If the message is to be received as a Python object, it is unpickled, otherwise the message is returned as is (as a bytes). If the message included a timestamp, it is retrieved and saved as the time of message sending.

Parameters:

is_pyobj (bool) – flag to indicate if the expected message is a Python data object.

Returns:

the message received

Except:

Throws a PortError exception when the underlying network operation fails.

port_send(msg, is_pyobj)[source]

Lowest level message sending operation. Subclasses can override this operation.

If the message is to be sent as a Python object, it is pickled; otherwise it is assumed to be a bytes. The message is packed into a frame, and, if the port is ‘timed’ a current timestamp is appended as another frame. The message is sent as a multi-part message.

Parameters:
  • msg (either a bytes or any Python data object) – message to be sent

  • is_pyobj (bool) – flag to indicate if the message is a Python object.

Returns:

True

Except:

Throws a PortError exception when the underlying network operation fails.

recv()[source]

Receive a byte array (if possible) through the port

Used for receiving a message that is subsequently deserialized.

Returns:

a message packed into a bytes.

Return type:

bytes

recv_capnp()[source]

DEPRECATED. Receive an object (if possible) through the port

recv_pyobj()[source]

Receive a Python data object (if possible) through the port. Abstract, subclasses must implement this method.

The raw message received is deserialized using pickle and returned. Messages received this way had to be sent using the send_pyobj method.

Returns:

a Python data object

Type:

any Python data type

reset()[source]

Reset the port object. Subclasses can override this method.

Reset is to be used when a send or receive operation fails and the port needs to be re-initialized.

send(msg)[source]

Send a byte array (if possible) out through the port.

Used for sending a message that has been serialized into bytes previously.

Parameters:

msg – the message packed into a bytes

Return type:

bytes

send_capnp(msg)[source]

DEPRECATED. Send a byte array (if possible) out through the port

send_pyobj(msg)[source]

Send a Python data object (if possible) out through the port. Abstract, subclasses must implement this method.

The object is serialized using pickle and sent. Messages sent using this method are received using the recv_pyobj method.

Parameters:

msg (any Python data type) – the message to be sent.

setOwner(owner)[source]

Save owner thread into a data member.

Parameters:

owner (ComponentThread) – The ComponentThread the port is handled in.

set_recv_timeout(rto)[source]

Set the receive timeout for the port.

Receive timeout determines how long a receive operation will block before throwing a PortError.EAGAIN exception. None means infinite timeout.

Parameters:

rto (None or float) – None (if no timeout is set) or the timeout value in seconds.

set_send_timeout(sto)[source]

Set the send timeout for the port.

Send timeout determines how long a send operation will block before throwing a PortError.EAGAIN exception. None means infinite timeout.

Parameters:

sto (None or float) – None (if no timeout is set) or the timeout value in seconds.

set_sockoptions(sockopts)[source]
setup()[source]

Initialize the port object (after construction but before socket creation). Abstract, subclasses must implement this method.

setupCurve(server)[source]
setupSocket(owner)[source]

Setup the socket. Abstract, subclasses must implement this method.

Parameters:

owner (Component) – The Component the port belongs to. This operation must be called from the component thread only.

terminate()[source]

Terminate all activities of the port. Subclasses can override this method.

update(host, port)[source]

Update the socket with information from the discovery service. Abstract, subclasses must implement this method.

Called when the discovery service notifies the actor about a new service provide (e.g. server, publisher, etc.) the port needs to connect to. The operation will perform the connection.

Parameters:
  • host (str) – IP address of the service provider

  • port (int) – port number of the service provider

class riaps.run.port.PortInfo(portKind, portScope, portName, msgType, portHost, portNum)

Bases: tuple

msgType

Alias for field number 3

portHost

Alias for field number 4

portKind

Alias for field number 0

portName

Alias for field number 2

portNum

Alias for field number 5

portScope

Alias for field number 1

class riaps.run.port.PortScope(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: IntEnum

GLOBAL = 1
INTERNAL = 3
LOCAL = 2
scope()[source]
class riaps.run.port.SimplexBindPort(parentComponent, portName, portSpec)[source]

Bases: BindPort, SimplexPort

class riaps.run.port.SimplexConnPort(parentComponent, portName, portSpec)[source]

Bases: ConnPort, SimplexPort

class riaps.run.port.SimplexPort(parentComponent, portName, portSpec)[source]

Bases: Port

riaps.run.pubPort module

Created on Oct 10, 2016

@author: riaps

class riaps.run.pubPort.PubPort(parentComponent, portName, portSpec)[source]

Bases: SimplexBindPort

Publisher port

closeSocket()[source]

Close down the port. Abstract, subclasses must implement this method.

getInfo()[source]

Retrieve configuration information about the port. Abstract, subclasses must implement this method.

Returns:

a tuple containing the name of the port’s type: req,rep,clt,srv,qry,ans,pub,sub,ins,or tim; the kind of the port (global, local, internal); the name of the port object; the name of the message type; the host and the port number.

Return type:

PortInfo

getSocket()[source]

Return the socket(s) used by the port object. Abstract, subclasses must implement this method.

Returns:

a low-level socket

Return type:

zmq.Socket

inSocket()[source]

Return True if the socket can be used for input. Abstract, subclasses must implement this method.

Returns:

logical value indicating whether the socket is for input.

Type:

bool

recv()[source]

Receive a byte array (if possible) through the port

Used for receiving a message that is subsequently deserialized.

Returns:

a message packed into a bytes.

Return type:

bytes

recv_pyobj()[source]

Receive a Python data object (if possible) through the port. Abstract, subclasses must implement this method.

The raw message received is deserialized using pickle and returned. Messages received this way had to be sent using the send_pyobj method.

Returns:

a Python data object

Type:

any Python data type

reset()[source]

Reset the port object. Subclasses can override this method.

Reset is to be used when a send or receive operation fails and the port needs to be re-initialized.

send(msg)[source]

Send a byte array (if possible) out through the port.

Used for sending a message that has been serialized into bytes previously.

Parameters:

msg – the message packed into a bytes

Return type:

bytes

send_pyobj(msg)[source]

Send a Python data object (if possible) out through the port. Abstract, subclasses must implement this method.

The object is serialized using pickle and sent. Messages sent using this method are received using the recv_pyobj method.

Parameters:

msg (any Python data type) – the message to be sent.

setup()[source]

Initialize the port object (after construction but before socket creation). Abstract, subclasses must implement this method.

setupSocket(owner)[source]

Setup the socket. Abstract, subclasses must implement this method.

Parameters:

owner (Component) – The Component the port belongs to. This operation must be called from the component thread only.

update(host, port)[source]

Update the socket with information from the discovery service. Abstract, subclasses must implement this method.

Called when the discovery service notifies the actor about a new service provide (e.g. server, publisher, etc.) the port needs to connect to. The operation will perform the connection.

Parameters:
  • host (str) – IP address of the service provider

  • port (int) – port number of the service provider

riaps.run.qryPort module

Query port class Created on Oct 10, 2016

@author: riaps

class riaps.run.qryPort.QryPort(parentComponent, portName, portSpec)[source]

Bases: DuplexConnPort

Query port is to access a server. Has a request and a response message type, and uses a DEALER socket.

closeSocket()[source]

Close down the port. Abstract, subclasses must implement this method.

getInfo()[source]

Retrieve relevant information about this port

getSocket()[source]

Return the socket of port

inSocket()[source]

Return True because the socket is used of input

recv()[source]

Receive a bytes through this port

recv_pyobj()[source]

Receive an object through this port

reset()[source]

Reset the port object. Subclasses can override this method.

Reset is to be used when a send or receive operation fails and the port needs to be re-initialized.

send(msg)[source]

Send bytes through this port

send_pyobj(msg)[source]

Send an object through this port

setup()[source]

Set up the port

setupSocket(owner)[source]

Set up the socket of the port. Return a tuple suitable for querying the discovery service for the publishers

riaps.run.repPort module

Created on Oct 10, 2016

@author: riaps

class riaps.run.repPort.RepPort(parentComponent, portName, portSpec)[source]

Bases: DuplexBindPort

Similar to a server port.

closeSocket()[source]

Close down the port. Abstract, subclasses must implement this method.

getInfo()[source]

Retrieve configuration information about the port. Abstract, subclasses must implement this method.

Returns:

a tuple containing the name of the port’s type: req,rep,clt,srv,qry,ans,pub,sub,ins,or tim; the kind of the port (global, local, internal); the name of the port object; the name of the message type; the host and the port number.

Return type:

PortInfo

getSocket()[source]

Return the socket(s) used by the port object. Abstract, subclasses must implement this method.

Returns:

a low-level socket

Return type:

zmq.Socket

inSocket()[source]

Return True if the socket can be used for input. Abstract, subclasses must implement this method.

Returns:

logical value indicating whether the socket is for input.

Type:

bool

recv()[source]

Receive a byte array (if possible) through the port

Used for receiving a message that is subsequently deserialized.

Returns:

a message packed into a bytes.

Return type:

bytes

recv_pyobj()[source]

Receive a Python data object (if possible) through the port. Abstract, subclasses must implement this method.

The raw message received is deserialized using pickle and returned. Messages received this way had to be sent using the send_pyobj method.

Returns:

a Python data object

Type:

any Python data type

reset()[source]

Reset the port object. Subclasses can override this method.

Reset is to be used when a send or receive operation fails and the port needs to be re-initialized.

send(msg)[source]

Send a byte array (if possible) out through the port.

Used for sending a message that has been serialized into bytes previously.

Parameters:

msg – the message packed into a bytes

Return type:

bytes

send_pyobj(msg)[source]

Send a Python data object (if possible) out through the port. Abstract, subclasses must implement this method.

The object is serialized using pickle and sent. Messages sent using this method are received using the recv_pyobj method.

Parameters:

msg (any Python data type) – the message to be sent.

setup()[source]

Initialize the port object (after construction but before socket creation). Abstract, subclasses must implement this method.

setupSocket(owner)[source]

Setup the socket. Abstract, subclasses must implement this method.

Parameters:

owner (Component) – The Component the port belongs to. This operation must be called from the component thread only.

update(host, port)[source]

Update the socket with information from the discovery service. Abstract, subclasses must implement this method.

Called when the discovery service notifies the actor about a new service provide (e.g. server, publisher, etc.) the port needs to connect to. The operation will perform the connection.

Parameters:
  • host (str) – IP address of the service provider

  • port (int) – port number of the service provider

riaps.run.reqPort module

Created on Oct 10, 2016

@author: riaps

class riaps.run.reqPort.ReqPort(parentComponent, portName, portSpec)[source]

Bases: DuplexConnPort

Similar to a client port

closeSocket()[source]

Close down the port. Abstract, subclasses must implement this method.

getInfo()[source]

Retrieve configuration information about the port. Abstract, subclasses must implement this method.

Returns:

a tuple containing the name of the port’s type: req,rep,clt,srv,qry,ans,pub,sub,ins,or tim; the kind of the port (global, local, internal); the name of the port object; the name of the message type; the host and the port number.

Return type:

PortInfo

getSocket()[source]

Return the socket(s) used by the port object. Abstract, subclasses must implement this method.

Returns:

a low-level socket

Return type:

zmq.Socket

inSocket()[source]

Return True if the socket can be used for input. Abstract, subclasses must implement this method.

Returns:

logical value indicating whether the socket is for input.

Type:

bool

recv()[source]

Receive a byte array (if possible) through the port

Used for receiving a message that is subsequently deserialized.

Returns:

a message packed into a bytes.

Return type:

bytes

recv_pyobj()[source]

Receive a Python data object (if possible) through the port. Abstract, subclasses must implement this method.

The raw message received is deserialized using pickle and returned. Messages received this way had to be sent using the send_pyobj method.

Returns:

a Python data object

Type:

any Python data type

reset()[source]

Reset the port object. Subclasses can override this method.

Reset is to be used when a send or receive operation fails and the port needs to be re-initialized.

send(msg)[source]

Send a byte array (if possible) out through the port.

Used for sending a message that has been serialized into bytes previously.

Parameters:

msg – the message packed into a bytes

Return type:

bytes

send_pyobj(msg)[source]

Send a Python data object (if possible) out through the port. Abstract, subclasses must implement this method.

The object is serialized using pickle and sent. Messages sent using this method are received using the recv_pyobj method.

Parameters:

msg (any Python data type) – the message to be sent.

setup()[source]

Initialize the port object (after construction but before socket creation). Abstract, subclasses must implement this method.

setupSocket(owner)[source]

Setup the socket. Abstract, subclasses must implement this method.

Parameters:

owner (Component) – The Component the port belongs to. This operation must be called from the component thread only.

riaps.run.srvPort module

Created on Oct 10, 2016

@author: riaps

class riaps.run.srvPort.SrvPort(parentComponent, portName, portSpec)[source]

Bases: DuplexBindPort

classdocs

closeSocket()[source]

Close down the port. Abstract, subclasses must implement this method.

getInfo()[source]

Retrieve configuration information about the port. Abstract, subclasses must implement this method.

Returns:

a tuple containing the name of the port’s type: req,rep,clt,srv,qry,ans,pub,sub,ins,or tim; the kind of the port (global, local, internal); the name of the port object; the name of the message type; the host and the port number.

Return type:

PortInfo

getSocket()[source]

Return the socket(s) used by the port object. Abstract, subclasses must implement this method.

Returns:

a low-level socket

Return type:

zmq.Socket

inSocket()[source]

Return True if the socket can be used for input. Abstract, subclasses must implement this method.

Returns:

logical value indicating whether the socket is for input.

Type:

bool

recv()[source]

Receive a byte array (if possible) through the port

Used for receiving a message that is subsequently deserialized.

Returns:

a message packed into a bytes.

Return type:

bytes

recv_pyobj()[source]

Receive a Python data object (if possible) through the port. Abstract, subclasses must implement this method.

The raw message received is deserialized using pickle and returned. Messages received this way had to be sent using the send_pyobj method.

Returns:

a Python data object

Type:

any Python data type

reset()[source]

Reset the port object. Subclasses can override this method.

Reset is to be used when a send or receive operation fails and the port needs to be re-initialized.

send(msg)[source]

Send a byte array (if possible) out through the port.

Used for sending a message that has been serialized into bytes previously.

Parameters:

msg – the message packed into a bytes

Return type:

bytes

send_pyobj(msg)[source]

Send a Python data object (if possible) out through the port. Abstract, subclasses must implement this method.

The object is serialized using pickle and sent. Messages sent using this method are received using the recv_pyobj method.

Parameters:

msg (any Python data type) – the message to be sent.

setup()[source]

Initialize the port object (after construction but before socket creation). Abstract, subclasses must implement this method.

setupSocket(owner)[source]

Setup the socket. Abstract, subclasses must implement this method.

Parameters:

owner (Component) – The Component the port belongs to. This operation must be called from the component thread only.

update(host, port)[source]

Update the socket with information from the discovery service. Abstract, subclasses must implement this method.

Called when the discovery service notifies the actor about a new service provide (e.g. server, publisher, etc.) the port needs to connect to. The operation will perform the connection.

Parameters:
  • host (str) – IP address of the service provider

  • port (int) – port number of the service provider

riaps.run.subPort module

Created on Oct 10, 2016

@author: riaps

class riaps.run.subPort.SubPort(parentComponent, portName, portSpec)[source]

Bases: SimplexConnPort

Subscriber port

closeSocket()[source]

Close down the port. Abstract, subclasses must implement this method.

getInfo()[source]

Retrieve configuration information about the port. Abstract, subclasses must implement this method.

Returns:

a tuple containing the name of the port’s type: req,rep,clt,srv,qry,ans,pub,sub,ins,or tim; the kind of the port (global, local, internal); the name of the port object; the name of the message type; the host and the port number.

Return type:

PortInfo

getSocket()[source]

Return the socket(s) used by the port object. Abstract, subclasses must implement this method.

Returns:

a low-level socket

Return type:

zmq.Socket

inSocket()[source]

Return True if the socket can be used for input. Abstract, subclasses must implement this method.

Returns:

logical value indicating whether the socket is for input.

Type:

bool

recv()[source]

Receive a byte array (if possible) through the port

Used for receiving a message that is subsequently deserialized.

Returns:

a message packed into a bytes.

Return type:

bytes

recv_pyobj()[source]

Receive a Python data object (if possible) through the port. Abstract, subclasses must implement this method.

The raw message received is deserialized using pickle and returned. Messages received this way had to be sent using the send_pyobj method.

Returns:

a Python data object

Type:

any Python data type

reset()[source]

Reset the port object. Subclasses can override this method.

Reset is to be used when a send or receive operation fails and the port needs to be re-initialized.

send(_msg)[source]

Send a byte array (if possible) out through the port.

Used for sending a message that has been serialized into bytes previously.

Parameters:

msg – the message packed into a bytes

Return type:

bytes

send_pyobj(msg)[source]

Send a Python data object (if possible) out through the port. Abstract, subclasses must implement this method.

The object is serialized using pickle and sent. Messages sent using this method are received using the recv_pyobj method.

Parameters:

msg (any Python data type) – the message to be sent.

setup()[source]

Initialize the port object (after construction but before socket creation). Abstract, subclasses must implement this method.

setupSocket(owner)[source]

Setup the socket. Abstract, subclasses must implement this method.

Parameters:

owner (Component) – The Component the port belongs to. This operation must be called from the component thread only.

riaps.run.timPort module

Created on Oct 10, 2016

@author: riaps

class riaps.run.timPort.TimPort(parentPart, portName, portSpec)[source]

Bases: Port

Timer port

activate()[source]

Activate the timer port

cancel()[source]

Cancel the sporadic timer

deactivate()[source]

Deactivate the timer port

getDelay()[source]

Get the current delay (for sporadic timer)

getInfo()[source]

Retrieve configuration information about the port. Abstract, subclasses must implement this method.

Returns:

a tuple containing the name of the port’s type: req,rep,clt,srv,qry,ans,pub,sub,ins,or tim; the kind of the port (global, local, internal); the name of the port object; the name of the message type; the host and the port number.

Return type:

PortInfo

getPeriod()[source]

Read the period of the periodic timer

getSocket()[source]

Return the socket(s) used by the port object. Abstract, subclasses must implement this method.

Returns:

a low-level socket

Return type:

zmq.Socket

halt()[source]

Halt the timer

inSocket()[source]

Return True if the socket can be used for input. Abstract, subclasses must implement this method.

Returns:

logical value indicating whether the socket is for input.

Type:

bool

launch()[source]

Launch (start) the sporadic timer

recv()[source]

Receive time stamp (a float) as a byte array

recv_pyobj()[source]

Receive a Python data object (if possible) through the port. Abstract, subclasses must implement this method.

The raw message received is deserialized using pickle and returned. Messages received this way had to be sent using the send_pyobj method.

Returns:

a Python data object

Type:

any Python data type

reset()[source]

Reset the port object. Subclasses can override this method.

Reset is to be used when a send or receive operation fails and the port needs to be re-initialized.

running()[source]

Returns True if the timer is running

send()[source]

Send a byte array (if possible) out through the port.

Used for sending a message that has been serialized into bytes previously.

Parameters:

msg – the message packed into a bytes

Return type:

bytes

send_pyobj(msg)[source]

Send a Python data object (if possible) out through the port. Abstract, subclasses must implement this method.

The object is serialized using pickle and sent. Messages sent using this method are received using the recv_pyobj method.

Parameters:

msg (any Python data type) – the message to be sent.

setDelay(_delay)[source]

Set the current delay (for sporadic timer)

setPeriod(_period)[source]

Set the period - will be changed after the next firing. Period must be positive

setup()[source]

Initialize the port object (after construction but before socket creation). Abstract, subclasses must implement this method.

setupSocket(owner)[source]

Setup the socket. Abstract, subclasses must implement this method.

Parameters:

owner (Component) – The Component the port belongs to. This operation must be called from the component thread only.

terminate()[source]

Terminate the timer

class riaps.run.timPort.TimerThread(parent)[source]

Bases: Thread

class Command(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: Enum

ACTIVATE = 2
CANCEL = 5
DEACTIVATE = 3
HALT = 6
START = 4
TERMINATE = 1
cmdError(where, cmd)[source]
getDelay()[source]

Get the current delay (for sporadic timer)

getPeriod()[source]

Read out the period

ready()[source]
run()[source]

Method representing the thread’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

running()[source]

Returns True if the timer is running

setDelay(_delay)[source]

Set the current delay (for sporadic timer)

setPeriod(_period)[source]

Set the period (for periodic timer). Takes effect after the next firing.

waitFor(timeout=None)[source]
Module contents

riaps.utils package

Submodules
riaps.utils.appdesc module

Application descriptor (used as a yaml object)

Created on Oct 29, 2018

@author: riaps

class riaps.utils.appdesc.AppDescriptor(url, host, mac, sha, home, hosts, network)[source]

Bases: YAMLObject

RIAPS app origin - ‘signature file url = URL of the repo (or local folder) the app is coming from host = host IP addares mac = MAC address of host sha = SHA of package home = local source folder (used in remote debugging) hosts = hosts participating in the app network = network access control for nodes, (ip|’[]’) => [] | [ (‘dns’ | ip) ]+

yaml_loader

alias of SafeLoader

riaps.utils.config module

Created on Nov 23, 2016

@author: riaps

class riaps.utils.config.Config[source]

Bases: object

Configuration database for RIAPS tools Including logging configuration

ACTOR_DEBUG_SERVER = ''
APP_LOGS = ''
CTRL_DEBUG_SERVER = ''
CTRL_HEARTBEAT = True
DEPLO_DEBUG_SERVER = ''
DEVICE_DEBUG_SERVER = ''
DISCO_DEBUG_SERVER = ''
DISCO_TYPE = 'redis'
NETMON = True
NIC_CEIL = '131kbps'
NIC_NAME = None
NIC_RATE = '118kbps'
NODE_HEARTBEAT = True
RECV_HWM = 100
RECV_TIMEOUT = -1
SECURITY = True
SEND_HWM = 100
SEND_TIMEOUT = -1
TARGET_USER = 'riaps'
riaps.utils.ctrlhost module
riaps.utils.gencert module

Script to generate a public/private key pair and a self-signed certificate for securing riaps communications. THE KEY ANND CERTIFICATE MUST NEVER BE USED IN FIELDED SYSTEMS. Keys and the certificate must be installed in the $RIAPSHOME/keys directory. Private key is NOT ENCRYPTED.

@author: riaps

riaps.utils.gencert.generate_keys(cert_dir)[source]
riaps.utils.gencert.generate_self_signed_cert(cert_dir, key)[source]
riaps.utils.gencert.generate_zmq_cert(cert_dir)[source]
riaps.utils.gencert.main()[source]
riaps.utils.ifaces module

Various network interface utility functions Created on Nov 4, 2016

@author: riaps

riaps.utils.ifaces.getNetworkInterfaces(nicName=None)[source]

Determine the IP address of the network interfaces Return a tuple of list of global IP addresses, list of MAC addresses, and local IP address If the requested interface is found the list will contain the information for that interface only.

riaps.utils.ifaces.get_random_port()[source]

Get a random open port

riaps.utils.ifaces.get_unix_dns_ips()[source]

Retrieve the IP address(es) of dns servers used by this host

riaps.utils.ifaces.is_valid_ipv4_address(address)[source]

Determine if the argument is a valid IP address

riaps.utils.names module

name operations Created on Jan 19, 2018

@author: riaps

riaps.utils.names.actorIdentity(appName, actorName, pid)[source]
riaps.utils.singleton module

Created on Jul 23, 2018

@author: riaps

riaps.utils.singleton.singleton(process_name, suffix=None)[source]

Enforce the caller process is a singleton

riaps.utils.spdlog_setup module

Created on Jan 3, 2019

@author: riaps

riaps.utils.spdlog_setup.add_basic_file_sink_mt(s)[source]
riaps.utils.spdlog_setup.add_basic_file_sink_st(s)[source]
riaps.utils.spdlog_setup.add_color_stdout_sink_mt(_s)[source]
riaps.utils.spdlog_setup.add_color_stdout_sink_st(_s)[source]
riaps.utils.spdlog_setup.add_daily_file_sink_mt(s)[source]
riaps.utils.spdlog_setup.add_daily_file_sink_st(s)[source]
riaps.utils.spdlog_setup.add_null_sink_mt(_s)[source]
riaps.utils.spdlog_setup.add_null_sink_st(_s)[source]
riaps.utils.spdlog_setup.add_parent_dir(s)[source]
riaps.utils.spdlog_setup.add_rotating_file_sink_mt(s)[source]
riaps.utils.spdlog_setup.add_rotating_file_sink_st(s)[source]
riaps.utils.spdlog_setup.add_stdout_sink_mt(_s)[source]
riaps.utils.spdlog_setup.add_stdout_sink_st(_s)[source]
riaps.utils.spdlog_setup.add_syslog_sink_mt(s)[source]
riaps.utils.spdlog_setup.add_syslog_sink_st(s)[source]
riaps.utils.spdlog_setup.add_tcp_sink_mt(s)[source]
riaps.utils.spdlog_setup.add_tcp_sink_st(s)[source]
riaps.utils.spdlog_setup.file_size(num)[source]
riaps.utils.spdlog_setup.from_file(fname)[source]
riaps.utils.spdlog_setup.get_logger(name)[source]
riaps.utils.sudo module

sudo operations Created on Jan 19, 2018

@author: riaps

riaps.utils.sudo.is_su()[source]
riaps.utils.sudo.riaps_sudo(cmd, timeout=None)[source]
riaps.utils.ticker module

Created on Jun 21, 2022

@author: riaps

class riaps.utils.ticker.Ticker(period, callback)[source]

Bases: Thread

Simple periodic ticker

cancel()[source]
run()[source]

Method representing the thread’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

riaps.utils.trace module

Created on Dec 16, 2017

@author: riaps

riaps.utils.trace.riaps_trace(debug=None, prog=None)[source]

Setup trace mode and wait for the debug server.

@debug: Debug server control string of the form ‘hostname:portname’,

both of which are optional, ‘:’ defaulting to localhost:5678

@prog: Label for the program, looked up in the riaps configuration file.

First, it attempts to connect to the debug server using the debug argument (if present). Second, it tries to connect to the debug server using the information from the configuration file. If the config file argument is empty, it silently returns. Returns: True of False depending on whether the program is running in trace mode.

riaps.utils.trace.riaps_trace_setup(debug)[source]
Module contents

Submodules

riaps.riaps_actor module

riaps.riaps_ctrl module

riaps.riaps_depll module

Top-level script of the deployment language processor

Example:

riaps_depll model [-v | --verbose] [-g|--generate]

Arguments:
  • model: name of model file to be processed

  • -v|--verbose: print the resulting JSON file on the console

  • -g: generate a JSON file

riaps.riaps_deplo module

riaps.riaps_device module

riaps.riaps_disco module

riaps.riaps_fab module

Top level script to start fabric file for handling multiple RIAPS nodes setup

Created on March 6, 2019

Arguments:
  • fabcmd: fabric command desired

optional arguments: - -H | --hosts hostnames: list of hostnames (comma separated) - -R | --roles rolenames: list of roles (comma separated) - -f hostfilename : absolute path to local host file - -i ssh private key : relative or absolute path to specific private key

If specific hostnames are not given, the command will be called for all hosts listed in /usr/local/riaps/etc/riaps_hosts.conf

@author: riaps

riaps.riaps_fab.bash(cmd)[source]

riaps.riaps_gen module

Top-level script to start the language processor (‘lang’) for app models Created on Nov 15, 2018

Arguments:

-m, –model : Full path of the model.json. -o, –output : Output directory. Default is the directory of the model file. -cpp, –cpp : List of components to be generated in C++. -py, –python : List of components to be generated in Python. -s, –ser : Message serializer (‘capnp’ or ‘pickle’). -w, –overwrite : Overwrite existing code (no sync).

@author: riaps

riaps.riaps_gviz module

Top-level script to start the graphic visualization processor (‘gviz’)

Example:

riaps_gviz model deplo [-v|--verbose]

The script generates a .dot file shown the allocation of components and actors to target nodes based on the model and deployment files.

Arguments:

-model: name of application model file to be processed - deplo: name of deployment model file to be processed - -v|--verbose: prints the JSON produced from the deployment model

Output:
  • appname.dot : graphviz-style dot file for the name application (based on the model)

riaps.riaps_lang module

Top-level script to start the language processor (‘lang’) for app models

Example:

riaps_lang model [-v|--verbose]

The program analyzes the model file and generates a JSON file.

Arguments:
  • model : Name of model file to be processed

  • -v|--verbose: print the resulting JSON file on the console

riaps.riaps_logger module

riaps.riaps_mn module

Module contents

Indices and tables