Source code for riaps.utils.gencert

#!/usr/bin/python3
'''

Script to generate a public/private key pair and a
self-signed certificate for securing riaps communications.
THE KEY AND CERTIFICATE MUST NEVER BE USED IN FIELDED SYSTEMS.
Keys and the certificate must be installed in the $RIAPSHOME/keys
directory. Private key is NOT ENCRYPTED.

@author: riaps
'''
import datetime
from os.path import exists, join
import sys
import argparse
import shutil

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography import x509
from cryptography.x509.oid import NameOID
from cryptography.hazmat.primitives import hashes

import zmq.auth

from riaps.consts.defs import *

# File names of the artifacts handled by this script. Each one is joined onto
# the chosen output directory by the functions below.
# NOTE(review): `const` is presumably supplied by the star import of
# riaps.consts.defs - confirm.
# Self-signed certificate (written PEM encoded).
CERT_FILE = const.ctrlCertificate
# RSA private key (written as unencrypted PEM).
KEY_FILE = const.ctrlPrivateKey
# RSA public key (written in OpenSSH format).
PUB_FILE = const.ctrlPublicKey
# ZeroMQ certificate file.
ZMQ_CERT_FILE = const.zmqCertificate

def generate_keys(cert_dir):
    '''
    Generate a 2048-bit RSA key pair and write both halves into cert_dir.

    The private key is written unencrypted, PEM encoded (TraditionalOpenSSL
    format), to KEY_FILE. The public key is written in OpenSSH format to
    PUB_FILE.

    :param cert_dir: directory that receives the two key files
    :return: the generated private key object
    :raises SystemExit: if KEY_FILE or PUB_FILE already exists in cert_dir
    :raises FileExistsError: if KEY_FILE appears between the existence
        check and the moment the file is created
    '''
    # Local import: the module itself only imports names from os.path.
    import os

    key_path = join(cert_dir, KEY_FILE)
    pub_path = join(cert_dir, PUB_FILE)
    if exists(key_path) or exists(pub_path):
        sys.exit("Error: %s or %s already exists - move them first" % (KEY_FILE, PUB_FILE))

    private_key = rsa.generate_private_key(
        public_exponent=65537,
        key_size=2048,
        backend=default_backend(),
    )

    # The private key is stored without encryption, so the file must not be
    # readable by other users: create it with owner-only permissions instead
    # of relying on the process umask. O_EXCL refuses to reuse a file that
    # showed up after the existence check above.
    key_fd = os.open(key_path, os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o600)
    with os.fdopen(key_fd, "wb") as f:
        f.write(private_key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.TraditionalOpenSSL,
            encryption_algorithm=serialization.NoEncryption(),
        ))

    public_key = private_key.public_key()
    with open(pub_path, "wb") as f:
        f.write(public_key.public_bytes(
            encoding=serialization.Encoding.OpenSSH,
            format=serialization.PublicFormat.OpenSSH
        ))

    return private_key
def generate_self_signed_cert(cert_dir, key):
    '''
    Create a self-signed X.509 certificate for key and write it, PEM
    encoded, to CERT_FILE inside cert_dir.

    Subject and issuer are the same fixed name, the certificate carries a
    SubjectAlternativeName of "localhost", is valid for 999 days starting
    now, and is signed with key using SHA-256.

    :param cert_dir: directory that receives the certificate file
    :param key: private key used both as the certified key (its public
        half) and as the signing key
    :raises SystemExit: if CERT_FILE already exists in cert_dir
    '''
    cert_path = join(cert_dir, CERT_FILE)
    if exists(cert_path):
        sys.exit("Error: %s already exists - move it first" % (CERT_FILE))

    # Self-signed: the same name is used as subject and issuer.
    subject = issuer = x509.Name([
        x509.NameAttribute(NameOID.COUNTRY_NAME, u"US"),
        x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u"Tennessee"),
        x509.NameAttribute(NameOID.LOCALITY_NAME, u"Nashville"),
        x509.NameAttribute(NameOID.ORGANIZATION_NAME, u"Vanderbilt University"),
        x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, u"Institute for Software-Integrated Systems"),
        x509.NameAttribute(NameOID.COMMON_NAME, u"riaps.isis.vanderbilt.edu")
    ])

    # Take a single timezone-aware UTC timestamp so both ends of the validity
    # window derive from the same instant (datetime.utcnow() is deprecated
    # since Python 3.12).
    now = datetime.datetime.now(datetime.timezone.utc)

    cert = (
        x509.CertificateBuilder()
        .subject_name(subject)
        .issuer_name(issuer)
        .public_key(key.public_key())
        .serial_number(x509.random_serial_number())
        .not_valid_before(now)
        .not_valid_after(now + datetime.timedelta(days=999))
        .add_extension(x509.SubjectAlternativeName([x509.DNSName(u"localhost")]), critical=False)
        .sign(key, hashes.SHA256(), default_backend())
    )

    with open(cert_path, "wb") as f:
        f.write(cert.public_bytes(serialization.Encoding.PEM))
def generate_zmq_cert(cert_dir):
    '''
    Create a ZeroMQ certificate named "riaps" in cert_dir and store it as
    ZMQ_CERT_FILE.

    zmq.auth.create_certificates returns two file paths; per the pyzmq
    documentation these are the public and the secret certificate file.
    The second one is moved to ZMQ_CERT_FILE, the first is left where it
    was created.

    :param cert_dir: directory in which the certificate is generated
    :raises SystemExit: if ZMQ_CERT_FILE already exists in cert_dir
    '''
    # Use one name for the destination in both the check and the move, so
    # the two can never drift apart.
    target_path = join(cert_dir, ZMQ_CERT_FILE)
    if exists(target_path):
        sys.exit("Error: %s already exists - move it first" % (ZMQ_CERT_FILE))

    _public, cert = zmq.auth.create_certificates(cert_dir, "riaps", None)
    shutil.move(cert, target_path)
def main():
    '''
    Command-line entry point: produce the key pair (unless a private key
    is already present), the self-signed certificate and the ZeroMQ
    certificate in the selected output directory.

    Options:
        -o / --output   directory to write into; the current directory is
                        used when the option is omitted or empty
    '''
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument("-o", "--output", help="Output directory. Default is the current directory.")
    options = arg_parser.parse_args()

    # Fall back to the current directory when no output directory was given.
    target_dir = options.output or "."

    key_path = join(target_dir, KEY_FILE)
    if not exists(key_path):
        # No private key yet: create a fresh key pair.
        private_key = generate_keys(target_dir)
    else:
        # Reuse the existing (unencrypted, PEM encoded) private key.
        with open(key_path, "rb") as key_file:
            pem_bytes = key_file.read()
        private_key = serialization.load_pem_private_key(pem_bytes, None, default_backend())

    generate_self_signed_cert(target_dir, private_key)
    generate_zmq_cert(target_dir)
# Run the generator only when this file is executed as a script.
if __name__ == '__main__':
    main()