#!/usr/bin/python3
'''
Script to generate a public/private key pair and a
self-signed certificate for securing riaps communications.
THE KEY AND CERTIFICATE MUST NEVER BE USED IN FIELDED SYSTEMS.
Keys and the certificate must be installed in the $RIAPSHOME/keys
directory. Private key is NOT ENCRYPTED.
@author: riaps
'''
import datetime
from os.path import exists, join
import sys
import argparse
import shutil
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography import x509
from cryptography.x509.oid import NameOID
from cryptography.hazmat.primitives import hashes
import zmq.auth
from riaps.consts.defs import *
# File names (joined onto the output directory by the functions below) for the
# generated artifacts. `const` is not defined in this file; presumably it is
# provided by the wildcard import from riaps.consts.defs - verify there.
CERT_FILE = const.ctrlCertificate      # self-signed X.509 certificate
KEY_FILE = const.ctrlPrivateKey        # unencrypted RSA private key (PEM)
PUB_FILE = const.ctrlPublicKey         # RSA public key (OpenSSH format)
ZMQ_CERT_FILE = const.zmqCertificate   # ZeroMQ certificate file
def generate_keys(cert_dir):
    '''
    Generate a 2048-bit RSA key pair and store both halves in cert_dir.

    The private key is written unencrypted, PEM-encoded (traditional
    OpenSSL format) to KEY_FILE; the public key is written in OpenSSH
    format to PUB_FILE.

    :param cert_dir: directory that receives the two key files
    :return: the freshly generated private key object
    :raises SystemExit: if KEY_FILE or PUB_FILE already exists in cert_dir
    '''
    # Local import: only this function needs os, for the restricted-mode open.
    import os

    key_path = join(cert_dir, KEY_FILE)
    pub_path = join(cert_dir, PUB_FILE)
    if exists(key_path) or exists(pub_path):
        sys.exit("Error: %s or %s already exists - move them first" % (KEY_FILE, PUB_FILE))

    private_key = rsa.generate_private_key(public_exponent=65537,
                                           key_size=2048,
                                           backend=default_backend())

    # The private key is stored unencrypted, so create the file readable and
    # writable by the owner only instead of relying on the process umask.
    # O_BINARY exists only on Windows; it prevents newline translation there.
    flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC | getattr(os, "O_BINARY", 0)
    key_fd = os.open(key_path, flags, 0o600)
    with os.fdopen(key_fd, "wb") as f:
        f.write(private_key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.TraditionalOpenSSL,
            encryption_algorithm=serialization.NoEncryption(),
        ))

    public_key = private_key.public_key()
    with open(pub_path, "wb") as f:
        f.write(public_key.public_bytes(
            encoding=serialization.Encoding.OpenSSH,
            format=serialization.PublicFormat.OpenSSH,
        ))
    return private_key
def generate_self_signed_cert(cert_dir, key):
    '''
    Create a self-signed X.509 certificate for key and write it to cert_dir.

    Subject and issuer are the same fixed name, the certificate is valid
    for 999 days starting now, carries a non-critical subjectAltName of
    "localhost" and is signed with SHA-256. The PEM-encoded result is
    written to CERT_FILE.

    :param cert_dir: directory that receives the certificate file
    :param key: private key used both as the certified key pair and to sign
    :raises SystemExit: if CERT_FILE already exists in cert_dir
    '''
    cert_path = join(cert_dir, CERT_FILE)
    if exists(cert_path):
        sys.exit("Error: %s already exists - move it first" % (CERT_FILE))

    # Self-signed: subject and issuer are the same name.
    subject = issuer = x509.Name([
        x509.NameAttribute(NameOID.COUNTRY_NAME, "US"),
        x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, "Tennessee"),
        x509.NameAttribute(NameOID.LOCALITY_NAME, "Nashville"),
        x509.NameAttribute(NameOID.ORGANIZATION_NAME, "Vanderbilt University"),
        x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME,
                           "Institute for Software-Integrated Systems"),
        x509.NameAttribute(NameOID.COMMON_NAME, "riaps.isis.vanderbilt.edu"),
    ])

    # One timezone-aware UTC timestamp for both validity bounds:
    # datetime.utcnow() is deprecated since Python 3.12, and sampling the
    # clock once keeps the validity window exactly 999 days long.
    now = datetime.datetime.now(datetime.timezone.utc)

    builder = (
        x509.CertificateBuilder()
        .subject_name(subject)
        .issuer_name(issuer)
        .public_key(key.public_key())
        .serial_number(x509.random_serial_number())
        .not_valid_before(now)
        .not_valid_after(now + datetime.timedelta(days=999))
        .add_extension(x509.SubjectAlternativeName([x509.DNSName("localhost")]),
                       critical=False)
    )
    cert = builder.sign(key, hashes.SHA256(), default_backend())

    with open(cert_path, "wb") as f:
        f.write(cert.public_bytes(serialization.Encoding.PEM))
def generate_zmq_cert(cert_dir):
    '''
    Create a ZeroMQ certificate named "riaps" in cert_dir and rename it.

    zmq.auth.create_certificates returns two file paths; the second one is
    moved to const.zmqCertificate inside cert_dir, the first is left where
    it was created.

    :param cert_dir: directory in which the certificate files are created
    :raises SystemExit: if ZMQ_CERT_FILE already exists in cert_dir
    '''
    already_there = exists(join(cert_dir, ZMQ_CERT_FILE))
    if already_there:
        sys.exit("Error: %s already exists - move it first" % (ZMQ_CERT_FILE))

    created = zmq.auth.create_certificates(cert_dir, "riaps", None)
    _first_file, second_file = created
    destination = join(cert_dir, const.zmqCertificate)
    shutil.move(second_file, destination)
def main():
    '''
    Command-line entry point: create the keys and certificates.

    Accepts -o/--output to select the target directory (current directory
    when omitted). An existing KEY_FILE in that directory is loaded and
    reused; otherwise a new key pair is generated. The self-signed
    certificate and the ZeroMQ certificate are then created.
    '''
    parser = argparse.ArgumentParser()
    parser.add_argument("-o", "--output", help="Output directory. Default is the current directory.")
    args = parser.parse_args()

    # A missing or empty option falls back to the current directory.
    output_dir = args.output or "."

    key_path = join(output_dir, KEY_FILE)
    if not exists(key_path):
        key = generate_keys(output_dir)
    else:
        # Reuse the private key that is already in place.
        with open(key_path, "rb") as key_file:
            pem_bytes = key_file.read()
        key = serialization.load_pem_private_key(pem_bytes, None, default_backend())

    generate_self_signed_cert(output_dir, key)
    generate_zmq_cert(output_dir)
# Run the generator only when executed as a script, not when imported.
if "__main__" == __name__:
    main()