Source code for riaps.lang.lang

#!/usr/bin/env python
'''
DSL for RIAPS software models
Created on Oct 9, 2016
Uses the textX parser
@author: riaps
'''

from os.path import join
from textx.metamodel import metamodel_from_file
from textx.export import metamodel_export, model_export
from textx.exceptions import TextXSemanticError, TextXSyntaxError
import textx.model

import sys
import json
import os
import argparse

[docs]class LangError(Exception): def __init__(self, message): super(LangError, self).__init__(message)
[docs]class RiapsModel2JSON(object): ''' Class to convert the RIAPS model (constructed by the parser) into a data structure suitable for generating JSON output.Dependent on the DSL syntax and the object structure built by the parser. ''' def __init__(self,model): ''' Constructs the JSON structures from the model objects ''' self.model = model self.apps = {} for app in model.apps: appObj = {} appObj['name'] = app.name appObj['messages'] = self.getMessages(app.messages) appObj['groups'] = self.getGroups(app.groups) appObj['libraries'] = self.getLibraries(app.libraries) appObj['devices'] = self.getIOComponents(app.components) appObj['components'] = self.getComponents(app.components) appObj['actors'] = self.getActors(app.actors) self.apps[app.name] = appObj
[docs] def getMessages(self,messages): res = [] for msg in messages: msgObj = { } msgObj["name"] = msg.name res.append(msgObj) return res
[docs] def getGroups(self,groups): res = [] for group in groups: groupObj = { } groupObj["name"] = group.name groupObj["kind"] = group.kind.kind if group.kind else 'default' groupObj["message"] = group.message.name groupObj["timed"] = True if group.timed == "timed" else False params = { } if group.groupParams: for param in group.groupParams: params[param.name] = param.value groupObj["params"] = params res.append(groupObj) return res
[docs] def getLibraries(self,libraries): res = [] for lib in libraries: libObj = { } libObj["name"] = lib.name res.append(libObj) return res
[docs] def getFormals(self,formals): res = [] names = [] for formal in formals: formalObj = { } argName = formal.argName if argName in names: raise TextXSemanticError('Argument name "%s" is not unique.' % argName) else: names.append(argName) formalObj["name"] = argName if formal.argDefault != None: formalObj["default"] = formal.argDefault.default res.append(formalObj) return res
[docs] def getImpl(self,comp): l = comp.language if l == None: return "default" elif l.pyImpl: return "py" elif l.cppImpl: return "cpp"
[docs] def getIOComponents(self,components): res = {} for comp in components: if comp.ioComponent: if comp.name in res: raise TextXSemanticError('Component name "%s" is not unique.' % comp.name) compObj = { } compObj["name"] = comp.name compObj["formals"] = self.getFormals(comp.formals) compObj["language"] = self.getImpl(comp) compObj["ports"] = self.getPorts(comp.ports) res[comp.name] = compObj return res
[docs] def getComponents(self,components): res = {} for comp in components: if comp.appComponent: if comp.name in res: raise TextXSemanticError('Component name "%s" is not unique.' % comp.name) compObj = { } compObj["name"] = comp.name compObj["formals"] = self.getFormals(comp.formals) compObj["language"] = self.getImpl(comp) compObj["scheduler"] = self.getComponentScheduler(comp.scheduler) compObj["ports"] = self.getPorts(comp.ports) res[comp.name] = compObj return res
[docs] def getComponentScheduler(self,sched): return sched if sched in ('priority','rr') else 'default'
[docs] def getPorts(self,ports): pubs = {} subs = {} clts = {} srvs = {} reqs = {} reps = {} tims = {} inss = {} qrys = {} anss = {} portNames = [] portIndex = 1 for port in ports: # Assumption: 'ports' preserves the lexical order in model portObj = { } if(port.name in portNames): raise TextXSemanticError('Port name "%s" is not unique' % port.name) else: portNames.append(port.name) portClass = port.__class__.__name__ if (portClass == 'PubPort'): portObj['type'] = port.type.name portObj['timed'] = port.timed pubs[port.name] = portObj elif (portClass == 'SubPort'): portObj['type'] = port.type.name portObj['timed'] = port.timed portObj['deadline'] = port.deadline portObj['index'] = portIndex; portIndex += 1 subs[port.name] = portObj elif (portClass == 'ClntPort'): portObj['req_type'] = port.req_type.name portObj['rep_type'] = port.rep_type.name portObj['timed'] = port.timed clts[port.name] = portObj elif (portClass == 'SrvPort'): portObj['req_type'] = port.req_type.name portObj['rep_type'] = port.rep_type.name portObj['timed'] = port.timed portObj['deadline'] = port.deadline portObj['index'] = portIndex; portIndex += 1 srvs[port.name] = portObj elif (portClass == 'ReqPort'): portObj['req_type'] = port.req_type.name portObj['rep_type'] = port.rep_type.name portObj['timed'] = port.timed portObj['deadline'] = port.deadline portObj['index'] = portIndex; portIndex += 1 reqs[port.name] = portObj elif(portClass == 'RepPort'): portObj['req_type'] = port.req_type.name portObj['rep_type'] = port.rep_type.name portObj['timed'] = port.timed portObj['deadline'] = port.deadline portObj['index'] = portIndex; portIndex += 1 reps[port.name] = portObj elif(portClass == 'TimPort'): portObj['period'] = port.period portObj['deadline'] = port.deadline portObj['index'] = portIndex; portIndex += 1 tims[port.name] = portObj elif (portClass == 'InsPort'): inss[port.name] = portObj portObj['spec'] = port.spec portObj['index'] = portIndex; portIndex += 1 elif (portClass == 'QryPort'): portObj['req_type'] = port.req_type.name portObj['rep_type'] = port.rep_type.name portObj['timed'] = port.timed portObj['deadline'] = port.deadline portObj['index'] = portIndex; portIndex += 1 qrys[port.name] = portObj elif (portClass == 'AnsPort'): portObj['req_type'] = port.req_type.name portObj['rep_type'] = port.rep_type.name portObj['timed'] = port.timed portObj['deadline'] = port.deadline portObj['index'] = portIndex; portIndex += 1 anss[port.name] = portObj else: raise TextXSemanticError('Unknown type for port "%s"' % port.name) return { "subs" : subs , "pubs" : pubs, "clts" : clts , "srvs" : srvs, "reqs" : reqs, "reps" : reps, "tims" : tims, "inss" : inss, "qrys" : qrys, "anss" : anss }
[docs] def getActors(self,actors): res = {} for act in actors: if act.name in res: raise TextXSemanticError('Actor name "%s" is not unique.' % act.name) actObj = { } actObj["real-time"] = act.rt actObj["formals"] = self.getFormals(act.formals) actObj["locals"] = self.getLocals(act.locals) actObj["internals"] = self.getInternals(act.internals) actObj["usage"] = self.getUsage(act.usage) actObj["scheduler"] = self.getActorScheduler(act.scheduler) actObj["instances"] = self.getInstances(act.instances) # actObj["wires"] = self.getWires(act.wires) res[act.name] = actObj return res
[docs] def getLocals(self,locals_): res = [] for local in locals_: localObj = {} localObj["type"] = local.name res.append(localObj) return res
[docs] def getInternals(self,internals): res = [] for internal in internals: internalObj = {} internalObj["type"] = internal.name res.append(internalObj) return res
[docs] def getActuals(self,actuals): res = [] names = [] for actual in actuals: actualObj = { } argName = actual.argName if argName in names: raise TextXSemanticError('Argument "%s" is not unique.' % argName) else: names.append(argName) actualObj["name"] = argName if actual.argValue.param != '': actualObj["param"] = actual.argValue.param else: actualObj["value"] = actual.argValue.value res.append(actualObj) return res
[docs] @staticmethod def convertTime(value,unit): ''' Convert all time values to msec''' if unit == 'msec': return value elif unit == 'sec': return value * 1000 elif unit == 'min': return value * 60 * 1000
[docs] @staticmethod def convertMem(value,unit): ''' Convert all memory size values to kilobytes''' if unit == 'mb': return value * 1024 elif unit == 'kb': return value elif unit == 'gb': return value * 1024 * 1024
[docs] def convertRate(self,value,unit): ''' Convert all rate values to bytes/sec''' if unit == 'kbps': return int(value * 1024) elif unit == 'mbps': return int(value * 1024 * 1024) else: return None;
[docs] def getUsage(self,usage): cpuUsage = { } memUsage = { } spcUsage = { } netUsage = { } for use in usage: useClass = use.__class__.__name__ if (useClass == 'CPUUsage'): cpuUsage['use'] = use.usage cpuUsage['max'] = use.max unit = 'sec' if use.unit == None else use.unit cpuUsage['interval'] = self.convertTime(use.interval,unit) elif (useClass == 'MemUsage'): unit = 'MB' if use.unit == None else use.unit memUsage['use'] = self.convertMem(use.usage,unit) elif (useClass == 'SpaceUsage'): unit = 'MB' if use.unit == None else use.unit spcUsage['use'] = self.convertMem(use.usage,unit) elif (useClass == 'NetUsage'): netUsage['rate'] = self.convertRate(use.rate,use.rateUnit) netUsage['ceil'] = 0 if use.ceil == None else self.convertRate(use.ceil,use.ceilUnit) netUsage['burst'] = 0 if use.burst == None else int(use.burst * 1024) else: raise TextXSemanticError('Unknown usage for port "%s"' % (str(use))) return { "cpu" : cpuUsage , "mem" : memUsage, "spc" : spcUsage , "net" : netUsage }
[docs] def getActorScheduler(self,sched): res = { } if sched: if sched.rr: res["policy"] = 'rr' res["priority"] = 0 else: res["policy"] = 'pri' res["priority"] = sched.priority return res
[docs] def getInstances(self,instances): res = { } for inst in instances: if inst.name in res: raise TextXSemanticError('Instance name "%s" is not unique.' % inst.name) instObj = { "type" : inst.type.name, } instObj["actuals"] = self.getActuals(inst.actuals) res[inst.name] = instObj return res
# def getWires(self,wires): # res = [] # for wire in wires: # wireObj = { "lhsName" : wire.lhsInst.name, # "lhsPort" : wire.lhsPort.name, # "rhsName" : wire.rhsInst.name, # "rhsPort" : wire.rhsPort.name, # } # res.append(wireObj) # return res # Object processor for timer ports: the 'spec' part is optional. If missing, it means period=0, i.e. a one-shot timer
[docs]def timport_obj_processor(timport): if timport.spec == 0: timport.period = 0 else: spec = timport.spec unit = 'msec' if timport.periodUnit == None else timport.periodUnit timport.period = RiapsModel2JSON.convertTime(spec,unit) if timport.deadline != 0: spec = timport.deadline unit = 'msec' if timport.unit == None else timport.unit timport.deadline = RiapsModel2JSON.convertTime(spec,unit)
# Object processor for inside ports: the 'spec' part is optional. If missing it implies the default 1sec trigger
[docs]def insport_obj_processor(insport): if insport.spec == True : insport.spec = 'default' else: insport.spec = None
# Object processor for non-operation ports: the 'timed' flag is optional.
[docs]def timed_port_obj_processor(port): if port.timed == 'timed': port.timed = True else: port.timed = False
# Object processor for operation ports: the 'timed' and deadline are optional.
[docs]def op_port_obj_processor(port): if port.timed == 'timed': port.timed = True else: port.timed = False if port.deadline != 0: spec = port.deadline unit = 'msec' if port.unit == None else port.unit port.deadline = RiapsModel2JSON.convertTime(spec,unit)
# Object processor for actors: local and internal messages must be distinct
[docs]def actor_obj_processor(actor): localMessages = actor.locals localMessageNames = set([localMessage.name for localMessage in localMessages]) internalMessages = actor.internals internalMessageNames = set([internalMessage.name for internalMessage in internalMessages]) sharedMessageNames = localMessageNames.intersection(internalMessageNames) if len(sharedMessageNames) != 0: raise TextXSemanticError('%s: Actor local and internal messages must be distinct - shared: %r' % (actor.name,sharedMessageNames)) else: actor._messageNames = localMessageNames.union(internalMessageNames)
# Object processor for io component instances: messages must be local or internal
[docs]def instance_obj_processor(instance): component = instance.type if component.ioComponent: actor = instance.parent if not hasattr(actor,'_messageNames'): actor_obj_processor(actor) messageNames = actor._messageNames portMessageNames = [] for port in component.ports: if hasattr(port, 'type'): portMessageNames.append(port.type.name) elif hasattr(port,'req_type'): portMessageNames.append(port.req_type.name) elif hasattr(port,'rep_type'): portMessageNames.append(port.rep_type.name) else: pass for portMessageName in portMessageNames: if portMessageName not in messageNames: raise TextXSemanticError('Non-(local/internal) message type %s for IO component %s:%s' % (portMessageName,instance.name,component.name)) else: pass
# # Object processor for wires: checks if the names used are correct. # # Wires are to connect ports of local instances. # def wire_obj_processor(wire): # # print '(' + wire.lhsName + '.' + wire.lhsPortName + '=' + wire.rhsName + '.' + wire.rhsPortName + ')' # instances = wire.parent.instances # lhsInst = [inst for inst in instances if inst.name == wire.lhsName] # if not lhsInst: # raise TextXSemanticError('Wire LHS instance "%s" not found.' % # wire.lhsName) # else: # wire.lhsInst = lhsInst[0] # lhsPort = [port for port in wire.lhsInst.type.ports if port.name == wire.lhsPortName] # if not lhsPort: # raise TextXSemanticError('Wire LHS port "%s" in instance "%s" not found.' % # (wire.lhsPortName, wire.lhsName)) # else: # wire.lhsPort=lhsPort[0] # rhsInst = [inst for inst in instances if inst.name == wire.rhsName] # if not rhsInst: # raise TextXSemanticError('Wire RHS instance "%s" not found.' % # wire.rhsName) # else: # wire.rhsInst=rhsInst[0] # rhsPort = [port for port in wire.rhsInst.type.ports if port.name == wire.rhsPortName] # if not rhsPort: # raise TextXSemanticError('Wire RHS port "%s" in instance "%s" not found.' % # (wire.rhsPortName, wire.rhsName)) # else: # wire.rhsPort=rhsPort[0] # # print lhsInst, lhsPort, rhsInst, rhsPort
[docs]def compileModel(modelFileName,verbose=False,debug=False,generate=True): riaps_folder = os.getenv('RIAPSHOME', './') # RIAPSHOME points to the folder containing the grammar this_folder = os.getcwd() # Get meta-model from language grammar riaps_meta = metamodel_from_file(join(riaps_folder, 'lang/riaps.tx'), debug=debug) # Register object processors for wires and timer ports obj_processors = { # 'Wire': wire_obj_processor, 'TimPort': timport_obj_processor, # 'InsPort': op_port_obj_processor, # Inside port cannot be timed / have deadline 'Instance': instance_obj_processor, 'Actor' : actor_obj_processor, 'PubPort' : timed_port_obj_processor, 'SubPort' : op_port_obj_processor, 'ClntPort': timed_port_obj_processor, 'SrvPort' : op_port_obj_processor, 'ReqPort' : op_port_obj_processor, 'RepPort' : op_port_obj_processor, 'QryPort' : op_port_obj_processor, 'AnsPort' : op_port_obj_processor, # We should also check for parameters: # (1) formal/actual lists should match, # (2) inherited parameters should appear in their parent } riaps_meta.register_obj_processors(obj_processors) # Optionally export meta-model to dot (for debugging only) # metamodel_export(riaps_meta, join(this_folder, 'riaps_meta.dot')) try: # Instantiate the model object structure from the model file example_riaps_model = riaps_meta.model_from_file(join(this_folder,modelFileName)) except IOError as e: errMsg = "I/O error({0}): {1}".format(e.errno, e.strerror) if verbose: print (errMsg) raise LangError(errMsg) except TextXSyntaxError as e: errMsg = "Syntax error: %s" % e.args if verbose: print (errMsg) raise LangError(errMsg) except TextXSemanticError as e: errMsg = "Semantic error: %s" % e.args if verbose: print (errMsg) raise LangError(errMsg) except Exception as e: errMsg = "Unexpected error %s:%s" % (sys.exc_info()[0],e.args()) if verbose: print (errMsg) raise LangError(errMsg) # Optionally export model to dot # model_export(example_riaps_model, join(this_folder, 'sample.dot')) # Convert model object structure into JSON data riaps_model = RiapsModel2JSON(example_riaps_model) riaps_model_json = { } # Optionally print generated JSON on console if verbose: riaps_model_json = json.dumps(riaps_model.apps,indent=2, separators=(',', ':')) print(riaps_model_json) # Generated JSON files for each app if generate: for appName in riaps_model.apps: fp = open(appName + '.json','w') appObj = riaps_model.apps[appName] json.dump(appObj,fp,indent=4,sort_keys=True,separators=(',', ':')) fp.close() return riaps_model.apps
[docs]def main(debug=False): parser = argparse.ArgumentParser() parser.add_argument("model", help="model file") parser.add_argument("-v","--verbose", help="print JSON on console", action="store_true") args = parser.parse_args() return compileModel(args.model,args.verbose)
if __name__ == '__main__': m = main() # print (m)