Source code for riaps.run.dc

'''
Distributed coordination

Python implementation of group formation, communications, leader election, consensus and action coordination. 

Created on Feb 23, 2019
Author: riaps
'''

import time
import logging
import struct
import collections
import traceback
import random
import string
import ipaddress
# import ctypes
import threading
import zmq
from zmq.backend.cython.utils import ZMQError
# from riaps.run.port import Port
from riaps.consts.defs import *
# from riaps.utils import spdlog_setup
# import spdlog
from riaps.run.exc import BuildError, OperationError, PortError
from riaps.run.dcPorts import GroupPubPort, GroupSubPort, GroupQryPort, GroupAnsPort
import capnp
from riaps.run import dc_capnp

try:
    import cPickle
    pickle = cPickle
except:
    cPickle = None
    import pickle


class Poll(object):
    '''
    Poll object to represent an active poll in the group leader.

    Counts yes/no votes for one request-for-vote (rfv) and decides the outcome
    according to the voting kind (majority or consensus).
    '''
    # Voting type
    MAJORITY = 'majority'           # Must match dc_capnp spec
    CONSENSUS = 'consensus'
    # Subject type
    VALUE = 'value'                 # Must match dc_capnp spec
    ACTION = 'action'

    def __init__(self, parent, rfv, member, timeout, deadline, numPeers):
        '''
        :param parent: owner of the poll (the GroupThread that started it)
        :param rfv: request-for-vote record; its topic, rfvId, kind, subject,
                    release and started fields are copied into the poll
        :param member: identity of the member that requested the vote
        :param timeout: poll timeout
        :param deadline: absolute time (same clock as the 'now' passed to expired()) after which the poll expires
        :param numPeers: number of peers expected to vote
        '''
        self.parent = parent
        self.topic = rfv.topic
        self.rfvId = rfv.rfvId
        self.kind = rfv.kind
        self.subject = rfv.subject
        self.release = rfv.release
        self.started = rfv.started
        self.timeout = timeout
        self.deadline = deadline
        self.member = member
        self.numPeers = numPeers
        self.threshold = int(numPeers // 2 + 1)   # simple majority
        self.voteCnt = 0
        self.yesCnt = 0

    def vote(self, vote):
        '''
        Count one vote (yes/no)
        '''
        self.voteCnt += 1
        if vote:
            self.yesCnt += 1

    def expired(self, now):
        '''
        Return True if the voting has expired (now is strictly past the deadline)
        '''
        return now > self.deadline

    def allVoted(self):
        '''
        Return True if (at least) all peers voted.

        '>=' rather than '==' so a surplus vote cannot keep the poll open until it expires.
        '''
        return self.voteCnt >= self.numPeers

    def result(self):
        '''
        Return True if the majority (MAJORITY kind) or all peers (CONSENSUS kind)
        voted yes; False otherwise, including for an unrecognized kind.
        '''
        if self.kind == Poll.MAJORITY:
            return self.yesCnt >= self.threshold
        if self.kind == Poll.CONSENSUS:
            return self.yesCnt == self.numPeers
        return False
class GroupThread(threading.Thread):
    '''
    Worker thread for DC behavior.

    Forwards messages between the component (inproc PAIR socket) and the group
    (pub/sub ports), keeps the membership list up to date from peer heartbeats
    and, for coordinated groups, runs the RAFT-style leader election and the
    consensus polls handled by the leader.
    '''
    # Group/component states (RAFT)
    FOLLOWER = 1
    CANDIDATE = 2
    LEADER = 3
    # Coordination messages, with content
    HEARTBEAT = 'tic'.encode('utf-8')       # { ownId }
    REQVOTE = 'req'.encode('utf-8')         # { term ; ownId }
    RSPVOTE = 'vot'.encode('utf-8')         # { term; candId; bool; ownId }
    AUTHORITY = 'ldr'.encode('utf-8')       # { term; ldrId; ldrHost; ldrPort }
    # Special port number indicating no leader
    NO_LEADER = 0

    def __init__(self, group):
        threading.Thread.__init__(self, daemon=False)
        self.logger = logging.getLogger(__name__)
        self.group = group
        self.coordinated = False if self.group.kind == "default" else True
        # Assumption: we use IPv4 addresses - converted to ulong for the wire protocol
        self.host = ipaddress.IPv4Address(self.group.parent.parent.owner.parent.getGlobalIface())
        self.ldrPort = None
        self.groupMinSize = group.groupMinSize
        self.groupHeartbeat = group.heartbeat                   # const.groupHeartbeat
        self.groupElectionMin = group.electionMin               # const.groupElectionMin
        self.groupElectionMax = group.electionMax               # const.groupElectionMax
        self.groupPeerTimeout = group.peerTimeout               # const.groupPeerTimeout
        self.groupConsensusTimeout = group.consensusTimeout     # const.groupConsensusTimeout
        self.timeout = None
        self.leaderDeadline = None      # deadline to recv an expected heartbeat from leader
        self.peers = {}                 # peer id -> time it was last heard from
        self.numPeers = len(self.peers)
        self.leader = None
        self.ownId = None

    def setup(self):
        '''
        Create the group sockets (via Group.setup) and compute our 16-byte id:
        the actor id followed by the 8-byte id() of the group object.
        Initializes the election/poll state for coordinated groups.
        '''
        self.group.setup(self)
        actorId = self.group.parent.parent.owner.parent.getActorID()
        # Assumption: id can be represented on 8 bytes
        self.ownId = actorId + id(self.group).to_bytes(8, 'big')
        assert(len(self.ownId) == 16)
        if self.coordinated:
            self.timeout = self.groupHeartbeat
            self.setLeaderDeadline()
            self.lastWait = 0
            self.lastHeartbeat = 0
            self.term = 0
            self.votes = 0
            self.votedFor = None
            self.leader = None
            self.polls = {}             # rfvId -> Poll

    def getOwnId(self):
        return self.ownId

    def isLeader(self):
        return self.leader == self.ownId

    def setLeaderDeadline(self):
        '''
        Set deadline for heartbeat from leader: a random delay in
        [electionMax, peerTimeout) ms from now.
        '''
        timeout = random.randrange(self.groupElectionMax, self.groupPeerTimeout)
        self.leaderDeadline = time.time() + timeout / 1000.0

    def electionTimeout(self):
        '''
        Produce a random election timeout in [electionMin, electionMax) ms
        '''
        return random.randrange(self.groupElectionMin, self.groupElectionMax)

    def threshold(self):
        '''
        Calculate the threshold for leader election, based on membership list
        '''
        return int(self.numPeers // 2 + 1)     # Threshold for votes for leader election

    def setTimeout(self, value, now):
        '''
        Set the next timeout value (ms, int), update lastWait (with 'now')
        '''
        assert(type(value) == int)
        self.timeout = value
        self.lastWait = now

    def heartbeat(self, now):
        '''
        Send out a group heartbeat (so that group members can maintain an accurate
        membership list), at most once per groupHeartbeat ms.
        '''
        if now > (self.lastHeartbeat + self.groupHeartbeat / 1000.0):
            msg = struct.pack('!16s', self.ownId)
            self.pubPort.sendGroup(GroupThread.HEARTBEAT, msg)
            self.lastHeartbeat = now

    def sendChangeMessage(self, change, someId):
        '''
        Send message about a change (e.g. member joined/left, leader elected/exited) to the component
        '''
        msgOut = [zmq.Frame(change), zmq.Frame(someId)]
        self.groupSocket.send_multipart(msgOut)

    def updatePeers(self, now):
        '''
        Drop the peers not heard from for more than groupPeerTimeout ms,
        notifying the component about each one that left.
        '''
        leavers = [p for p in self.peers if int((now - self.peers[p]) * 1000) > self.groupPeerTimeout]
        if leavers:
            for leaver in leavers:
                self.sendChangeMessage(Group.GROUP_MLT, leaver)     # Member dropped out
                del self.peers[leaver]
            self.numPeers = len(self.peers)

    def updateTimeout(self, now):
        '''
        Update the timeout value - used when a data message was received from the group;
        the timeout is reduced by the elapsed time (but stays at least 1 ms).
        '''
        elapsed = now - self.lastWait
        remains = max(self.timeout - int(elapsed * 1000), 1)
        self.lastWait = now
        assert(type(remains) == int)
        self.timeout = remains

    def _appendTimestamps(self, msgOut, port):
        '''
        For timed groups: record the receive/send time reported by 'port' and
        append them as frames to msgOut (in place). Returns msgOut.
        '''
        if self.group.isTimed:
            self.recvTime = port.recvTime()
            self.sendTime = port.sendTime
            msgOut += [zmq.Frame(self.recvTime), zmq.Frame(self.sendTime)]
        return msgOut

    def _authorityMsg(self):
        '''
        Pack the leader's AUTHORITY message: term, own id, host (IPv4 as int), leader port.
        '''
        return struct.pack('!L16sLL', self.term, self.ownId, int(self.host), int(self.ldrPort))

    def _startPollOrTimeout(self, msg, member):
        '''
        (In leader) Start a poll for the rfv in msg and send the request (RCM) to the
        group; if the poll could not be started, announce a 'timeout' result instead.
        '''
        if self.startPoll(msg, member):
            self.pubPort.sendGroup(Group.GROUP_RCM, msg)                    # Send RCM to group
        else:
            with dc_capnp.GroupVote.from_bytes(msg) as rfv:                 # Poll failed (before it got started)
                which = rfv.which()
                assert(which == 'rfv')
                rfvId = rfv.rfv.rfvId
                self.announceConsensus(rfvId, 'timeout')

    def handleCompMessage(self):
        '''
        Handle a message coming from the component.
        Returns True if the component is leaving the group (the thread should stop).
        '''
        toStop = False
        msgFrames = self.groupSocket.recv_multipart()
        cmd = msgFrames[0]
        if cmd == Group.GROUP_UPD:              # Update message for sub port
            host, port = msgFrames[1].decode(), struct.unpack("d", msgFrames[2])[0]
            self.logger.info("GroupThread.handleCompMessage(GROUP_UPD,%s,%d)", host, port)
            self.subPort.update(host, port)
        elif cmd == Group.GROUP_MSG:            # Generic message to be published to the group
            self.logger.info("GroupThread.handleCompMessage(GROUP_MSG,...)")
            try:
                self.pubPort.sendGroup(Group.GROUP_MSG, msgFrames[1])
                self.groupSocket.send_multipart([zmq.Frame(Group.GROUP_ACK)])
            except zmq.error.ZMQError as e:
                self.groupSocket.send_multipart([zmq.Frame(Group.GROUP_ERR), zmq.Frame(pickle.dumps(e))])
                self.logger.error("error sending GROUP_MSG:%s", str(e))
        elif cmd == Group.GROUP_MTL:            # Message for the leader (from component)
            self.logger.info("GroupThread.handleCompMessage(GROUP_MTL,...)")
            if self.leader == None:             # Error: no leader
                self.groupSocket.send_multipart([zmq.Frame(Group.GROUP_NLD)])
            else:
                try:
                    if self.isLeader():         # We are the leader
                        self.groupSocket.send_multipart([zmq.Frame(Group.GROUP_ACK)])
                        msg = msgFrames[1]      # Forward it to component
                        msgOut = [zmq.Frame(Group.GROUP_MTL), zmq.Frame(msg)]
                        msgOut += [zmq.Frame(bytes(0))]     # Special case: 0 == own identity
                        if self.group.isTimed:
                            now = time.time()
                            self.recvTime = now
                            self.sendTime = now
                            # NOTE(review): these are floats; zmq.Frame expects a bytes-like
                            # object - confirm how the component side encodes/decodes them.
                            msgOut += [zmq.Frame(self.recvTime)]
                            msgOut += [zmq.Frame(self.sendTime)]
                        self.groupSocket.send_multipart(msgOut)
                    else:
                        self.qryPort.sendToLeader(Group.GROUP_MTL, msgFrames[1])
                        self.groupSocket.send_multipart([zmq.Frame(Group.GROUP_ACK)])
                except zmq.error.ZMQError as e:
                    self.groupSocket.send_multipart([zmq.Frame(Group.GROUP_ERR), zmq.Frame(pickle.dumps(e))])
                    self.logger.error("error sending GROUP_MTL:%s", str(e))
        elif cmd == Group.GROUP_MFL:            # Message from leader (for a component)
            self.logger.info("GroupThread.handleCompMessage(GROUP_MFL,...)")
            if self.leader == None:             # Error: no leader
                self.groupSocket.send_multipart([zmq.Frame(Group.GROUP_NLD)])
            else:
                try:
                    # isLeader() must be called: the bound method itself is always truthy
                    if self.isLeader() and msgFrames[2] == bytes(0):    # We are the leader and we sent the message
                        self.groupSocket.send_multipart([zmq.Frame(Group.GROUP_ACK)])
                        msg = msgFrames[1]      # Forward it to component
                        msgOut = [zmq.Frame(Group.GROUP_MFL), zmq.Frame(msg)]
                        if self.group.isTimed:
                            now = time.time()
                            self.recvTime = now
                            self.sendTime = now
                            # NOTE(review): float payload for zmq.Frame - see GROUP_MTL above
                            msgOut += [zmq.Frame(self.recvTime)]
                            msgOut += [zmq.Frame(self.sendTime)]
                        self.groupSocket.send_multipart(msgOut)
                    else:
                        self.ansPort.set_identity(msgFrames[2])
                        self.ansPort.sendToMember(Group.GROUP_MFL, msgFrames[1])
                        self.groupSocket.send_multipart([zmq.Frame(Group.GROUP_ACK)])
                except zmq.error.ZMQError as e:
                    self.groupSocket.send_multipart([zmq.Frame(Group.GROUP_ERR), zmq.Frame(pickle.dumps(e))])
                    self.logger.error("error sending GROUP_MFL:%s", str(e))
        elif cmd == Group.GROUP_RFV:            # Request for vote (through leader)
            self.logger.info("GroupThread.handleCompMessage(GROUP_RFV,...)")
            if self.leader == None:             # Error: no leader
                self.groupSocket.send_multipart([zmq.Frame(Group.GROUP_NLD)])
            else:
                try:
                    if self.isLeader():         # We are the leader
                        self.groupSocket.send_multipart([zmq.Frame(Group.GROUP_ACK)])
                        self._startPollOrTimeout(msgFrames[1], bytes(0))    # bytes(0) == self identity
                    else:
                        self.qryPort.sendToLeader(Group.GROUP_RFV, msgFrames[1])
                        self.groupSocket.send_multipart([zmq.Frame(Group.GROUP_ACK)])
                except zmq.error.ZMQError as e:
                    self.groupSocket.send_multipart([zmq.Frame(Group.GROUP_ERR), zmq.Frame(pickle.dumps(e))])
                    self.logger.error("error sending GROUP_RFV:%s", str(e))
        elif cmd == Group.GROUP_RTC:            # Reply to consensus request
            self.logger.info("GroupThread.handleCompMessage(GROUP_RTC,...)")
            if self.leader == None:             # Error: no leader
                self.groupSocket.send_multipart([zmq.Frame(Group.GROUP_NLD)])
            else:
                try:
                    if self.isLeader():         # We are the leader and we sent the vote
                        self.updatePoll(msgFrames[1])
                    else:
                        self.qryPort.sendToLeader(Group.GROUP_RTC, msgFrames[1])
                    self.groupSocket.send_multipart([zmq.Frame(Group.GROUP_ACK)])
                except zmq.error.ZMQError as e:
                    self.groupSocket.send_multipart([zmq.Frame(Group.GROUP_ERR), zmq.Frame(pickle.dumps(e))])
                    self.logger.error("error sending GROUP_RTC:%s", str(e))
        elif cmd == Group.GROUP_MLT:            # Our component is leaving the group
            self.logger.info(f"GroupThread.handleCompMessage(GROUP_MLT,{self.ownId.hex()})")
            self.pubPort.sendGroup(Group.GROUP_MLT, zmq.Frame(self.ownId))
            toStop = True
        else:
            self.logger.error("GroupThread.handleCompMessage() - unknown message type: %s", str(cmd))
        return toStop

    def handleTimeout(self, now):
        '''
        Handle the timeout on communications within the group: check polls, then
        act according to the election state (start an election, give up a failed
        candidacy, or - as leader - send the AUTHORITY heartbeat).
        '''
        self.logger.info("GroupThread.handleTimeout()")
        assert(self.coordinated)
        self.checkAllPolls(now)
        if self.state == GroupThread.FOLLOWER:
            if self.leaderDeadline and now > self.leaderDeadline:      # Past leader heartbeat deadline (if set)
                if self.numPeers > int(self.groupMinSize // 2) + 1:     # To start an election, we need quorum
                    self.logger.info(f"... FOLLOWER --> CANDIDATE, starting leader election in group of {self.numPeers}")
                    if self.leader != None:
                        self.logger.info("... past leader %s deadline" % self.leader.hex())
                        self.sendChangeMessage(Group.GROUP_LEX, self.leader)    # Leader exited
                    self.term += 1                          # increment term
                    self.state = GroupThread.CANDIDATE      # candidate state
                    self.votes = 1                          # self-vote
                    self.votedFor = self.ownId              # voted for ourselves
                    self.qryPort.update(None, None)         # Disconnect query port
                    msg = struct.pack('!L16s', self.term, self.ownId)
                    self.pubPort.sendGroup(GroupThread.REQVOTE, msg)    # request votes
                    self.setTimeout(self.electionTimeout(), now)        # Timeout for votes to come in
                    self.leaderDeadline = None              # No leader heartbeat deadline, we are in election mode
                    self.leader = None
                else:
                    self.logger.info(f"... FOLLOWER, no majority in group of {self.numPeers} to start election")
                    self.setLeaderDeadline()                # Update leader heartbeat deadline
                    self.setTimeout(self.groupHeartbeat, now)
            else:
                self.setTimeout(self.groupHeartbeat, now)
        elif self.state == GroupThread.CANDIDATE:           # We are beyond the current election timeout
            self.logger.info("... CANDIDATE --> FOLLOWER, timeout on leader election")
            self.state = GroupThread.FOLLOWER               # follower state
            self.votedFor = None
            if self.leader != None:
                self.logger.info("... previous leader %s ?" % self.leader.hex())
                self.sendChangeMessage(Group.GROUP_LEX, self.leader)    # Leader exited
            self.leader = None
            self.setTimeout(self.groupHeartbeat, now)
            self.setLeaderDeadline()                        # Update leader heartbeat deadline
        elif self.state == GroupThread.LEADER:
            # Leader sends message to inform members: term, id, host, port
            self.logger.info("... LEADER, sending out leadership note")
            assert(self.leader == self.ownId)
            self.pubPort.sendGroup(GroupThread.AUTHORITY, self._authorityMsg())    # Send out leader heartbeat
            self.setTimeout(self.groupHeartbeat, now)
            self.leaderDeadline = None                      # We are the leader, we don't have a deadline
        else:
            self.logger.error("GroupThread.handleTimeout() - in undefined state")

    def handleNetMessage(self, now):
        '''
        Handle (broadcast) messages coming from the group. Messages could be data
        messages (to be handed over to the component), member-left notices, peer
        heartbeat messages, consensus requests/announcements, or election-related messages.
        '''
        self.logger.info("GroupThread.handleNetMessage...")
        msgFrames = self.subPort.recvGroup()
        cmd = msgFrames[0]
        if cmd == Group.GROUP_MSG:
            self.logger.info("... handle GROUP_MSG")
            msgOut = [zmq.Frame(Group.GROUP_MSG), zmq.Frame(msgFrames[1])]
            self._appendTimestamps(msgOut, self.subPort)
            self.groupSocket.send_multipart(msgOut)
            if self.coordinated:
                self.updateTimeout(now)
        elif cmd == Group.GROUP_MLT:            # One of our peers left the group
            leaver = struct.unpack('!16s', msgFrames[1])[0]
            if leaver in self.peers:
                self.logger.info(f"GroupThread.handleNetMessage(GROUP_MLT,{leaver.hex()})")
                self.sendChangeMessage(Group.GROUP_MLT, leaver)
                del self.peers[leaver]
            else:
                self.logger.warning(f"GroupThread.handleNetMessage(GROUP_MLT,{leaver.hex()}) - not peer")
            self.numPeers = len(self.peers)
        elif self.coordinated:                  # Non-data messages are meaningful only for coordinated groups
            self._handleCoordMessage(cmd, msgFrames, now)
        else:
            self.logger.error("GroupThread[%s] - not coordinated", self.group.getGroupName())

    def _handleCoordMessage(self, cmd, msgFrames, now):
        '''
        Handle a non-data group message in a coordinated group: heartbeats,
        RCM/ANN from the leader, and the election protocol.
        '''
        frame = msgFrames[1]
        if cmd != GroupThread.HEARTBEAT:
            self.logger.info("... [%s].%d:%s",
                             self.group.getGroupName(), self.state, cmd.decode('utf-8'))
        if cmd == GroupThread.HEARTBEAT:        # Incoming peer heartbeat message
            try:
                peer = struct.unpack('!16s', frame)[0]
            except (struct.error, TypeError):
                self.logger.error(f"... invalid peer in heartbeat: {str(frame)}")
                return
            if peer not in self.peers:
                self.logger.info(f"... peer {peer.hex()} added")
                self.sendChangeMessage(Group.GROUP_MJD, peer)
            self.peers[peer] = now
            self.numPeers = len(self.peers)
            self.updateTimeout(now)
            return
        if cmd in (Group.GROUP_RCM, Group.GROUP_ANN):   # Consensus request / announcement (from leader)
            self.logger.info("... handle GROUP_RCM" if cmd == Group.GROUP_RCM else "... handle GROUP_ANN")
            msgOut = [zmq.Frame(cmd), zmq.Frame(frame)]
            self._appendTimestamps(msgOut, self.subPort)
            self.groupSocket.send_multipart(msgOut)     # forward it to component
            self.updateTimeout(now)
            self.setLeaderDeadline()                    # update leader heartbeat deadline
            return
        # Everything else is election-related
        if self.state == GroupThread.FOLLOWER:
            self._followerElection(cmd, frame, now)
        elif self.state == GroupThread.CANDIDATE:
            self._candidateElection(cmd, frame, now)
        elif self.state == GroupThread.LEADER:
            self._leaderElection(cmd, frame, now)

    def _followerElection(self, cmd, frame, now):
        '''
        Election message handling in FOLLOWER state.
        '''
        if cmd == GroupThread.AUTHORITY:        # Message from leader
            self.logger.info('... leader asserts authority')
            (_term, _leader, _host, _port) = struct.unpack('!L16sLL', frame)
            if self.term >= _term and self.leader and self.leader != _leader:
                self.logger.info("GroupThread[%s] - leader has changed", self.group.getGroupName())
            self.term = _term                   # update term/leader
            prevLeader = self.leader
            self.leader = _leader
            if prevLeader != _leader:           # If leader has changed
                self.sendChangeMessage(Group.GROUP_LEL, self.leader)
            self.votedFor = None                # clear last vote
            self.setTimeout(self.electionTimeout(), now)    # set timeout to [electionMin,electionMax] < groupHeartbeat
            self.setLeaderDeadline()            # update leader heartbeat deadline
            if _port == GroupThread.NO_LEADER:
                _host, _port = None, None       # Will this ever happen?
            else:
                _host = ipaddress.IPv4Address(_host).compressed
                _port = int(_port)
            self.qryPort.update(_host, _port)   # Update query port to new leader
        elif cmd == GroupThread.REQVOTE:        # Message to request a vote
            self.logger.info('... follower vote requested')
            (_term, _node) = struct.unpack('!L16s', frame)
            if self.votedFor == None:           # Not voted yet -> yes
                vote = True
            elif _term > self.term:             # Already voted, but newer term -> yes
                vote = True
            else:                               # Current term: yes only for the same candidate
                vote = (_node == self.votedFor)
            self.logger.info('... voting for leader %s for term %s with %s', _node.hex(), str(_term), str(vote))
            rsp = struct.pack("!L16sL16s", _term, _node, int(vote), self.ownId)
            self.pubPort.sendGroup(GroupThread.RSPVOTE, rsp)
            self.term = _term
            if self.leader != None:
                self.sendChangeMessage(Group.GROUP_LEX, self.leader)
            self.leaderDeadline = None          # no leader, we don't expect a heartbeat
            self.leader = None
            self.qryPort.update(None, None)     # disconnect from previous leader (if at all)
            if vote:
                self.votedFor = _node
            self.setTimeout(self.electionTimeout(), now)    # rand([eMin,eMax]) < gHB
        elif cmd == GroupThread.RSPVOTE:        # Somebody's vote
            self.setTimeout(self.electionTimeout(), now)    # rand([eMin,eMax]) < gHB

    def _candidateElection(self, cmd, frame, now):
        '''
        Election message handling in CANDIDATE state.
        '''
        if cmd == GroupThread.REQVOTE:          # ... requested to vote
            self.logger.info("... candidate requested to vote")
            (_term, _node) = struct.unpack('!L16s', frame)
            if _term == self.term:              # For current term
                if _node != self.ownId:         # Another candidate -> vote no
                    rsp = struct.pack("!L16sL16s", _term, _node, int(False), self.ownId)
                    self.pubPort.sendGroup(GroupThread.RSPVOTE, rsp)
                # else: we already 'voted' for ourselves
            elif _term > self.term:             # For a future (next) term
                self.state = GroupThread.FOLLOWER   # Go back to follower
                rsp = struct.pack("!L16sL16s", _term, _node, int(True), self.ownId)    # Vote yes
                self.pubPort.sendGroup(GroupThread.RSPVOTE, rsp)
                self.term = _term
                if self.leader != None:
                    self.sendChangeMessage(Group.GROUP_LEX, self.leader)    # Leader exited
                self.leader = None
                self.qryPort.update(None, None) # disconnect from previous leader (if at all)
                self.votedFor = _node
            # else: for past term - ignore
            self.setTimeout(self.electionTimeout(), now)    # rand([eMin,eMax]) < gHB
        elif cmd == GroupThread.RSPVOTE:        # ... response to a vote request
            self.logger.info("... candidate received vote")
            (_term, _leader, _vote, _node) = struct.unpack("!L16sL16s", frame)
            if bool(_vote) and _leader == self.ownId:   # it is a vote for us
                self.votes += 1
                if self.votes >= self.threshold():      # we won
                    self.logger.info("... candidate won election")
                    self.state = GroupThread.LEADER
                    self.leader = self.ownId
                    self.sendChangeMessage(Group.GROUP_LEL, self.leader)    # Tell ourselves that we are elected
                    self.ansPort.updatePoller(self.poller)  # Update ans socket/poller
                    self.ansSocket = self.ansPort.getSocket()
                    self.ldrPort = self.ansPort.getPortNumber()
                    self.pubPort.sendGroup(GroupThread.AUTHORITY, self._authorityMsg())
                    self.leaderDeadline = None          # We are the leader, no deadline
                    self.setTimeout(self.groupHeartbeat, now)
        elif cmd == GroupThread.AUTHORITY:      # ... leader asserted authority
            self.logger.info("... candidate received message from leader")
            (_term, _leader, _host, _port) = struct.unpack('!L16sLL', frame)
            self.state = GroupThread.FOLLOWER   # we lost, go back to FOLLOWER
            self.term = _term                   # update term/leader
            prevLeader = self.leader
            self.leader = _leader
            if prevLeader != _leader:
                self.sendChangeMessage(Group.GROUP_LEL, self.leader)
            self.votedFor = None                # clear last vote
            _host = ipaddress.IPv4Address(_host).compressed
            _port = int(_port)
            self.qryPort.update(_host, _port)   # connect qry port to leader
            self.setTimeout(self.electionTimeout(), now)    # rand([eMin,eMax]) < gHB
            self.setLeaderDeadline()

    def _leaderElection(self, cmd, frame, now):
        '''
        Election message handling in LEADER state: late votes/requests are ignored;
        a conflicting leader for the same term is accepted (we step down).
        '''
        self.logger.info("... leader received message")
        timeout = self.groupHeartbeat
        if cmd == GroupThread.AUTHORITY:
            (_term, _leader, _host, _port) = struct.unpack('!L16sLL', frame)
            if self.term == _term and self.ownId != _leader:
                self.logger.error("GroupThread[%s].%r - leader conflict with %r",
                                  self.group.getGroupName(), self.ownId.hex(), _leader.hex())
                # Accept 'other' leader...
                self.state = GroupThread.FOLLOWER   # we lost, go back to FOLLOWER
                self.term = _term                   # update term/leader
                prevLeader = self.leader
                self.leader = _leader
                if prevLeader != _leader:
                    self.sendChangeMessage(Group.GROUP_LEL, self.leader)
                self.votedFor = None                # clear last vote
                _host, _port = ipaddress.IPv4Address(_host).compressed, int(_port)
                self.qryPort.update(_host, _port)   # connect qry port to leader
                timeout = self.electionTimeout()    # rand([eMin,eMax]) < gHB
                self.setLeaderDeadline()            # set leader heartbeat deadline
        # REQVOTE (outstanding request) and RSPVOTE (late vote) are ignored
        self.setTimeout(timeout, now)

    def _pollWindow(self, started, timeout, now):
        '''
        Compute the (timeout, deadline) of a poll, both in seconds.

        :param started: time the vote was requested (time.time() scale; assumes
                        the group members' clocks are synchronized)
        :param timeout: requested timeout in seconds (assumed - it is compared against
                        time.time() differences); 0.0 selects the group default
        :param now: current time
        :return: (timeout, deadline) or None if the poll is already past its timeout
        '''
        if timeout == 0.0:
            # groupConsensusTimeout is in ms, like the other group timing parameters
            timeout = self.groupConsensusTimeout / 1000.0
        delta = now - started                   # Delay until the request reached the leader
        if delta > timeout:                     # We are past the timeout
            return None
        return timeout, now + (timeout - delta) # Remaining window, compensating for the initial delay

    def startPoll(self, msg, member):
        '''
        Start a poll for a member based on message (a GroupVote 'rfv').
        Returns True if the poll was started, False otherwise.
        '''
        self.logger.info("GroupThread.startPoll()")
        with dc_capnp.GroupVote.from_bytes(msg) as rfv:
            which = rfv.which()
            if which != 'rfv':
                self.logger.error('GroupThread.startPoll(): invalid message type %s', str(which))
                return False
            window = self._pollWindow(rfv.rfv.started, rfv.rfv.timeout, time.time())
            if window is None:
                return False
            timeout, deadline = window
            self.polls[rfv.rfv.rfvId] = Poll(self, rfv.rfv, member, timeout, deadline, self.numPeers)
            return True

    def announceConsensus(self, rfvId, vote):
        '''
        Announce consensus vote result (yes/no/timeout) to the group
        '''
        self.logger.info("GroupThread.announceConsensus()")
        msg = dc_capnp.GroupVote.new_message()
        ann = msg.init('ann')
        ann.rfvId = rfvId
        ann.vote = vote                         # Result (yes/no/timeout)
        self.pubPort.sendGroup(Group.GROUP_ANN, msg.to_bytes())    # Announce result

    def checkPoll(self, poll, now):
        '''
        Check the result of one poll; announce it if it expired or everybody voted.
        Returns True if the poll is finished.
        '''
        self.logger.info("GroupThread.checkPoll()")
        if poll.expired(now):                   # Poll has expired
            self.logger.info("... voting timeout")
            self.announceConsensus(poll.rfvId, 'timeout')
            return True
        if poll.allVoted():                     # All voted
            self.logger.info("... all have voted")
            self.announceConsensus(poll.rfvId, 'yes' if poll.result() else 'no')
            return True
        self.logger.info("... not all have voted: %d vs. %d", poll.voteCnt, poll.numPeers)
        return False

    def checkAllPolls(self, now):
        '''
        Check the results of all polls, removing the finished ones
        '''
        self.logger.info("GroupThread.checkAllPolls()")
        finished = [rfvId for rfvId, poll in self.polls.items() if self.checkPoll(poll, now)]
        for rfvId in finished:
            del self.polls[rfvId]

    def updatePoll(self, msg):
        '''
        Update poll with the vote in msg (a GroupVote 'rtc')
        '''
        self.logger.info("GroupThread.updatePoll()")
        with dc_capnp.GroupVote.from_bytes(msg) as rtc:
            if rtc.which() == 'rtc':
                rfvId = rtc.rtc.rfvId
                if rfvId in self.polls:
                    poll = self.polls[rfvId]
                    poll.vote(rtc.rtc.vote == 'yes')
                    if self.checkPoll(poll, time.time()):
                        del self.polls[rfvId]
                else:
                    self.logger.info("... rfvId is not in polls")

    def handleMessageForLeader(self):
        '''
        Handle message sent to the leader (in leader)
        '''
        self.logger.info("GroupThread.handleMessageForLeader()")
        msgFrames = self.ansPort.recvFromMember()
        cmd = msgFrames[0]
        if cmd == Group.GROUP_MTL:              # Simple message to leader
            self.logger.info('...: simple message to leader')
            msgOut = [zmq.Frame(Group.GROUP_MTL), zmq.Frame(msgFrames[1])]    # Forward it to component
            msgOut += [self.ansPort.get_identity()]
            self._appendTimestamps(msgOut, self.ansPort)
            self.groupSocket.send_multipart(msgOut)
        elif cmd == Group.GROUP_RFV:            # Request for consensus message to leader
            self.logger.info('...: request for consensus message to leader')
            self._startPollOrTimeout(msgFrames[1], self.ansPort.get_identity())
        elif cmd == Group.GROUP_RTC:            # Reply to consensus to leader
            self.logger.info('...: consensus vote to leader')
            self.updatePoll(msgFrames[1])
        else:
            self.logger.error("GroupThread.handleMessageForLeader() - unknown message type: %s", str(cmd))

    def handleMessageForMember(self):
        '''
        Handle simple message from leader (in member)
        '''
        self.logger.info("GroupThread.handleMessageForMember()")
        msgFrames = self.qryPort.recvFromLeader()
        cmd = msgFrames[0]
        if cmd == Group.GROUP_MFL:
            msgOut = [zmq.Frame(Group.GROUP_MFL), zmq.Frame(msgFrames[1])]
            # Timestamps come from the port the message arrived on (qry), not ans
            self._appendTimestamps(msgOut, self.qryPort)
            self.groupSocket.send_multipart(msgOut)
        else:
            self.logger.error("GroupThread.handleMessageForMember() - unknown message type: %s", str(cmd))

    def run(self):
        '''
        Main loop for GroupThread - polls all sources and calls handlers until
        the component leaves the group.
        '''
        self.logger.info("GroupThread.run() [%s]" % self.group.getGroupName())
        self.setup()
        self.pubPort = self.group.pubPort
        self.pubSocket = self.pubPort.getSocket()
        self.subPort = self.group.subPort
        self.subSocket = self.subPort.getSocket()
        self.groupSocket = self.group.groupSocket
        if self.coordinated:
            self.qryPort = self.group.qryPort
            self.qrySocket = self.qryPort.getSocket()
            self.ansPort = self.group.ansPort
            self.ansSocket = self.ansPort.getSocket()
        else:
            self.qryPort = None
            self.qrySocket = None
            self.ansPort = None
            self.ansSocket = None
        self.poller = zmq.Poller()
        self.poller.register(self.groupSocket, zmq.POLLIN)
        self.poller.register(self.subSocket, zmq.POLLIN)
        if self.coordinated:
            self.poller.register(self.qrySocket, zmq.POLLIN)
            self.poller.register(self.ansSocket, zmq.POLLIN)
            self.state = GroupThread.FOLLOWER
            now = time.time()
            self.lastWait = now
            self.lastHeartbeat = 0
            self.heartbeat(now)
            self.timeout = self.groupHeartbeat
            self.leader = None
            self.setLeaderDeadline()
        else:
            self.timeout = None                 # Block until a message arrives
            now = None
        toStop = False
        while True:
            events = self.poller.poll(self.timeout)
            if self.coordinated:
                now = time.time()
                self.heartbeat(now)
                self.updatePeers(now)           # Check if there is a change in peers
            if len(events) == 0 and self.coordinated:
                self.handleTimeout(now)
            else:
                sockets = dict(events)
                if self.groupSocket in sockets:     # Message from component
                    toStop = self.handleCompMessage()
                if self.subSocket in sockets:       # Group message: data, heartbeat, leader, or election messages
                    self.handleNetMessage(now)
                if self.coordinated:
                    if self.ansSocket in sockets:   # Message to leader
                        self.handleMessageForLeader()
                    if self.qrySocket in sockets:   # Message from leader
                        self.handleMessageForMember()
            if toStop:
                break
        self.done = False
        self.group.unsetup(self)
        time.sleep(const.groupDiscoDelay / 1000)
class Group(object):
    '''
    Group object, represents one group instance that belongs to a component.
    Acts as the front-end for the GroupThread, channels messages to/from.
    Some of its methods are run in the GroupThread
    '''
    GROUP_ACK = 'ack'.encode('utf-8')   # Response: Acknowledge
    GROUP_ERR = 'err'.encode('utf-8')   # Response: Error
    GROUP_NLD = 'nld'.encode('utf-8')   # Response: No leader
    GROUP_MSG = 'msg'.encode('utf-8')   # Message to group
    GROUP_UPD = 'upd'.encode('utf-8')   # Update for port from disco
    GROUP_MTL = 'mtl'.encode('utf-8')   # Message to leader
    GROUP_MFL = 'mfl'.encode('utf-8')   # Message from leader
    GROUP_RFV = 'rfv'.encode('utf-8')   # Request for consensus (from member to leader)
    GROUP_RCM = 'rcm'.encode('utf-8')   # Request consensus from member (by leader)
    GROUP_RTC = 'rtc'.encode('utf-8')   # Reply to consensus request
    GROUP_ANN = 'ann'.encode('utf-8')   # Announce consensus result
    GROUP_MJD = 'mjd'.encode('utf-8')   # Group member joined
    GROUP_MLT = 'mlt'.encode('utf-8')   # Group member left
    GROUP_LEL = 'lel'.encode('utf-8')   # Group leader elected
    GROUP_LEX = 'lex'.encode('utf-8')   # Group leader exited

    def __init__(self, parent, thread, groupType, groupInstance, componentId, groupSpec, groupMinSize):
        '''
        Create the group front-end: bind the component-side inproc PAIR socket,
        start the GroupThread, wait until it has created its sockets (it sets
        self.done in setup()), then send a 'group' registration message through
        the control channel of 'thread'.
        '''
        self.logger = logging.getLogger(__name__)
        self.parent = parent
        self.thread = thread
        self.context = thread.context
        self.groupType = groupType
        self.groupInstance = groupInstance
        self.componentId = componentId
        self.groupSpec = groupSpec
        self.groupInstanceName = groupType + '.' + groupInstance
        self.messageType = self.groupSpec['message']
        self.isTimed = self.groupSpec['timed']
        self.kind = self.groupSpec['kind']
        self.coordinated = False if self.kind == "default" else True
        self.groupMinSize = groupMinSize
        self.setupParams()
        # PAIR socket for the component poller
        self.compSocket = self.context.socket(zmq.PAIR)
        self.compSocket.bind('inproc://%s' % self.groupSocketName(self.groupType, self.groupInstance, self.componentId))
        # Message queue for group messages
        self.msgQueue = collections.deque()
        # Group thread
        self.groupThread = GroupThread(self)    # Create the worker thread
        self.done = False
        self.groupThread.start()
        while not self.done:
            time.sleep(0.1)                     # Allow groupThread to create sockets
        # send a message to disco to register new group
        assert(self.pubInfo.portKind == 'gpub')
        assert(self.subInfo.portKind == 'gsub')
        host, pubPort = self.pubInfo.portHost, self.pubInfo.portNum
        comp = self.parent.parent
        partName = comp.getName()
        partType = comp.getTypeName()
        portName = self.groupInstanceName       # ???
        msg = ('group', self.groupType, self.groupInstance, self.messageType,
               host, pubPort, partName, partType, portName)
        self.thread.sendControl(msg)
        # groupDiscoDelay is presumably in ms (GroupThread.run also divides it by 1000);
        # floor division ('//') would truncate any delay below one second to no delay at all.
        time.sleep(const.groupDiscoDelay / 1000.0)

    # Default group timing parameters (overridable via the model's 'params')
    GROUP_PARAMETERS = {'heartbeat': const.groupHeartbeat,
                        'electionMin': const.groupElectionMin,
                        'electionMax': const.groupElectionMax,
                        'peerTimeout': const.groupPeerTimeout,
                        'consensusTimeout': const.groupConsensusTimeout}
[docs] def setupParams(self): modelParams = self.groupSpec["params"] for (name,value) in Group.GROUP_PARAMETERS.items(): result = modelParams.get(name,None) or value assert result > 0, "Group parameter %s must be >0 " % name setattr(self,name,result) assert self.heartbeat < self.electionMin and \ self.electionMin < self.electionMax and \ self.electionMax < self.peerTimeout and \ self.consensusTimeout < self.electionMax, \ "Group timing parameter(s) incorrectly ordered"
[docs] def leave(self): self.logger.info("Group.leave(): %s" % self.groupInstanceName) # msgFrames = [zmq.Frame(Group.GROUP_MLT)] # Send out message the component is leaving group self.compSocket.send_multipart(msgFrames) self.groupThread.join() comp = self.parent.parent # Inform component/disco about leaving the group partName = comp.getName() partType = comp.getTypeName() portName = self.groupInstanceName host, pubPort = self.pubInfo.portHost, self.pubInfo.portNum comp = self.parent.parent msg = ('ungroup', self.groupType, self.groupInstance, self.messageType, host, pubPort, partName, partType, portName) self.thread.sendControl(msg) self.compSocket.close()
[docs] def getGroupName(self): ''' Group unique name ''' return self.groupInstanceName
[docs] def getSocket(self): ''' Returns inproc socket used to communicate with GroupThread ''' return self.compSocket
[docs] def groupSocketName(self, groupType, groupName, componentId): ''' Forms the unique name for the inproc socket. ''' return "group-%s.%s.%s" % (groupType, groupName, str(componentId))
[docs] def setup(self, groupThread): ''' Set up all the sockets for the worker thread Runs in group thread ''' self.pubPort = GroupPubPort(self.parent.parent.owner, self.groupInstanceName + '_pub', self.groupSpec) self.pubInfo = self.pubPort.setupSocket(groupThread) self.subPort = GroupSubPort(self.parent.parent.owner, self.groupInstanceName + '_sub', self.groupSpec) self.subInfo = self.subPort.setupSocket(groupThread) if self.coordinated: self.qryPort = GroupQryPort(self.parent.parent.owner, self.groupInstanceName + '_qry', self.groupSpec) self.qryInfo = self.qryPort.setupSocket(groupThread) self.ansPort = GroupAnsPort(self.parent.parent.owner, self.groupInstanceName + '_ans', self.groupSpec) self.ansInfo = self.ansPort.setupSocket(groupThread) else: self.qryPort = None self.qryInfo = None self.ansPort = None self.ansInfo = None self.groupSocket = self.context.socket(zmq.PAIR) self.groupSocket.connect('inproc://%s' % self.groupSocketName(self.groupType, self.groupInstance, self.componentId)) self.done = True
[docs] def unsetup(self,groupThread): ''' Discard all the sockets used in worker thread Runs in group thread ''' self.pubPort.closeSocket() self.subPort.closeSocket() if self.coordinated: self.qryPort.closeSocket() self.ansPort.closeSocket() self.groupSocket.close()
[docs] def update(self, host, port): ''' Ask the worker thread to update its sockets. Called when the disco responds with the server (pub) host/port pair for the socket(s) to connect to by the client (sub) Runs in component thread ''' self.logger.info("Group.update(%s,%d)" % (host, port)) msgFrames = [zmq.Frame(Group.GROUP_UPD), zmq.Frame(host.encode('utf-8')), zmq.Frame(struct.pack("d", port))] self.compSocket.send_multipart(msgFrames)
[docs] def send_port(self, msgType, msg, has_identity=False): ''' Send a message to the worker thread. Used by all messages - the messages have multiple frames Runs in component thread ''' msgFrames = [zmq.Frame(msgType)] # Frame 0: message type msgFrames += [zmq.Frame(msg)] # Frame 1: payload if has_identity: assert (self.identity != None) msgFrames += [self.identity] # Frame 2: identity (opt) self.compSocket.send_multipart(msgFrames) # Receive a response from worker thread while(True): repFrames = self.compSocket.recv_multipart() res = repFrames[0] if res == Group.GROUP_ACK: # Msg ack-d return True elif res == Group.GROUP_NLD: # No leader(caller should handle it) return False elif res == Group.GROUP_ERR: # Error detected err = pickle.loads(repFrames[1]) raise PortError("send error: %s" % msg, err.__repr__(), err.errno) else: # Group MSG, MTL, MFL, etc. self.handleMessage(repFrames) # Handle it as normal message
[docs] def send_pyobj(self, msg): ''' Sends a Python object as a message to all members of the group Runs in component thread ''' msgBytes = pickle.dumps(msg) return self.send_port(Group.GROUP_MSG, msgBytes)
[docs] def send(self, msg): ''' Sends a bytes object as a message to all members of the group Runs in component thread ''' assert(type(msg) == bytes) return self.send_port(Group.GROUP_MSG, msg)
[docs] def handleMessage(self, msgFrames=None): ''' Receives message from the worker thread and handles it Runs in component thread (called from component polling loop, or from send_port) ''' try: if msgFrames == None: # No frames (called from send_port) msgFrames = self.compSocket.recv_multipart() # Receive message cmd = msgFrames[0] self.logger.info("Group.handleMessage() = %s", str(cmd)) if cmd == Group.GROUP_MSG: # Data message from a group member msg = msgFrames[1] self.msgQueue.append(msg) # Add payload to queue to be read by recv/recv_pyobj if self.isTimed: # If group is timed, store values self.recvTime = msgFrames[2] self.sendTime = msgFrames[3] self.parent.parent.handleGroupMessage(self) # Call group message handler elif cmd == Group.GROUP_MTL: # Message to leader msg = msgFrames[1] self.identity = msgFrames[2] # Save sender's identity self.msgQueue.append(msg) # Add payload to queue to be read by recv/recv_pyobj if self.isTimed: # If group is timed, store values self.recvTime = msgFrames[3] self.sendTime = msgFrames[4] self.parent.parent.handleMessageToLeader(self) # Call leader's message handler elif cmd == Group.GROUP_MFL: # Message from leader to the member msg = msgFrames[1] self.msgQueue.append(msg) # Add payload to queue to be read by recv/recv_pyobj if self.isTimed: # If group is timed, store values self.recvTime = msgFrames[2] self.sendTime = msgFrames[3] self.parent.parent.handleMessageFromLeader(self) # Call member's message handler elif cmd == Group.GROUP_RCM: msg = msgFrames[1] if self.isTimed: # If group is timed, store values self.recvTime = msgFrames[2] self.sendTime = msgFrames[3] with dc_capnp.GroupVote.from_bytes(msg) as rfv: which = rfv.which() if which == 'rfv': topic = rfv.rfv.topic rfvId = rfv.rfv.rfvId subject = rfv.rfv.subject self.msgQueue.append(topic) if subject == Poll.ACTION: # Call member's appropriate message handler when = rfv.rfv.release self.parent.parent.handleActionVoteRequest(self, rfvId, when) elif subject == 
Poll.VALUE: self.parent.parent.handleVoteRequest(self, rfvId) else: self.logger.error("handleMessage() - unknown poll subject %s", str(subject)) elif cmd == Group.GROUP_ANN: msg = msgFrames[1] if self.isTimed: # If group is timed, store values self.recvTime = msgFrames[2] self.sendTime = msgFrames[3] with dc_capnp.GroupVote.from_bytes(msg) as ann: which = ann.which() if which == 'ann': rfvId = ann.ann.rfvId vote = ann.ann.vote self.parent.parent.handleVoteResult(self, rfvId, vote) # Call member's message handler elif cmd == Group.GROUP_MJD: memberId = msgFrames[1] self.parent.parent.handleMemberJoined(self, memberId) elif cmd == Group.GROUP_MLT: memberId = msgFrames[1] self.parent.parent.handleMemberLeft(self, memberId) elif cmd == Group.GROUP_LEL: leaderId = msgFrames[1] self.parent.parent.handleLeaderElected(self, leaderId) elif cmd == Group.GROUP_LEX: leaderId = msgFrames[1] self.parent.parent.handleLeaderExited(self, leaderId) else: self.logger.error("handleMessage() - unknown control message %s", str(cmd)) except zmq.error.ZMQError as e: raise PortError("recv error: %s" % e.__repr__(), e.errno) from e
[docs] def recv_msg(self, is_pyobj): ''' Receive a message from the worker thread. Message are coming through a message queue. Runs in component thread, called from a component ''' if len(self.msgQueue) == 0: errno = zmq.EFAULT raise PortError("recv error (%d)" % errno, errno) else: msg = self.msgQueue.popleft() if is_pyobj: result = pickle.loads(msg) else: result = msg return result
[docs] def recv_pyobj(self): ''' Receive a Python object message from the worker thread Runs in component thread, called from a component ''' return self.recv_msg(True)
[docs] def recv(self): ''' Receive a bytes object message from the worker thread Runs in component thread, called from a component ''' return self.recv_msg(False)
[docs] def sendToLeader(self, msg): ''' Send message to group leader from a member Raise an exception if the group is not coordinated Return False (indicating operation failure) if no leader Runs in component thread. ''' if not self.groupThread.coordinated: raise PortError("sendToLeader error: group is not coordinated") if self.groupThread.leader: assert(type(msg) == bytes) return self.send_port(Group.GROUP_MTL, msg) else: return False
[docs] def sendToLeader_pyobj(self, msg): ''' Send PyObject message to group leader from a member Raise an exception if the group is not coordinated Return False (indicating operation failure) if no leader Runs in component thread. ''' if not self.groupThread.coordinated: raise PortError("sendToLeader error: group is not coordinated") if self.groupThread.leader: msgOut = pickle.dumps(msg) return self.send_port(Group.GROUP_MTL, msgOut) else: return False
[docs] def sendToMember_pyobj(self, msg, identity=None): ''' Send PyObject message to group member (with identity) from the leader If identity is not supplied, last value of identity is used. Raise an exception if the group is not coordinated Return False (indicating operation failure) if no leader Runs in component thread. ''' if not self.groupThread.coordinated: raise PortError("sendToLeader error: group is not coordinated") if self.groupThread.leader: if identity != None: self.identity = identity assert(self.identity != None) msgOut = pickle.dumps(msg) return self.send_port(Group.GROUP_MFL, msgOut, True) else: return False
[docs] def sendToMember(self, msg, identity=None): ''' Send message to group member (with identity) from the leader If identity is not supplied, last value of identity is used. Raise an exception if the group is not coordinated Return False (indicating operation failure) if no leader Runs in component thread. ''' if not self.groupThread.coordinated: raise PortError("sendToLeader error: group is not coordinated") if self.groupThread.leader: if identity != None: self.identity = identity assert(self.identity != None) assert(type(msg) == bytes) return self.send_port(Group.GROUP_MFL, msg, True) else: return False
[docs] def hasLeader(self): ''' True if the group has a leader. ''' return self.groupThread.leader != None
[docs] def getLeaderId(self): ''' Return the leader's id (or None if no leader) ''' return self.groupThread.leader
[docs] def isLeader(self): ''' Return True if the group member IS the leader. ''' return self.groupThread.isLeader()
[docs] def groupSize(self): ''' Return the size of the group (>= 1). ''' return self.groupThread.numPeers
[docs] def getGroupId(self): return self.groupThread.ownId
[docs] def requestVote(self, topic, kind=Poll.MAJORITY, timeout=None): ''' Request a vote on a topic (with timeout). Topic is a bytes. A message is sent to the leader (if any) that starts a voting process. Returns None if there is no leader, otherwise a generated id string for the request. ''' if not self.groupThread.coordinated: raise PortError("requestVote error: group is not coordinated") if self.groupThread.leader: assert(type(topic) == bytes) assert(timeout == None or type(timeout) == float) rfvId = ''.join(random.choices(string.ascii_uppercase + string.digits, k=8)) msg = dc_capnp.GroupVote.new_message() rfv = msg.init('rfv') rfv.topic = topic rfv.rfvId = rfvId rfv.kind = kind rfv.subject = 'value' rfv.started = time.time() rfv.timeout = 0.0 if timeout == None else timeout / 1000.0 msgBytes = msg.to_bytes() res = self.send_port(Group.GROUP_RFV, msgBytes) return rfvId if res == True else None else: return None
[docs] def requestVote_pyobj(self, topic, kind=Poll.MAJORITY, timeout=None): ''' Request a vote on a topic (with timeout). Topic is a Python object. A message is sent to the leader (if any) that starts a voting process. Returns None if there is no leader, otherwise a generated id string for the request. ''' if not self.groupThread.coordinated: raise PortError("requestVote_pyobj error: group is not coordinated") if self.groupThread.leader: assert(timeout == None or type(timeout) == float) topicOut = pickle.dumps(topic) rfvId = ''.join(random.choices(string.ascii_uppercase + string.digits, k=8)) msg = dc_capnp.GroupVote.new_message() rfv = msg.init('rfv') rfv.topic = topicOut rfv.rfvId = rfvId rfv.kind = kind rfv.subject = 'value' rfv.started = time.time() rfv.timeout = 0.0 if timeout == None else timeout / 1000.0 msgBytes = msg.to_bytes() res = self.send_port(Group.GROUP_RFV, msgBytes) return rfvId if res == True else False else: return False
[docs] def sendVote(self, rfvId, vote): ''' Send a vote (True/False) to the leader on a requested topic identified by the id of the request for vote. ''' # Send a True/False vote on the topic (identified by rfvId) to the leader if not self.groupThread.coordinated: raise PortError("sendVote() error: group is not coordinated") msg = dc_capnp.GroupVote.new_message() rtc = msg.init('rtc') rtc.rfvId = rfvId rtc.vote = 'yes' if vote == True else 'no' msgBytes = msg.to_bytes() res = self.send_port(Group.GROUP_RTC, msgBytes) return res == True
[docs] def requestActionVote(self, action, when, kind=Poll.CONSENSUS, timeout=None): # Send a request for a vote on an action to be taken in the future if not self.groupThread.coordinated: raise PortError("requestActionVote error: group is not coordinated") if self.groupThread.leader: assert(type(action) == bytes) assert(type(when) == float) assert(timeout == None or type(timeout) == float) rfvId = ''.join(random.choices(string.ascii_uppercase + string.digits, k=8)) msg = dc_capnp.GroupVote.new_message() rfv = msg.init('rfv') rfv.topic = action rfv.rfvId = rfvId rfv.kind = kind rfv.subject = 'action' rfv.release = when rfv.started = time.time() rfv.timeout = 0.0 if timeout == None else timeout / 1000.0 msgBytes = msg.to_bytes() res = self.send_port(Group.GROUP_RFV, msgBytes) return rfvId if res == True else None else: return None
[docs] def requestActionVote_pyobj(self, action, when, kind=Poll.CONSENSUS, timeout=None): # Send a request for a vote on an action to be taken in the future if not self.groupThread.coordinated: raise PortError("requestActionVote error: group is not coordinated") if self.groupThread.leader: actionOut = pickle.dumps(action) assert(type(when) == float) assert(timeout == None or type(timeout) == float) rfvId = ''.join(random.choices(string.ascii_uppercase + string.digits, k=8)) msg = dc_capnp.GroupVote.new_message() rfv = msg.init('rfv') rfv.topic = actionOut rfv.rfvId = rfvId rfv.kind = kind rfv.subject = 'action' rfv.release = when rfv.started = time.time() rfv.timeout = 0.0 if timeout == None else timeout / 1000.0 msgBytes = msg.to_bytes() res = self.send_port(Group.GROUP_RFV, msgBytes) return rfvId if res == True else None else: return None
[docs] def sendActionVote(self, rfvId, vote): # Send a True/False vote on the action (identified by rfvId) to the leader pass
class Coordinator(object):
    '''
    Coordinator object. Each component has a coordinator object that creates
    the group objects for a component. A group instance can be created only
    once in a component; subsequent creations return the same group object.
    '''
    def __init__(self, parent):
        self.logger = logging.getLogger(__name__)
        self.parent = parent
        self.context = None
        # Group type specifications, keyed by group type name
        self.groupTypes = self.parent.owner.parent.groupTypes
        self.groupMembers = {}      # 'groupType.groupInstance' -> Group
        self.groups = {}            # Not used within this class

    def groupName(self, groupType, groupInstance):
        '''
        Form a name for a group instance ('groupType.groupInstance').
        '''
        return groupType + '.' + groupInstance

    def joinGroup(self, thread, groupType, groupInstance, componentId, groupMinSize):
        '''
        Create a group instance in a component, or return the existing one.

        Returns None (after logging an error) if groupInstance is not a valid
        identifier or groupType is not a known group type.
        Runs in component thread.
        '''
        self.logger.info("Coordinator.joinGroup(%s,%s)", groupType, groupInstance)
        if not groupInstance.isidentifier():
            self.logger.error("Coordinator.joinGroup: invalid group name %s", groupInstance)
            return None
        if groupType not in self.groupTypes:
            self.logger.error("Coordinator.joinGroup: group %s undefined", groupType)
            return None
        groupSpec = self.groupTypes[groupType]
        key = self.groupName(groupType, groupInstance)
        group = self.groupMembers.get(key)
        if group is not None:
            self.logger.info("Coordinator.joinGroup - known group (%s)", key)
            return group
        group = Group(self, thread, groupType, groupInstance, componentId, groupSpec, groupMinSize)
        self.groupMembers[key] = group
        self.logger.info("Coordinator.joinGroup - new group (%s)", key)
        return group

    def getGroup(self, groupType, groupInstance):
        '''
        Returns a group instance based on its name (None if not joined).
        '''
        return self.groupMembers.get(self.groupName(groupType, groupInstance))

    def leaveGroup(self, group):
        '''
        Operation to 'leave' a group by a component. The group is deactivated
        (Group.leave()) and removed from this coordinator.
        '''
        groupName = group.getGroupName()
        self.logger.info("Coordinator.leaveGroup(%s)", groupName)
        group.leave()
        # pop(): the group has already left, so an unregistered entry is not an error
        self.groupMembers.pop(groupName, None)