#!/usr/bin/env python3 # link: https://git.p2p.legal/aya/dpgpid/ # desc: generate ed25519 keys suitable for duniter or ipfs # Copyleft 2022 Yann Autissier # all crypto science belongs to Pascal Engélibert # coming from files available at https://git.p2p.legal/qo-op/Astroport.ONE/tools # gpgme stuff has been provided by Ben McGinnes # and comes from http://files.au.adversary.org/crypto/gpgme-python-howto.html # gpg key extraction is taken from work of Simon Vareille available at # https://gist.github.com/SimonVareille/fda49baf5f3e15b5c88e25560aeb2822 # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . import argparse import base58 import base64 import configparser import crypto_pb2 import cryptography.hazmat.primitives.asymmetric.ed25519 as ed25519 from cryptography.hazmat.primitives import serialization import duniterpy.key import gpg import logging as log import nacl.bindings import nacl.encoding import pgpy import pynentry from SecureBytes import clearmem import os import re import struct import sys import time import warnings __version__='0.0.1' class keygen: def __init__(self): self.parser = argparse.ArgumentParser() self.parser.add_argument( "-d", "--debug", action="store_true", help="show debug informations (WARNING: THIS WILL PRINT PRIVATE KEY VALUE)", ) self.parser.add_argument( "-i", "--input", dest="input", default=None, help="read public and secret keys in pubsec format from file INPUT", ) self.parser.add_argument( "-g", "--gpg", action="store_true", help="use gpg key from user id matched by username option as input", ) self.parser.add_argument( "-o", "--output", dest="output", default=None, help="write public and secret keys to file OUTPUT", ) self.parser.add_argument( "-q", "--quiet", action="store_true", help="show only errors", ) self.parser.add_argument( "-t", "--type", dest="type", default="ipfs", help="output key type : [ duniter | ipfs ]", ) self.parser.add_argument( "-v", "--verbose", action="store_true", help="show more informations", ) self.parser.add_argument( "--version", action="store_true", help="show version and exit", ) self.parser.add_argument( 'username', nargs="?", ) self.parser.add_argument( 'password', nargs="?", ) def _check_args(self, args): log.debug("keygen._check_args(%s)" % args) if self.input is None: if self.password is None: if self.username is None: self.parser.error(f"keygen requires an input file or username args") def _cleanup(self): log.debug("keygen._cleanup()") if hasattr(keygen, 'base58_secret_key'): clearmem(self.base58_secret_key) if hasattr(keygen, 'duniter'): if hasattr(keygen.duniter, 'sk'): clearmem(self.duniterpy.sk) if hasattr(keygen, 'ed25519_secret_bytes'): clearmem(self.ed25519_secret_bytes) if hasattr(keygen, 'ed25519_secret_pem_pkcs8'): clearmem(self.ed25519_secret_pem_pkcs8) if hasattr(keygen, 'ipfs_privkey'): clearmem(self.ipfs_privkey) if hasattr(keygen, 'password'): clearmem(self.password) if hasattr(keygen, 'pgpy'): clearmem(self.pgpy._key.keymaterial.p) clearmem(self.pgpy._key.keymaterial.q) clearmem(self.pgpy._key.keymaterial.s) if hasattr(keygen, 'pgpy_key_seed'): clearmem(self.pgpy_key_seed) if hasattr(keygen, 'pgpy_key_value'): clearmem(self.pgpy_key_value) def _invalid_type(self): log.debug("keygen._invalid_type()") self.parser.error(f"type: {self.type} is not valid.") def _load_config(self): log.debug("keygen._load_config()") self.config = configparser.RawConfigParser() config_dir = os.path.join(os.environ.get('XDG_CONFIG_HOME', os.path.expanduser('~/.config')), 'dpgpid') log.debug("config_dir=%s" % config_dir) self.config.read( [config_dir + '/keygen.conf'] ) def _run(self, argv): args = self.parser.parse_args(argv) vars(self).update(vars(args)) # display version if args.version: version() sys.exit() # define log format log_format='%(asctime)s %(levelname)s: %(message)s' log_datefmt='%Y/%m/%d %H:%M:%S' if args.debug: log_level='DEBUG' elif args.quiet: log_level='ERROR' elif args.verbose: log_level='INFO' else: log_level='WARNING' log.basicConfig(format=log_format, datefmt=log_datefmt, level=log_level) log.debug("keygen.run(%s)" % argv) self._check_args(args) self._load_config() # self.gpg = gpg.Context(armor=True, offline=True, homedir=GNUPGHOME) self.gpg = gpg.Context( armor=True, offline=True) self.gpg.set_passphrase_cb(self.gpg_passphrase_cb) self.ed25519(args) method = getattr(self, f'do_{self.type}', self._invalid_type) return method() def base58_from_ed25519(self): log.debug("keygen.base58_from_ed25519()") self.base58_public_key = base58.b58encode(self.ed25519_public_bytes).decode('ascii') self.base58_secret_key = base58.b58encode(self.ed25519_secret_bytes).decode('ascii') log.debug("keygen.base58_public_key=%s" % self.base58_public_key) log.debug("keygen.base58_secret_key=%s" % self.base58_secret_key) def base58_from_pubsec(self): log.debug("keygen.base58_from_pubsec()") for line in open(self.input, "r"): if re.search("pub", line): self.base58_public_key = line.replace('\n','').split(': ')[1] elif re.search("sec", line): self.base58_secret_key = line.replace('\n','').split(': ')[1] def do_duniter(self): log.debug("keygen.do_duniter()") self.base58_from_ed25519() if self.output is None: print("pub: %s" % self.base58_public_key) print("sec: %s" % self.base58_secret_key) else: with open(self.output, "w") as fh: fh.write(f"""Type: PubSec Version: 1 pub: {self.base58_public_key} sec: {self.base58_secret_key} """ ) os.chmod(self.output, 0o600) self._cleanup() def do_ipfs(self): log.debug("keygen.do_ipfs()") self.ipfs_from_ed25519() if self.output is None: print("PeerID: %s" % self.ipfs_peerid) print("PrivKEY: %s" % self.ipfs_privkey) else: # with open(self.output, "wb") as fh: # fh.write(self.ipfs_libp2p_protobuf_key) with open(self.output, "w") as fh: fh.write(self.ed25519_secret_pem_pkcs8) os.chmod(self.output, 0o600) self._cleanup() def duniterpy_from_salt_and_password(self): log.debug("keygen.duniterpy_from_salt_and_password()") scrypt_params = duniterpy.key.scrypt_params.ScryptParams( int(self.config.get('scrypt', 'n')) if self.config.has_option('scrypt', 'n') else 4096, int(self.config.get('scrypt', 'r')) if self.config.has_option('scrypt', 'r') else 16, int(self.config.get('scrypt', 'p')) if self.config.has_option('scrypt', 'p') else 1, int(self.config.get('scrypt', 'sl')) if self.config.has_option('scrypt', 'sl') else 32, ) if not self.password: with pynentry.PynEntry() as p: p.description = f"""Please enter the passord for username "{self.username}".""" p.prompt = 'Passsord:' try: self.password = p.get_pin() except pynentry.PinEntryCancelled: log.warning('Cancelled! Goodbye.') exit(2) self.duniterpy = duniterpy.key.SigningKey.from_credentials( self.username, self.password, scrypt_params ) def ed25519(self, args): log.debug("keygen.ed25519(%s)" % args) if args.gpg is True: self.ed25519_from_gpg() elif self.input is None: self.duniterpy_from_salt_and_password() self.ed25519_from_duniterpy() else: self.base58_from_pubsec() self.ed25519_from_base58() def ed25519_from_base58(self): log.debug("keygen.ed25519_from_base58()") self.ed25519_public_bytes = base58.b58decode(self.base58_public_key) self.ed25519_secret_bytes = base58.b58decode(self.base58_secret_key) log.debug("keygen.ed25519_public_bytes=%s" % self.ed25519_public_bytes) log.debug("keygen.ed25519_secret_bytes=%s" % self.ed25519_secret_bytes) def ed25519_from_duniterpy(self): log.debug("keygen.ed25519_from_duniterpy()") self.ed25519_public_bytes = base58.b58decode(self.duniterpy.pubkey) self.ed25519_secret_bytes = self.duniterpy.sk log.debug("keygen.ed25519_public_bytes=%s" % self.ed25519_public_bytes) log.debug("keygen.ed25519_secret_bytes=%s" % self.ed25519_secret_bytes) def ed25519_from_gpg(self): log.debug("keygen.ed25519_from_gpg()") self.gpg_seckeys = list(self.gpg.keylist(pattern=self.username, secret=True)) log.debug("keygen.gpg_seckeys=%s" % self.gpg_seckeys) if not self.gpg_seckeys: log.error(f"""Unable to find any key matching username "{self.username}".""") exit(1) else: self.gpg_seckey = self.gpg_seckeys[0] log.info(f"""Found key id "{self.gpg_seckey.fpr}" matching username "{self.username}".""") log.debug("keygen.gpg_seckey.__repr__=%s" % self.gpg_seckey.__repr__) log.debug("keygen.gpg_seckey.fpr=%s" % self.gpg_seckey.fpr) self.armored_pgp_public_key = self.gpg.key_export(self.gpg_seckey.fpr) log.debug("keygen.armored_pgp_public_key=%s" % self.armored_pgp_public_key) if self.password: self.gpg.set_pinentry_mode(gpg.constants.PINENTRY_MODE_LOOPBACK) self.armored_pgp_secret_key = self.gpg.key_export_secret(self.gpg_seckey.fpr) log.debug("keygen.armored_pgp_secret_key=%s" % self.armored_pgp_secret_key) if not self.armored_pgp_secret_key: log.error(f"""Unable to export secret key id "{self.gpg_seckey.fpr}" of user "{self.username}". Please check your password!""") exit(2) with warnings.catch_warnings(): # remove CryptographyDeprecationWarning about deprecated # SymmetricKeyAlgorithm IDEA, CAST5 and Blowfish (PGPy v0.5.4) warnings.simplefilter('ignore') self.pgpy, _ = pgpy.PGPKey.from_blob(self.armored_pgp_secret_key) self.ed25519_from_pgpy() def ed25519_from_pgpy(self): log.debug("keygen.ed25519_from_pgpy()") log.debug("keygen.pgpy.fingerprint.keyid=%s" % self.pgpy.fingerprint.keyid) log.debug("keygen.pgpy.is_protected=%s" % self.pgpy.is_protected) if self.pgpy.is_protected: if not self.password: with pynentry.PynEntry() as p: p.description = f"""The exported key id "{self.pgpy.fingerprint.keyid}" of user "{self.username}" is password protected. Please enter the passphrase again to unlock it. """ p.prompt = 'Passphrase:' try: self.password = p.get_pin() except pynentry.PinEntryCancelled: log.warning('Cancelled! Goodbye.') exit(2) try: with warnings.catch_warnings(): # remove CryptographyDeprecationWarning about deprecated # SymmetricKeyAlgorithm IDEA, CAST5 and Blowfish (PGPy v0.5.4) warnings.simplefilter('ignore') with self.pgpy.unlock(self.password): assert self.pgpy.is_unlocked log.debug("keygen.pgpy.is_unlocked=%s" % self.pgpy.is_unlocked) self.pgpy_key_seed_from_pgpy() except Exception as e: log.error(f"""Unable to unlock secret key id "{self.gpg_seckey.fpr}" of user "{self.username}". Please check your password!""") exit(2) else: self.pgpy_key_seed_from_pgpy() self.ed25519_public_bytes, self.ed25519_secret_bytes = nacl.bindings.crypto_sign_seed_keypair(self.pgpy_key_seed) log.debug("keygen.ed25519_public_bytes=%s" % self.ed25519_public_bytes) log.debug("keygen.ed25519_secret_bytes=%s" % self.ed25519_secret_bytes) def gpg_passphrase_cb(self, uid_hint, passphrase_info, prev_was_bad): log.debug("keygen.gpg_passphrase_cb(%s, %s, %s)" % (uid_hint, passphrase_info, prev_was_bad)) return self.password def ipfs_from_ed25519(self): log.debug("keygen.ipfs_from_ed25519()") # PeerID ipfs_pid = base58.b58encode(b'\x00$\x08\x01\x12 ' + self.ed25519_public_bytes) self.ipfs_peerid = ipfs_pid.decode('ascii') log.debug("keygen.ipfs_peerid=%s" % self.ipfs_peerid) # PrivKey pkey = crypto_pb2.PrivateKey() pkey.Type = crypto_pb2.KeyType.Ed25519 pkey.Data = self.ed25519_secret_bytes self.ipfs_privkey = base64.b64encode(pkey.SerializeToString()).decode('ascii') log.debug("keygen.ipfs_privkey=%s" % self.ipfs_privkey) # libp2p-protobuf-cleartext format for ipfs key import self.ipfs_libp2p_protobuf_key = pkey.SerializeToString() # pem-pkcs8-cleartext format for ipfs key import self.pem_pkcs8_from_ed25519() def pem_pkcs8_from_ed25519(self): log.debug("keygen.pem_pkcs8_from_ed25519()") self.ed25519_secret_pem_pkcs8 = ed25519.Ed25519PrivateKey.from_private_bytes(self.ed25519_secret_bytes[:32]).private_bytes(encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.PKCS8, encryption_algorithm=serialization.NoEncryption()).decode('ascii') log.debug("keygen.ed25519_secret_pem_pkcs8=%s" % self.ed25519_secret_pem_pkcs8) def pgpy_key_flags(self): log.debug("keygen.pgpy_key_flags()") flags = [] strs = {pgpy.constants.KeyFlags.Certify : 'C', pgpy.constants.KeyFlags.Sign : 'S', pgpy.constants.KeyFlags.EncryptCommunications : 'E', pgpy.constants.KeyFlags.Authentication : 'A'} for sig in self.pgpy.self_signatures: if not sig.is_expired: flags += sig.key_flags self.pgpy_key_flags = "".join(strs.get(flag, '') for flag in flags) def pgpy_key_seed_from_pgpy(self): log.debug("keygen.pgpy_key_seed_from_pgpy()") self.pgpy_key_type() if self.pgpy_key_type == 'RSA': log.debug("keygen.pgpy._key.keymaterial.p=%s" % self.pgpy._key.keymaterial.p) log.debug("keygen.pgpy._key.keymaterial.q=%s" % self.pgpy._key.keymaterial.q) # custom seed: use sha256 hash of (p + q) self.pgpy_key_seed = nacl.bindings.crypto_hash_sha256(long_to_bytes(self.pgpy._key.keymaterial.p + self.pgpy._key.keymaterial.q)) p = long_to_bytes(self.pgpy._key.keymaterial.p) q = long_to_bytes(self.pgpy._key.keymaterial.q) self.pgpy_key_value = "".join([f"{c:02x}" for c in p]) + "".join([f"{c:02x}" for c in q]) self.pgpy_key_size = (len(p) + len(q)) * 8 log.debug("keygen.pgpy_key_seed=%s" % self.pgpy_key_seed) log.debug("keygen.pgpy_key_value=%s" % self.pgpy_key_value) log.debug("keygen.pgpy_key_size=%s" % self.pgpy_key_size) elif self.pgpy_key_type in ('ECDSA', 'EdDSA', 'ECDH'): log.debug("keygen.pgpy._key.keymaterial.s=%s" % self.pgpy._key.keymaterial.s) self.pgpy_key_seed = long_to_bytes(self.pgpy._key.keymaterial.s) self.pgpy_key_value = "".join([f"{c:02x}" for c in self.pgpy_key_seed]) self.pgpy_key_size = len(self.pgpy_key_seed)*8 log.debug("keygen.pgpy_key_seed=%s" % self.pgpy_key_seed) log.debug("keygen.pgpy_key_value=%s" % self.pgpy_key_value) log.debug("keygen.pgpy_key_size=%s" % self.pgpy_key_size) else: raise NotImplementedError(f"Getting seed from {self.pgpy_key_type} key is not implemented") def pgpy_key_type(self): log.debug("keygen.pgpy_key_type()") if isinstance(self.pgpy._key.keymaterial, pgpy.packet.fields.RSAPriv): self.pgpy_key_type = 'RSA' elif isinstance(self.pgpy._key.keymaterial, pgpy.packet.fields.DSAPriv): self.pgpy_key_type = 'DSA' elif isinstance(self.pgpy._key.keymaterial, pgpy.packet.fields.ElGPriv): self.pgpy_key_type = 'ElGamal' elif isinstance(self.pgpy._key.keymaterial, pgpy.packet.fields.ECDSAPriv): self.pgpy_key_type = 'ECDSA' elif isinstance(self.pgpy._key.keymaterial, pgpy.packet.fields.EdDSAPriv): self.pgpy_key_type = 'EdDSA' elif isinstance(self.pgpy._key.keymaterial, pgpy.packet.fields.ECDHPriv): self.pgpy_key_type = 'ECDH' else: self.pgpy_key_type = 'undefined' log.debug("keygen.pgpy_key_type=%s" % self.pgpy_key_type) # long_to_bytes comes from PyCrypto, which is released into Public Domain # https://github.com/dlitz/pycrypto/blob/master/lib/Crypto/Util/number.py def bytes_to_long(s): """bytes_to_long(string) : long Convert a byte string to a long integer. This is (essentially) the inverse of long_to_bytes(). """ acc = 0 unpack = struct.unpack length = len(s) if length % 4: extra = (4 - length % 4) s = b'\000' * extra + s length = length + extra for i in range(0, length, 4): acc = (acc << 32) + unpack('>I', s[i:i+4])[0] return acc def long_to_bytes(n, blocksize=0): """long_to_bytes(n:long, blocksize:int) : string Convert a long integer to a byte string. If optional blocksize is given and greater than zero, pad the front of the byte string with binary zeros so that the length is a multiple of blocksize. """ # after much testing, this algorithm was deemed to be the fastest s = b'' n = int(n) pack = struct.pack while n > 0: s = pack('>I', n & 0xffffffff) + s n = n >> 32 # strip off leading zeros for i in range(len(s)): if s[i] != b'\000'[0]: break else: # only happens when n == 0 s = b'\000' i = 0 s = s[i:] # add back some pad bytes. this could be done more efficiently w.r.t. the # de-padding being done above, but sigh... if blocksize > 0 and len(s) % blocksize: s = (blocksize - len(s) % blocksize) * b'\000' + s return s def main(argv=None): if argv is None: argv = sys.argv[1:] cli = keygen() return cli._run(argv) def version(version=__version__): print("%s v%s" % (sys.argv[0],version)) if __name__ == "__main__": sys.exit(main())