riaps.run.dc module

Distributed coordination

Python implementation of group formation, communications, leader election, consensus and action coordination.

Created on Feb 23, 2019. Author: riaps

class riaps.run.dc.Coordinator(parent)[source]

Bases: object

Coordinator object. Each component has a coordinator object that creates the group objects for that component. A group instance can be created only once in a component; subsequent creation requests return the same group object.

getGroup(groupType, groupInstance)[source]

Returns a group instance based on its name

groupName(groupType, groupInstance)[source]

Form a name for a group instance

joinGroup(thread, groupType, groupInstance, componentId, groupMinSize)[source]

Operation to create a group instance (with capacity) in a component. Returns the instance

leaveGroup(group)[source]

Operation by which a component ‘leaves’ a group. The group is deactivated, its threads are stopped, and it is deleted.
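The create-once behaviour described for the Coordinator can be pictured as a registry keyed by group name. The sketch below is not the riaps implementation: `SketchCoordinator`, the dotted name format, the reduced `joinGroup` signature, and the dictionary used as a placeholder group are all assumptions made for illustration.

```python
class SketchCoordinator:
    """Hypothetical stand-in for a coordinator; not the riaps class."""

    def __init__(self):
        self.groups = {}  # group name -> group object

    def groupName(self, groupType, groupInstance):
        # Assumed name format; the real naming scheme may differ.
        return f"{groupType}.{groupInstance}"

    def joinGroup(self, groupType, groupInstance):
        name = self.groupName(groupType, groupInstance)
        if name not in self.groups:
            # Placeholder for constructing a real group object.
            self.groups[name] = {"name": name}
        return self.groups[name]

    def getGroup(self, groupType, groupInstance):
        return self.groups.get(self.groupName(groupType, groupInstance))

    def leaveGroup(self, group):
        # Forget the group so that a later join creates a fresh object.
        self.groups.pop(group["name"], None)


coord = SketchCoordinator()
first = coord.joinGroup("TheGroup", "g1")
second = coord.joinGroup("TheGroup", "g1")  # same object as `first`
```

Joining twice under the same type and instance yields one shared object, which is the property the class description states.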

class riaps.run.dc.Group(parent, thread, groupType, groupInstance, componentId, groupSpec, groupMinSize)[source]

Bases: object

Group object that represents one group instance belonging to a component. Acts as the front end for the GroupThread and channels messages to and from it. Some of its methods run in the GroupThread.

GROUP_ACK = b'ack'
GROUP_ANN = b'ann'
GROUP_ERR = b'err'
GROUP_LEL = b'lel'
GROUP_LEX = b'lex'
GROUP_MFL = b'mfl'
GROUP_MJD = b'mjd'
GROUP_MLT = b'mlt'
GROUP_MSG = b'msg'
GROUP_MTL = b'mtl'
GROUP_NLD = b'nld'
GROUP_PARAMETERS = {'consensusTimeout': 1500, 'electionMax': 2000, 'electionMin': 1500, 'heartbeat': 1000, 'peerTimeout': 3000}
GROUP_RCM = b'rcm'
GROUP_RFV = b'rfv'
GROUP_RTC = b'rtc'
GROUP_UPD = b'upd'
getGroupId()[source]
getGroupName()[source]

Group unique name

getLeaderId()[source]

Return the leader’s id (or None if no leader)

getSocket()[source]

Returns inproc socket used to communicate with GroupThread

groupSize()[source]

Return the size of the group (>= 1).

groupSocketName(groupType, groupName, componentId)[source]

Forms the unique name for the inproc socket.

handleMessage(msgFrames=None)[source]

Receives a message from the worker thread and handles it. Runs in the component thread (called from the component polling loop or from send_port).

hasLeader()[source]

True if the group has a leader.

isLeader()[source]

Return True if the group member IS the leader.

leave()[source]
recv()[source]

Receive a bytes object message from the worker thread. Runs in the component thread, called from a component.

recv_msg(is_pyobj)[source]

Receive a message from the worker thread. Messages arrive through a message queue. Runs in the component thread, called from a component.

recv_pyobj()[source]

Receive a Python object message from the worker thread. Runs in the component thread, called from a component.

requestActionVote(action, when, kind='consensus', timeout=None)[source]
requestActionVote_pyobj(action, when, kind='consensus', timeout=None)[source]
requestVote(topic, kind='majority', timeout=None)[source]

Request a vote on a topic (with timeout). The topic is a bytes object. A message is sent to the leader (if any), which starts a voting process. Returns None if there is no leader, otherwise a generated id string for the request.

requestVote_pyobj(topic, kind='majority', timeout=None)[source]

Request a vote on a topic (with timeout). The topic is a Python object. A message is sent to the leader (if any), which starts a voting process. Returns None if there is no leader, otherwise a generated id string for the request.
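Because a vote request returns None when there is no leader, callers need to check the result before tracking the request. The sketch below illustrates one way to do that. `FakeGroup` is a hypothetical stub that imitates only the documented return values, the `rfv-N` id format is made up, and `ask` and `pending` are illustrative names rather than part of riaps.

```python
class FakeGroup:
    """Hypothetical stub that imitates only the documented return values."""

    def __init__(self, leader=None):
        self.leader = leader
        self.counter = 0

    def requestVote_pyobj(self, topic, kind="majority", timeout=None):
        if self.leader is None:
            return None  # documented: None when there is no leader
        self.counter += 1
        return f"rfv-{self.counter}"  # made-up id format


def ask(group, topic, pending):
    """Request a vote and remember the request id, if one was issued."""
    rfv_id = group.requestVote_pyobj(topic)
    if rfv_id is None:
        return False  # no leader yet; the caller may retry later
    pending[rfv_id] = topic
    return True


pending = {}
no_leader = ask(FakeGroup(), {"setpoint": 42}, pending)
with_leader = ask(FakeGroup(leader="peer-1"), {"setpoint": 42}, pending)
```

Keeping the returned id lets the requester match a later vote result to the topic it asked about.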

send(msg)[source]

Sends a bytes object as a message to all members of the group. Runs in the component thread.

sendActionVote(rfvId, vote)[source]
sendToLeader(msg)[source]

Send a message to the group leader from a member. Raises an exception if the group is not coordinated. Returns False (indicating operation failure) if there is no leader. Runs in the component thread.

sendToLeader_pyobj(msg)[source]

Send a Python object message to the group leader from a member. Raises an exception if the group is not coordinated. Returns False (indicating operation failure) if there is no leader. Runs in the component thread.

sendToMember(msg, identity=None)[source]

Send a message from the leader to a group member (selected by identity). If identity is not supplied, the last value of identity is used. Raises an exception if the group is not coordinated. Returns False (indicating operation failure) if there is no leader. Runs in the component thread.

sendToMember_pyobj(msg, identity=None)[source]

Send a Python object message from the leader to a group member (selected by identity). If identity is not supplied, the last value of identity is used. Raises an exception if the group is not coordinated. Returns False (indicating operation failure) if there is no leader. Runs in the component thread.

sendVote(rfvId, vote)[source]

Send a vote (True/False) to the leader on a requested topic identified by the id of the request for vote.

send_port(msgType, msg, has_identity=False)[source]

Send a message to the worker thread. Used for all message types; the messages have multiple frames. Runs in the component thread.

send_pyobj(msg)[source]

Sends a Python object as a message to all members of the group. Runs in the component thread.

setup(groupThread)[source]

Set up all the sockets for the worker thread. Runs in the group thread.

setupParams()[source]
unsetup(groupThread)[source]

Discard all the sockets used in the worker thread. Runs in the group thread.

update(host, port)[source]

Ask the worker thread to update its sockets. Called when the disco responds with the server (pub) host/port pair that the client (sub) socket(s) should connect to. Runs in the component thread.

class riaps.run.dc.GroupThread(group)[source]

Bases: Thread

Worker thread for DC behavior

AUTHORITY = b'ldr'
CANDIDATE = 2
FOLLOWER = 1
HEARTBEAT = b'tic'
LEADER = 3
NO_LEADER = 0
REQVOTE = b'req'
RSPVOTE = b'vot'
announceConsensus(rfvId, vote)[source]

Announce consensus vote result (yes/no/timeout)

checkAllPolls(now)[source]

Check the results of all polls

checkPoll(poll, now)[source]

Check the result of one poll

electionTimeout()[source]

Produce a random election timeout
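A random election timeout can be sketched as a draw between the `electionMin` and `electionMax` values listed in `Group.GROUP_PARAMETERS`. This is an illustration under assumptions, not the riaps code: the uniform integer draw, the millisecond unit, and the helper name `election_timeout` are all hypothetical.

```python
import random

# Values copied from Group.GROUP_PARAMETERS; treated as milliseconds (an assumption).
PARAMS = {"electionMin": 1500, "electionMax": 2000}


def election_timeout(params=PARAMS, rng=random):
    """Hypothetical helper: draw a timeout uniformly from [electionMin, electionMax]."""
    return rng.randint(params["electionMin"], params["electionMax"])


samples = [election_timeout() for _ in range(100)]
```

Randomized timeouts are a common device in leader election because they make it less likely that several members become candidates at the same moment.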

getOwnId()[source]
handleCompMessage()[source]

Handle a message coming from the component

handleMessageForLeader()[source]

Handle message sent to the leader (in leader)

handleMessageForMember()[source]

Handle simple message from leader (in member)

handleNetMessage(now)[source]

Handle (broadcast) messages coming from the group. Messages could be data messages (to be handed over to the component), peer heartbeat messages, or election-related messages

handleTimeout(now)[source]

Handle the timeout on communications within the group

heartbeat(now)[source]

Send out a group heartbeat (so that group members can maintain an accurate membership list)

isLeader()[source]
run()[source]

Main loop for GroupThread - polls all sources and calls handlers

sendChangeMessage(change, someId)[source]

Send message about a change to the component

setLeaderDeadline()[source]

Set deadline for heartbeat from leader

setTimeout(value, now)[source]

Set the next timeout value, update lastWait (with ‘now’)

setup()[source]
startPoll(msg, member)[source]

Start a poll for a member based on message

threshold()[source]

Calculate the threshold for leader election, based on membership list
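One plausible threshold is a simple majority of the known members. The sketch below assumes that rule; the actual calculation in `threshold()` may differ, and `majority_threshold` is a hypothetical name.

```python
def majority_threshold(num_members):
    """Smallest number of votes that is more than half of num_members (assumed rule)."""
    return num_members // 2 + 1


# Thresholds for groups of one to five members.
thresholds = [majority_threshold(n) for n in range(1, 6)]
```

Under this rule a group of four needs three votes, so an even split can never elect two leaders.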

updatePeers(now)[source]

Update membership list (with the current time as the last time a peer was heard from)
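A membership list of this kind can be sketched as a mapping from peer id to the time the peer was last heard from. The pruning step, the millisecond unit for `peerTimeout`, and the helper names `heard_from` and `drop_silent` are assumptions for illustration and are not taken from the riaps source.

```python
PEER_TIMEOUT_MS = 3000  # 'peerTimeout' from Group.GROUP_PARAMETERS, assumed to be ms


def heard_from(peers, peer_id, now):
    """Record `now` as the last time this peer was heard from."""
    peers[peer_id] = now


def drop_silent(peers, now, timeout=PEER_TIMEOUT_MS):
    """Remove peers that have been silent longer than `timeout`; return their ids."""
    stale = [p for p, last in peers.items() if now - last > timeout]
    for p in stale:
        del peers[p]
    return stale


peers = {}
heard_from(peers, "a", 0)
heard_from(peers, "b", 2500)
gone = drop_silent(peers, now=4000)  # "a" has been silent for 4000 ms
```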

updatePoll(msg)[source]

Update poll with the vote in msg

updateTimeout(now)[source]

Update the timeout value - used when a data message was received from the group; the timeout is updated to reflect elapsed time.
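Adjusting a timeout for elapsed time amounts to subtracting the time already spent waiting. The function below is a minimal sketch of that arithmetic; the name `remaining_timeout`, the clamp at zero, and the use of a `last_wait` timestamp as an argument are assumptions.

```python
def remaining_timeout(timeout, last_wait, now):
    """Shrink a pending timeout by the time already spent waiting (never below 0)."""
    elapsed = now - last_wait
    return max(0, timeout - elapsed)


# A 1000 ms wait that started at t=200 has 700 ms left at t=500.
left = remaining_timeout(1000, 200, 500)
```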

class riaps.run.dc.Poll(parent, rfv, member, timeout, deadline, numPeers)[source]

Bases: object

Poll object to represent an active poll in the group leader.

ACTION = 'action'
CONSENSUS = 'consensus'
MAJORITY = 'majority'
VALUE = 'value'
allVoted()[source]

Return True if all peers voted

expired(now)[source]

Return True if the voting has expired

result()[source]

Return True if the majority/all voted yes

vote(vote)[source]

Count one vote (yes/no)
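The four Poll methods can be pictured with a simplified vote counter. `SketchPoll` is a hypothetical class with a reduced constructor; the counting rules shown (strict majority of all peers, or every peer voting yes for consensus) are assumptions and may not match the riaps implementation.

```python
class SketchPoll:
    """Hypothetical, simplified poll; not the riaps Poll class."""

    MAJORITY = "majority"
    CONSENSUS = "consensus"

    def __init__(self, kind, deadline, numPeers):
        self.kind = kind
        self.deadline = deadline
        self.numPeers = numPeers
        self.yes = 0
        self.no = 0

    def vote(self, vote):
        # Count one vote (True = yes, False = no).
        if vote:
            self.yes += 1
        else:
            self.no += 1

    def allVoted(self):
        return self.yes + self.no >= self.numPeers

    def expired(self, now):
        return now >= self.deadline

    def result(self):
        if self.kind == self.CONSENSUS:
            return self.yes == self.numPeers  # assumed: every peer must agree
        return self.yes > self.numPeers / 2   # assumed: strict majority


majority = SketchPoll(SketchPoll.MAJORITY, deadline=100, numPeers=3)
consensus = SketchPoll(SketchPoll.CONSENSUS, deadline=100, numPeers=3)
for v in (True, True, False):
    majority.vote(v)
    consensus.vote(v)
```

With two yes votes out of three, the majority poll passes while the consensus poll does not.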