Source code for riaps.discd.dbase_redis

'''
Discovery server database interface
Created on Oct 19, 2016

@author: riaps
'''

import redis
from riaps.consts.defs import *
from riaps.run.exc import *
from riaps.utils.config import Config
from .dbase import DiscoDbase
import re
import os
import logging

[docs]class RedisDbase(DiscoDbase): ''' Discovery service database implemented using redis ''' def __init__(self,context_,dbaseLoc): ''' Construct the database object, set up singleton. ''' super().__init__(context_, dbaseLoc) self.logger = logging.getLogger(__name__) # Singleton DiscoDbase object # global theDiscoBase # theDiscoBase = self self.riaps_Folder = os.getenv('RIAPSHOME', './') self.keyFile = os.path.join(self.riaps_Folder,"keys/" + str(const.ctrlPrivateKey)) self.certFile = os.path.join(self.riaps_Folder,"keys/" + str(const.ctrlCertificate)) self.context = context_ self.r = None # The redis connection self.rHostPort = dbaseLoc # Redis access string: 'host:port' (if any)
[docs] def start(self): ''' Start the database: connect to the database process ''' # Optional redis access string points to the database # Redis-specific syntax: database_host ":" database_port if self.rHostPort != None: pair = re.split(":",self.rHostPort) host = str(pair[0]) port = int(pair[1]) else: host = const.discoRedisHost port = const.discoRedisPort try: self.logger.info("connecting to redis") if Config.SECURITY: self.r = redis.StrictRedis(host,port, db=0, ssl=True, ssl_keyfile=self.keyFile, ssl_certfile=self.certFile, ssl_cert_reqs="required", ssl_ca_certs=self.certFile ) else: self.r = redis.StrictRedis(host,port, db=0) # Connect self.notesPubSub = self.r.pubsub() # Set up pubsub channel to receive notifications except redis.exceptions.ConnectionError: raise DatabaseError("db connection lost") except OSError: raise DatabaseError("OS error") self.subKeys = set() # Set of all keys subscribed to by this disco instance
[docs] def addSub(self,newKey): ''' Update the list of subscribed keys with the new key ''' self.logger.info("addSub: %s" % newKey) if newKey in self.subKeys: return self.subKeys.add(newKey) fullKey = '__keyspace@0__:' + newKey # Redis-specific: reference to a key in the 'keyspace' self.notesPubSub.subscribe(fullKey)
[docs] def delSub(self,key): ''' Delete subscription to key ''' self.logger.info("delSub: %s" % key) if key not in self.subKeys: return self.subKeys.remove(key) fullKey = '__keyspace@0__:' + key # Redis-specific: reference to a key in the 'keyspace' self.notesPubSub.unsubscribe(fullKey)
#
[docs] def fetchUpdates(self): ''' Check and fetch the updated values of the subscribed keys if any ''' if len(self.subKeys) == 0: return [] keys = [] try: while True: msg = self.notesPubSub.get_message(True) # Use redis pubsub feature to check for a notification if msg is None: break else: channel = msg['channel'].decode('utf-8') data = msg['data'].decode('utf-8') keySplit = re.split('__keyspace@0__:', channel) keys.append(keySplit[1]) res = [] for key in keys: values = self.r.smembers(key) clientsKey = key + "_clients" clientsToNotify = self.r.smembers(clientsKey) clientsToNotify = list(map(lambda s:s.decode('utf-8'),clientsToNotify)) for value in values: valueString = value.decode('utf-8') res.append((key, valueString, clientsToNotify)) self.logger.info("fetchUpdates:%r" % res) return res except redis.exceptions.ConnectionError: raise DatabaseError("db connection lost") except OSError: raise DatabaseError("OS error")
[docs] def insert(self,key:str,value:str) -> [str]: ''' Insert value under key and return list of clients of value (if any). A key may have multiple values associated with it, hence the new value is added to the set of values that belong to the key ''' self.logger.info("insert %s -> %s" % (repr(key),repr(value))) try: clientsToNotify = [] if self.r.exists(key) and (value.encode('utf-8') in self.r.smembers(key)): return [] self.r.sadd(key,value) clientsKey = key + "_clients" clientsToNotify = self.r.smembers(clientsKey) clientsToNotify = list(map(lambda s:s.decode('utf-8'),clientsToNotify)) return clientsToNotify except redis.exceptions.ConnectionError: raise DatabaseError("db connection lost") except OSError: raise DatabaseError("OS error")
[docs] def fetch(self,key:str,client:str) -> [str]: ''' Fetch value(s) under key. Add client to list of clients interested in the value ''' self.logger.info("fetch %s for %s" % (repr(key),repr(client))) self.addSub(key) try: if self.r.exists(key): values = self.r.smembers(key) values = list(map(lambda s:s.decode('utf-8'),values)) else: values = [] clientsKey = key + "_clients" self.r.sadd(clientsKey,client) return values except redis.exceptions.ConnectionError: raise DatabaseError("db connection lost") except OSError: raise DatabaseError("OS error")
[docs] def remove(self,key : str,value : str) -> [str]: ''' Remove value from values under key. ''' self.logger.info("remove %s from %s" % (repr(value),repr(key))) try: self.r.srem(key,value) values = self.r.smembers(key) values = list(map(lambda s:s.decode('utf-8'),values)) if len(values) == 0: self.delSub(key) return values except redis.exceptions.ConnectionError: raise DatabaseError("db connection lost") except OSError: raise DatabaseError("OS error")
[docs] def detach(self, key:str, target:str): ''' Detach update client from keys ''' self.logger.info("detach %s from %r" % (target,key)) try: clientsKey = key + "_clients" try: self.r.srem(clientsKey,target) except: pass except redis.exceptions.ConnectionError: raise DatabaseError("db connection lost") except OSError: raise DatabaseError("OS error")