wip: refacto

This commit is contained in:
Yann Autissier 2022-10-08 12:51:28 +02:00
parent fb08e6ed65
commit 65fdb648ca
3 changed files with 220 additions and 213 deletions

View File

@ -19,6 +19,7 @@
from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives import serialization
from jwcrypto.jwk import JWK from jwcrypto.jwk import JWK
from SecureBytes import clearmem
import base36 import base36
import base58 import base58
import base64 import base64
@ -34,7 +35,8 @@ def from_args(args, config):
log.debug("key.from_args(%s, %s)" % (args, config)) log.debug("key.from_args(%s, %s)" % (args, config))
from key import ed25519 from key import ed25519
if args.gpg: if args.gpg:
return from_gpg() _gpg = gpg.Context(armor=True, offline=True)
return from_gpg(_gpg, args.username[0], args.password)
else: else:
scrypt_params = duniterpy.key.scrypt_params.ScryptParams( scrypt_params = duniterpy.key.scrypt_params.ScryptParams(
int(config.get('scrypt', 'n')) if config and config.has_option('scrypt', 'n') else 4096, int(config.get('scrypt', 'n')) if config and config.has_option('scrypt', 'n') else 4096,
@ -73,10 +75,12 @@ def from_credentials(username, password=None, scrypt_params=None):
def from_duniterpy(duniterpy): def from_duniterpy(duniterpy):
log.debug("key.from_duniterpy(%s)" % duniterpy) log.debug("key.from_duniterpy(%s)" % duniterpy)
return ed25519.from_duniterpy(duniterpy) key = ed25519.from_duniterpy(duniterpy)
key.duniterpy = duniterpy
return key
def from_file(input_file, password=None, scrypt_params=None): def from_file(input_file, password=None, scrypt_params=None):
log.debug("key.from_file()") log.debug("key.from_file(%s, %s, %s)" % (input_file, password, scrypt_params))
try: try:
with open(input_file, 'r') as file: with open(input_file, 'r') as file:
lines = file.readlines() lines = file.readlines()
@ -134,7 +138,7 @@ def from_file(input_file, password=None, scrypt_params=None):
else: else:
raise NotImplementedError('unknown input file format.') raise NotImplementedError('unknown input file format.')
else: else:
raise NotImplementedError('empty file.') raise IOError('empty file.')
except UnicodeDecodeError as e: except UnicodeDecodeError as e:
try: try:
with open(input_file, 'rb') as file: with open(input_file, 'rb') as file:
@ -163,26 +167,57 @@ def from_file(input_file, password=None, scrypt_params=None):
else: else:
raise NotImplementedError('unknown input file format.') raise NotImplementedError('unknown input file format.')
else: else:
raise NotImplementedError('empty file.') raise IOError('empty file.')
except Exception as e: except Exception as e:
log.error(f'Unable to get key from file {input_file}: {e}') log.error(f'Unable to get key from input file {input_file}: {e}')
exit(2) exit(2)
except Exception as e: except Exception as e:
log.error(f'Unable to get key from file {input_file}: {e}') log.error(f'Unable to get key from input file {input_file}: {e}')
exit(2) exit(2)
def from_gpg(): def from_gpg(_gpg, username, password=None):
log.debug("key.from_gpg()") log.debug("key.from_gpg(%s, %s, %s)" % (_gpg, username, password))
try: try:
self.pgpy() secret_keys = list(_gpg.keylist(pattern=username, secret=True))
return self.from_pgpy() log.debug("key.secret_keys=%s" % secret_keys)
if not secret_keys:
log.warning(f"""Unable to find any key matching "{username}".""")
exit(1)
else:
_gpg.secret_key = secret_keys[0]
log.info(f"""Found key id "{_gpg.secret_key.fpr}" matching "{username}".""")
log.debug("key._gpg.secret_key.expired=%s" % _gpg.secret_key.expired)
log.debug("key._gpg.secret_key.fpr=%s" % _gpg.secret_key.fpr)
log.debug("key._gpg.secret_key.revoked=%s" % _gpg.secret_key.revoked)
log.debug("key._gpg.secret_key.uids=%s" % _gpg.secret_key.uids)
log.debug("key._gpg.secret_key.owner_trust=%s" % _gpg.secret_key.owner_trust)
log.debug("key._gpg.secret_key.last_update=%s" % _gpg.secret_key.last_update)
if password:
gpg_passphrase_cb = gpg_passphrase(password).cb
_gpg.set_passphrase_cb(gpg_passphrase_cb)
_gpg.set_pinentry_mode(gpg.constants.PINENTRY_MODE_LOOPBACK)
_gpg.public_armor = _gpg.key_export(_gpg.secret_key.fpr)
_gpg.secret_armor = _gpg.key_export_secret(_gpg.secret_key.fpr)
if not _gpg.secret_armor:
log.error(f"""Unable to export gpg secret key id "{_gpg.secret_key.fpr}" for username "{username}". Please check your password!""")
exit(2)
with warnings.catch_warnings():
# remove CryptographyDeprecationWarning about deprecated
# SymmetricKeyAlgorithm IDEA, CAST5 and Blowfish (PGPy v0.5.4)
warnings.simplefilter('ignore')
_pgpy, _ = pgpy.PGPKey.from_blob(_gpg.secret_armor)
key = from_pgpy(_pgpy, password)
key.gpg = _gpg
return key
except Exception as e: except Exception as e:
log.error(f'Unable to get key from gpg: {e}') log.error(f'Unable to get key from gpg: {e}')
exit(2) exit(2)
def from_jwk(jwk): def from_jwk(jwk):
log.debug("key.from_jwk(%s)" % jwk) log.debug("key.from_jwk(%s)" % jwk)
return ed25519.from_jwk(jwk) key = ed25519.from_jwk(jwk)
key.jwk = jwk
return key
def from_libp2p(libp2p): def from_libp2p(libp2p):
log.debug("key.from_libp2p(%s)" % libp2p) log.debug("key.from_libp2p(%s)" % libp2p)
@ -203,15 +238,16 @@ def from_pem(pem):
log.debug("key.from_pem(%s)" % pem) log.debug("key.from_pem(%s)" % pem)
return ed25519.from_pem(pem) return ed25519.from_pem(pem)
def from_pgpy(self, pgpy, password): def from_pgpy(_pgpy, password=None):
log.debug("key.from_pgpy(%s)" % pgpy) log.debug("key.from_pgpy(%s, %s)" % (_pgpy, password))
try: try:
log.debug("key.pgpy.fingerprint.keyid=%s" % pgpy.fingerprint.keyid) log.debug("key._pgpy.fingerprint.keyid=%s" % _pgpy.fingerprint.keyid)
log.debug("key.pgpy.is_protected=%s" % pgpy.is_protected) log.debug("key._pgpy.is_protected=%s" % _pgpy.is_protected)
if pgpy.is_protected: _pgpy.key_type = pgpy_key_type(_pgpy)
if _pgpy.is_protected:
if not password: if not password:
with pynentry.PynEntry() as p: with pynentry.PynEntry() as p:
p.description = f"""The exported pgp key id "{pgpy.fingerprint.keyid}" is password protected. p.description = f"""The exported pgp key id "{_pgpy.fingerprint.keyid}" is password protected.
Please enter the passphrase again to unlock it. Please enter the passphrase again to unlock it.
""" """
p.prompt = 'Passphrase:' p.prompt = 'Passphrase:'
@ -225,39 +261,122 @@ def from_pgpy(self, pgpy, password):
# remove CryptographyDeprecationWarning about deprecated # remove CryptographyDeprecationWarning about deprecated
# SymmetricKeyAlgorithm IDEA, CAST5 and Blowfish (PGPy v0.5.4) # SymmetricKeyAlgorithm IDEA, CAST5 and Blowfish (PGPy v0.5.4)
warnings.simplefilter('ignore') warnings.simplefilter('ignore')
with pgpy.unlock(password): with _pgpy.unlock(password):
assert pgpy.is_unlocked assert _pgpy.is_unlocked
log.debug("ed25519.pgpy.is_unlocked=%s" % pgpy.is_unlocked) log.debug("key._pgpy.is_unlocked=%s" % _pgpy.is_unlocked)
seed = self.ed25519_seed_bytes_from_pgpy() key = ed25519.from_pgpy(_pgpy)
except Exception as e: except Exception as e:
log.error(f"""Unable to unlock pgp secret key id "{self.pgpy.fingerprint.keyid}" of user "{self.username}": {e}""") log.error(f"""Unable to unlock pgp secret key id "{pgpy.fingerprint.keyid}": {e}""")
exit(2) exit(2)
else: else:
seed = self.ed25519_seed_bytes_from_pgpy() key = ed25519.from_pgpy(_pgpy)
self.ed25519_from_seed_bytes() key.pgpy = _pgpy
return key
except Exception as e: except Exception as e:
log.error(f'Unable to get ed25519 seed bytes from pgpy: {e}') log.error(f'Unable to get key from pgpy: {e}')
exit(2) exit(2)
def pgpy_key_type(_pgpy):
log.debug("key.pgpy_key_type(%s)" % _pgpy)
if isinstance(_pgpy._key.keymaterial, pgpy.packet.fields.RSAPriv):
key_type = 'RSA'
elif isinstance(_pgpy._key.keymaterial, pgpy.packet.fields.DSAPriv):
key_type = 'DSA'
elif isinstance(_pgpy._key.keymaterial, pgpy.packet.fields.ElGPriv):
key_type = 'ElGamal'
elif isinstance(_pgpy._key.keymaterial, pgpy.packet.fields.ECDSAPriv):
key_type = 'ECDSA'
elif isinstance(_pgpy._key.keymaterial, pgpy.packet.fields.EdDSAPriv):
key_type = 'EdDSA'
elif isinstance(_pgpy._key.keymaterial, pgpy.packet.fields.ECDHPriv):
key_type = 'ECDH'
else:
key_type = 'undefined'
log.debug("key.pgpy_key_type().key_type=%s" % key_type)
return key_type
class gpg_passphrase():
def __init__(self, password):
log.debug("gpg_passphrase().__init__(%s)" % password)
self.password = password
def cb(self, uid_hint, passphrase_info, prev_was_bad):
log.debug("gpg_passphrase().cb(%s, %s, %s)" % (uid_hint, passphrase_info, prev_was_bad))
return self.password
class key(): class key():
def __init__(self): def __init__(self):
self.algorithm = 'undef' log.debug("key().__init__()")
self.gpg = gpg.Context(armor=True, offline=True) self.algorithm = 'undefined'
self.gpg.set_passphrase_cb(self.gpg_passphrase_cb) self.cryptography = None
def __del__(self): def __del__(self):
log.debug("key().__del__()")
self._cleanup() self._cleanup()
def _cleanup(self): def _cleanup(self):
log.debug("key._cleanup()") log.debug("key()._cleanup()")
raise NotImplementedError(f"_cleanup() is not implemented for algorithm {self.algorithm}") if hasattr(self, 'duniterpy'):
if hasattr(self.duniterpy, 'seed') and self.duniterpy.seed:
def gpg_passphrase_cb(self, uid_hint, passphrase_info, prev_was_bad): clearmem(self.duniterpy.seed)
log.debug("key.gpg_passphrase_cb(%s, %s, %s)" % (uid_hint, passphrase_info, prev_was_bad)) log.debug("cleared: key().duniterpy.seed")
return self.password if hasattr(self.duniterpy, 'sk') and self.duniterpy.sk:
clearmem(self.duniterpy.sk)
log.debug("cleared: key().duniterpy.sk")
if hasattr(self, 'gpg'):
if hasattr(self.gpg, 'secret_armor') and self.gpg.secret_armor:
clearmem(self.gpg.secret_armor)
log.debug("cleared: key().gpg.secret_armor")
if hasattr(self, 'jwk'):
if hasattr(self, 'secret_jwk') and self.secret_jwk:
clearmem(self.secret_jwk)
log.debug("cleared: key().secret_jwk")
if hasattr(self.jwk, 'd') and self.jwk.d:
clearmem(self.jwk.d)
log.debug("cleared: key().jwk.d")
if hasattr(self, 'pgpy'):
if hasattr(self.pgpy._key.keymaterial, 'p') and self.pgpy._key.keymaterial.p and not isinstance(self.pgpy._key.keymaterial.p, pgpy.packet.fields.ECPoint):
clearmem(self.pgpy._key.keymaterial.p)
log.debug("cleared: key().pgpy._key.material.p")
if hasattr(self.pgpy._key.keymaterial, 'q') and self.pgpy._key.keymaterial.q:
clearmem(self.pgpy._key.keymaterial.q)
log.debug("cleared: key().pgpy._key.material.q")
if hasattr(self.pgpy._key.keymaterial, 's') and self.pgpy._key.keymaterial.s:
clearmem(self.pgpy._key.keymaterial.s)
log.debug("cleared: key().pgpy._key.material.s")
if hasattr(self, 'secret_b36mf') and self.secret_b36mf:
clearmem(self.secret_b36mf)
log.debug("cleared: key().secret_b36mf")
if hasattr(self, 'secret_b58mf') and self.secret_b58mf:
clearmem(self.secret_b58mf)
log.debug("cleared: key().secret_b58mf")
if hasattr(self, 'secret_b58mh') and self.secret_b58mh:
clearmem(self.secret_b58mh)
log.debug("cleared: key().secret_b58mh")
if hasattr(self, 'secret_b64mh') and self.secret_b64mh:
clearmem(self.secret_b64mh)
log.debug("cleared: key().secret_b64mh")
if hasattr(self, 'secret_base58') and self.secret_base58:
clearmem(self.secret_base58)
log.debug("cleared: key().secret_base58")
if hasattr(self, 'secret_base64') and self.secret_base64:
clearmem(self.secret_base64)
log.debug("cleared: key().secret_base64")
if hasattr(self, 'secret_cidv1') and self.secret_cidv1:
clearmem(self.secret_cidv1)
log.debug("cleared: key().secret_cidv1")
if hasattr(self, 'secret_libp2p') and self.secret_libp2p:
clearmem(self.secret_libp2p)
log.debug("cleared: key().secret_libp2p")
if hasattr(self, 'secret_pem_pkcs8') and self.secret_pem_pkcs8:
clearmem(self.secret_pem_pkcs8)
log.debug("cleared: key().secret_pem_pkcs8")
if hasattr(self, 'secret_proto2') and self.secret_proto2:
clearmem(self.secret_proto2)
log.debug("cleared: key().secret_proto2")
def to_b36mf(self): def to_b36mf(self):
log.debug("key.to_b36mf()") log.debug("key().to_b36mf()")
if not hasattr(self, 'public_cidv1') or not hasattr(self, 'secret_cidv1'): if not hasattr(self, 'public_cidv1') or not hasattr(self, 'secret_cidv1'):
self.to_cidv1() self.to_cidv1()
try: try:
@ -266,11 +385,11 @@ class key():
except Exception as e: except Exception as e:
log.error(f'Unable to get b36mf from cidv1: {e}') log.error(f'Unable to get b36mf from cidv1: {e}')
exit(2) exit(2)
log.debug("key.public_b36mf=%s" % self.public_b36mf) log.debug("key().public_b36mf=%s" % self.public_b36mf)
log.debug("key.secret_b36mf=%s" % self.secret_b36mf) log.debug("key().secret_b36mf=%s" % self.secret_b36mf)
def to_b58mf(self): def to_b58mf(self):
log.debug("key.to_b58mf()") log.debug("key().to_b58mf()")
if not hasattr(self, 'public_cidv1') or not hasattr(self, 'secret_cidv1'): if not hasattr(self, 'public_cidv1') or not hasattr(self, 'secret_cidv1'):
self.to_cidv1() self.to_cidv1()
try: try:
@ -279,11 +398,11 @@ class key():
except Exception as e: except Exception as e:
log.error(f'Unable to get b58mf from cidv1: {e}') log.error(f'Unable to get b58mf from cidv1: {e}')
exit(2) exit(2)
log.debug("key.public_b58mf=%s" % self.public_b58mf) log.debug("key().public_b58mf=%s" % self.public_b58mf)
log.debug("key.secret_b58mf=%s" % self.secret_b58mf) log.debug("key().secret_b58mf=%s" % self.secret_b58mf)
def to_b58mh(self): def to_b58mh(self):
log.debug("key.to_b58mh()") log.debug("key().to_b58mh()")
if not hasattr(self, 'public_libp2p') or not hasattr(self, 'secret_libp2p'): if not hasattr(self, 'public_libp2p') or not hasattr(self, 'secret_libp2p'):
self.to_libp2p() self.to_libp2p()
try: try:
@ -292,11 +411,11 @@ class key():
except Exception as e: except Exception as e:
log.error(f'Unable to get b58mh from libp2p: {e}') log.error(f'Unable to get b58mh from libp2p: {e}')
exit(2) exit(2)
log.debug("key.public_b58mh=%s" % self.public_b58mh) log.debug("key().public_b58mh=%s" % self.public_b58mh)
log.debug("key.secret_b58mh=%s" % self.secret_b58mh) log.debug("key().secret_b58mh=%s" % self.secret_b58mh)
def to_b64mh(self): def to_b64mh(self):
log.debug("key.to_b64mh()") log.debug("key().to_b64mh()")
if not hasattr(self, 'public_libp2p') or not hasattr(self, 'secret_libp2p'): if not hasattr(self, 'public_libp2p') or not hasattr(self, 'secret_libp2p'):
self.to_libp2p() self.to_libp2p()
try: try:
@ -305,33 +424,33 @@ class key():
except Exception as e: except Exception as e:
log.error(f'Unable to get b64mh from libp2p: {e}') log.error(f'Unable to get b64mh from libp2p: {e}')
exit(2) exit(2)
log.debug("key.public_b64mh=%s" % self.public_b64mh) log.debug("key().public_b64mh=%s" % self.public_b64mh)
log.debug("key.secret_b64mh=%s" % self.secret_b64mh) log.debug("key().secret_b64mh=%s" % self.secret_b64mh)
def to_base58(self): def to_base58(self):
log.debug("key.to_base58()") log.debug("key().to_base58()")
try: try:
self.public_base58 = base58.b58encode(self.public_bytes).decode('ascii') self.public_base58 = base58.b58encode(self.public_bytes).decode('ascii')
self.secret_base58 = base58.b58encode(self.secret_bytes).decode('ascii') self.secret_base58 = base58.b58encode(self.secret_bytes).decode('ascii')
except Exception as e: except Exception as e:
log.error(f'Unable to get base58: {e}') log.error(f'Unable to get base58: {e}')
exit(2) exit(2)
log.debug("key.public_base58=%s" % self.public_base58) log.debug("key().public_base58=%s" % self.public_base58)
log.debug("key.secret_base58=%s" % self.secret_base58) log.debug("key().secret_base58=%s" % self.secret_base58)
def to_base64(self): def to_base64(self):
log.debug("key.to_base64()") log.debug("key().to_base64()")
try: try:
self.public_base64 = base64.b64encode(self.public_bytes).decode('ascii') self.public_base64 = base64.b64encode(self.public_bytes).decode('ascii')
self.secret_base64 = base64.b64encode(self.secret_bytes).decode('ascii') self.secret_base64 = base64.b64encode(self.secret_bytes).decode('ascii')
except Exception as e: except Exception as e:
log.error(f'Unable to get base64: {e}') log.error(f'Unable to get base64: {e}')
exit(2) exit(2)
log.debug("key.public_base64=%s" % self.public_base64) log.debug("key().public_base64=%s" % self.public_base64)
log.debug("key.secret_base64=%s" % self.secret_base64) log.debug("key().secret_base64=%s" % self.secret_base64)
def to_cidv1(self): def to_cidv1(self):
log.debug("key.to_cidv1()") log.debug("key().to_cidv1()")
if not hasattr(self, 'public_libp2p') or not hasattr(self, 'secret_libp2p'): if not hasattr(self, 'public_libp2p') or not hasattr(self, 'secret_libp2p'):
self.to_libp2p() self.to_libp2p()
try: try:
@ -342,12 +461,12 @@ class key():
except Exception as e: except Exception as e:
log.error(f'Unable to get cidv1: {e}') log.error(f'Unable to get cidv1: {e}')
exit(2) exit(2)
log.debug("key.public_cidv1=%s" % self.public_cidv1) log.debug("key().public_cidv1=%s" % self.public_cidv1)
log.debug("key.secret_cidv1=%s" % self.secret_cidv1) log.debug("key().secret_cidv1=%s" % self.secret_cidv1)
def to_duniterpy(self): def to_duniterpy(self):
log.debug("key.to_duniterpy()") log.debug("key().to_duniterpy()")
raise NotImplementedError(f"to_duniterpy() is not implemented for algorithm {self.algorithm}") raise NotImplementedError(f"key().to_duniterpy() is not implemented for algorithm {self.algorithm}")
def to_file(self, output_file, file_format=None, password=None): def to_file(self, output_file, file_format=None, password=None):
log.debug("key().to_file(%s, %s, %s)" % (output_file, file_format, password)) log.debug("key().to_file(%s, %s, %s)" % (output_file, file_format, password))
@ -418,7 +537,7 @@ class key():
exit(2) exit(2)
def to_jwk(self): def to_jwk(self):
log.debug("key.to_jwk()") log.debug("key().to_jwk()")
try: try:
if not hasattr(self, 'jwk'): if not hasattr(self, 'jwk'):
self.jwk = JWK.from_pyca(self.cryptography) self.jwk = JWK.from_pyca(self.cryptography)
@ -429,7 +548,7 @@ class key():
exit(2) exit(2)
def to_libp2p(self): def to_libp2p(self):
log.debug("key.to_libp2p()") log.debug("key().to_libp2p()")
try: try:
if not hasattr(self, 'public_proto2') or not hasattr(self, 'secret_proto2'): if not hasattr(self, 'public_proto2') or not hasattr(self, 'secret_proto2'):
self.to_proto2() self.to_proto2()
@ -440,11 +559,11 @@ class key():
except Exception as e: except Exception as e:
log.error(f'Unable to get libp2p: {e}') log.error(f'Unable to get libp2p: {e}')
exit(2) exit(2)
log.debug("key.public_libp2p=%s" % self.public_libp2p) log.debug("key().public_libp2p=%s" % self.public_libp2p)
log.debug("key.secret_libp2p=%s" % self.secret_libp2p) log.debug("key().secret_libp2p=%s" % self.secret_libp2p)
def to_pem_pkcs8(self): def to_pem_pkcs8(self):
log.debug("key.to_pem_pkcs8()") log.debug("key().to_pem_pkcs8()")
try: try:
self.secret_pem_pkcs8 = self.cryptography.private_bytes( self.secret_pem_pkcs8 = self.cryptography.private_bytes(
encoding=serialization.Encoding.PEM, encoding=serialization.Encoding.PEM,
@ -454,43 +573,9 @@ class key():
except Exception as e: except Exception as e:
log.error(f'Unable to get pem pkcs8: {e}') log.error(f'Unable to get pem pkcs8: {e}')
exit(2) exit(2)
log.debug("key.secret_pem_pkcs8=%s" % self.secret_pem_pkcs8) log.debug("key().secret_pem_pkcs8=%s" % self.secret_pem_pkcs8)
def to_pgpy(self):
log.debug("key.to_pgpy()")
try:
self.gpg_secret_keys = list(self.gpg.keylist(pattern=self.username, secret=True))
log.debug("key.gpg_secret_keys=%s" % self.gpg_secret_keys)
if not self.gpg_secret_keys:
log.warning(f"""Unable to find any key matching "{self.username}".""")
exit(1)
else:
self.gpg_secret_key = self.gpg_secret_keys[0]
log.info(f"""Found key id "{self.gpg_secret_key.fpr}" matching "{self.username}".""")
log.debug("key.gpg_secret_key.expired=%s" % self.gpg_secret_key.expired)
log.debug("key.gpg_secret_key.fpr=%s" % self.gpg_secret_key.fpr)
log.debug("key.gpg_secret_key.revoked=%s" % self.gpg_secret_key.revoked)
log.debug("key.gpg_secret_key.uids=%s" % self.gpg_secret_key.uids)
log.debug("key.gpg_secret_key.owner_trust=%s" % self.gpg_secret_key.owner_trust)
log.debug("key.gpg_secret_key.last_update=%s" % self.gpg_secret_key.last_update)
if self.password:
self.gpg.set_pinentry_mode(gpg.constants.PINENTRY_MODE_LOOPBACK)
self.pgp_public_armor = self.gpg.key_export(self.gpg_secret_key.fpr)
self.pgp_secret_armor = self.gpg.key_export_secret(self.gpg_secret_key.fpr)
log.debug("key.pgp_secret_armor=%s" % self.pgp_secret_armor)
if not self.pgp_secret_armor:
log.error(f"""Unable to export gpg secret key id "{self.gpg_secret_key.fpr}" of user "{self.username}". Please check your password!""")
exit(2)
with warnings.catch_warnings():
# remove CryptographyDeprecationWarning about deprecated
# SymmetricKeyAlgorithm IDEA, CAST5 and Blowfish (PGPy v0.5.4)
warnings.simplefilter('ignore')
self.pgpy, _ = pgpy.PGPKey.from_blob(self.pgp_secret_armor)
except Exception as e:
log.error(f'Unable to get pgpy from gpg: {e}')
exit(2)
def to_proto2(self): def to_proto2(self):
log.debug("key.to_proto2()") log.debug("key().to_proto2()")
raise NotImplementedError(f"to_proto2() is not implemented for algorithm {self.algorithm}") raise NotImplementedError(f"key().to_proto2() is not implemented for algorithm {self.algorithm}")

View File

@ -53,7 +53,7 @@ def from_libp2p(libp2p):
exit(2) exit(2)
def from_pem(pem): def from_pem(pem):
log.debug("ed25519.from_pem()") log.debug("ed25519.from_pem(%s)" % pem)
try: try:
return ed25519(serialization.load_pem_private_key(pem, password=None).private_bytes( return ed25519(serialization.load_pem_private_key(pem, password=None).private_bytes(
encoding=serialization.Encoding.Raw, encoding=serialization.Encoding.Raw,
@ -64,52 +64,34 @@ def from_pem(pem):
log.error(f'Unable to get ed25519 from pem: {e}') log.error(f'Unable to get ed25519 from pem: {e}')
exit(2) exit(2)
def from_pgpy(pgpy): def from_pgpy(_pgpy):
log.debug("ed25519.from_pgpy()") log.debug("ed25519.from_pgpy(%s)" % _pgpy)
try: try:
self.pgpy_key_type() if _pgpy.key_type == 'RSA':
if self.pgpy_key_type == 'RSA': log.debug("ed25519._pgpy._key.keymaterial.p=%s" % _pgpy._key.keymaterial.p)
log.debug("key.pgpy._key.keymaterial.p=%s" % self.pgpy._key.keymaterial.p) log.debug("ed25519._pgpy._key.keymaterial.q=%s" % _pgpy._key.keymaterial.q)
log.debug("key.pgpy._key.keymaterial.q=%s" % self.pgpy._key.keymaterial.q)
# rsa custom seed: sha256 hash of (p + q), where + is a string concatenation # rsa custom seed: sha256 hash of (p + q), where + is a string concatenation
# self.ed25519_seed_bytes = nacl.bindings.crypto_hash_sha256((rsa_int).to_bytes(rsa_len,byteorder='big')) # self.ed25519_seed_bytes = nacl.bindings.crypto_hash_sha256((rsa_int).to_bytes(rsa_len,byteorder='big'))
rsa_int = int(str(self.pgpy._key.keymaterial.p) + str(self.pgpy._key.keymaterial.q)) rsa_int = int(str(_pgpy._key.keymaterial.p) + str(_pgpy._key.keymaterial.q))
rsa_len = (rsa_int.bit_length() + 7) // 8 rsa_len = (rsa_int.bit_length() + 7) // 8
from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives import hashes
digest = hashes.Hash(hashes.SHA256()) digest = hashes.Hash(hashes.SHA256())
digest.update((rsa_int).to_bytes(rsa_len,byteorder='big')) digest.update((rsa_int).to_bytes(rsa_len,byteorder='big'))
seed_bytes = digest.finalize() seed = digest.finalize()
# seed_bytes = nacl.bindings.crypto_hash_sha256((rsa_int).to_bytes(rsa_len,byteorder='big')) # seed_bytes = nacl.bindings.crypto_hash_sha256((rsa_int).to_bytes(rsa_len,byteorder='big'))
elif self.pgpy_key_type in ('ECDSA', 'EdDSA', 'ECDH'): elif _pgpy.key_type in ('ECDSA', 'EdDSA', 'ECDH'):
log.debug("key.pgpy._key.keymaterial.s=%s" % self.pgpy._key.keymaterial.s) log.debug("ed25519._pgpy._key.keymaterial.s=%s" % _pgpy._key.keymaterial.s)
seed_bytes = self.pgpy._key.keymaterial.s.to_bytes(32, byteorder='big') seed = _pgpy._key.keymaterial.s.to_bytes(32, byteorder='big')
else: else:
raise NotImplementedError(f"getting seed from pgp key type {self.pgpy_key_type} is not implemented") raise NotImplementedError(f"getting seed from pgpy key type {_pgpy.key_type} is not implemented.")
return ed25519(seed_bytes) return ed25519(seed)
except Exception as e: except Exception as e:
log.error(f'Unable to get ed25519 from pgpy: {e}') log.error(f'Unable to get ed25519 from pgpy: {e}')
exit(2) exit(2)
def pgpy_key_type(self):
log.debug("keygen.pgpy_key_type()")
if isinstance(self.pgpy._key.keymaterial, pgpy.packet.fields.RSAPriv):
self.pgpy_key_type = 'RSA'
elif isinstance(self.pgpy._key.keymaterial, pgpy.packet.fields.DSAPriv):
self.pgpy_key_type = 'DSA'
elif isinstance(self.pgpy._key.keymaterial, pgpy.packet.fields.ElGPriv):
self.pgpy_key_type = 'ElGamal'
elif isinstance(self.pgpy._key.keymaterial, pgpy.packet.fields.ECDSAPriv):
self.pgpy_key_type = 'ECDSA'
elif isinstance(self.pgpy._key.keymaterial, pgpy.packet.fields.EdDSAPriv):
self.pgpy_key_type = 'EdDSA'
elif isinstance(self.pgpy._key.keymaterial, pgpy.packet.fields.ECDHPriv):
self.pgpy_key_type = 'ECDH'
else:
self.pgpy_key_type = 'undefined'
log.debug("keygen.pgpy_key_type=%s" % self.pgpy_key_type)
class ed25519(key): class ed25519(key):
def __init__(self, seed: bytes): def __init__(self, seed: bytes):
log.debug("ed25519().__init__(%s)" % seed)
super().__init__() super().__init__()
self.algorithm = 'ed25519' self.algorithm = 'ed25519'
self.cryptography = Ed25519PrivateKey.from_private_bytes(seed) self.cryptography = Ed25519PrivateKey.from_private_bytes(seed)
@ -119,89 +101,29 @@ class ed25519(key):
) )
self.secret_bytes = seed + self.public_bytes self.secret_bytes = seed + self.public_bytes
self.seed_bytes = seed self.seed_bytes = seed
log.debug("ed25519().seed_bytes=%s" % self.seed_bytes)
def _cleanup(self): def _cleanup(self):
log.debug("ed25519()._cleanup()") log.debug("ed25519()._cleanup()")
if hasattr(self, 'duniterpy'):
if hasattr(self.duniterpy, 'seed') and self.duniterpy.seed:
clearmem(self.duniterpy.seed)
log.debug("cleared: ed25519().duniterpy.seed")
if hasattr(self.duniterpy, 'sk') and self.duniterpy.sk:
clearmem(self.duniterpy.sk)
log.debug("cleared: ed25519().duniterpy.sk")
if hasattr(self, 'secret_b36mf') and self.secret_b36mf:
clearmem(self.secret_b36mf)
log.debug("cleared: ed25519().secret_b36mf")
if hasattr(self, 'secret_b58mf') and self.secret_b58mf:
clearmem(self.secret_b58mf)
log.debug("cleared: ed25519().secret_b58mf")
if hasattr(self, 'secret_b58mh') and self.secret_b58mh:
clearmem(self.secret_b58mh)
log.debug("cleared: ed25519().secret_b58mh")
if hasattr(self, 'secret_b64mh') and self.secret_b64mh:
clearmem(self.secret_b64mh)
log.debug("cleared: ed25519().secret_b64mh")
if hasattr(self, 'secret_base58') and self.secret_base58:
clearmem(self.secret_base58)
log.debug("cleared: ed25519().secret_base58")
if hasattr(self, 'secret_base64') and self.secret_base64:
clearmem(self.secret_base64)
log.debug("cleared: ed25519().secret_base64")
if hasattr(self, 'secret_bytes') and self.secret_bytes: if hasattr(self, 'secret_bytes') and self.secret_bytes:
clearmem(self.secret_bytes) clearmem(self.secret_bytes)
log.debug("cleared: ed25519().secret_bytes") log.debug("cleared: ed25519().secret_bytes")
if hasattr(self, 'secret_cidv1') and self.secret_cidv1:
clearmem(self.secret_cidv1)
log.debug("cleared: ed25519().secret_cidv1")
if hasattr(self, 'secret_libp2p') and self.secret_libp2p:
clearmem(self.secret_libp2p)
log.debug("cleared: ed25519().secret_libp2p")
if hasattr(self, 'secret_pem_pkcs8') and self.secret_pem_pkcs8:
clearmem(self.secret_pem_pkcs8)
log.debug("cleared: ed25519().secret_pem_pkcs8")
if hasattr(self, 'secret_proto2') and self.secret_proto2:
clearmem(self.secret_proto2)
log.debug("cleared: ed25519().secret_proto2")
if hasattr(self, 'seed_bytes') and self.seed_bytes: if hasattr(self, 'seed_bytes') and self.seed_bytes:
clearmem(self.seed_bytes) clearmem(self.seed_bytes)
log.debug("cleared: ed25519().seed_bytes") log.debug("cleared: ed25519().seed_bytes")
if hasattr(self, 'ipfs_privkey') and self.ipfs_privkey: super()._cleanup()
clearmem(self.ipfs_privkey)
log.debug("cleared: ed25519().ipfs_privkey")
if hasattr(self, 'jwk'):
if hasattr(self, 'secret_jwk') and self.secret_jwk:
clearmem(self.secret_jwk)
log.debug("cleared: ed25519().secret_jwk")
if hasattr(self.jwk, 'd') and self.jwk.d:
clearmem(self.jwk.d)
log.debug("cleared: ed25519().jwk.d")
if hasattr(self, 'pgp_secret_armor') and self.pgp_secret_armor:
clearmem(self.pgp_secret_armor)
log.debug("cleared: ed25519().pgp_secret_armor")
if hasattr(self, 'pgpy'):
if hasattr(self.pgpy._key.keymaterial, 'p') and self.pgpy._key.keymaterial.p and not isinstance(self.pgpy._key.keymaterial.p, pgpy.packet.fields.ECPoint):
clearmem(self.pgpy._key.keymaterial.p)
log.debug("cleared: ed25519().pgpy._key.material.p")
if hasattr(self.pgpy._key.keymaterial, 'q') and self.pgpy._key.keymaterial.q:
clearmem(self.pgpy._key.keymaterial.q)
log.debug("cleared: ed25519().pgpy._key.material.q")
if hasattr(self.pgpy._key.keymaterial, 's') and self.pgpy._key.keymaterial.s:
clearmem(self.pgpy._key.keymaterial.s)
log.debug("cleared: ed25519().pgpy._key.material.s")
def to_duniterpy(self): def to_duniterpy(self):
log.debug("keygen.to_duniterpy()") log.debug("ed25519().to_duniterpy()")
try: try:
if not hasattr(self, 'duniterpy'): if not hasattr(self, 'duniterpy'):
self.duniterpy = duniterpy.key.SigningKey(self.seed_bytes) self.duniterpy = duniterpy.key.SigningKey(self.seed_bytes)
except Exception as e: except Exception as e:
log.error(f'Unable to get duniterpy: {e}') log.error(f'Unable to get duniterpy: {e}')
exit(2) exit(2)
log.debug("keygen.duniterpy.seed: %s" % self.duniterpy.seed) log.debug("ed25519().duniterpy.seed: %s" % self.duniterpy.seed)
def to_proto2(self): def to_proto2(self):
log.debug("key.to_proto2()") log.debug("ed25519().to_proto2()")
try: try:
## libp2p Protocol Buffer serialization ## libp2p Protocol Buffer serialization
self.public_proto2 = b'\x08\x01\x12 ' + self.public_bytes self.public_proto2 = b'\x08\x01\x12 ' + self.public_bytes
@ -209,6 +131,6 @@ class ed25519(key):
except Exception as e: except Exception as e:
log.error(f'Unable to get proto2: {e}') log.error(f'Unable to get proto2: {e}')
exit(2) exit(2)
log.debug("key.public_proto2=%s" % self.public_proto2) log.debug("ed25519().public_proto2=%s" % self.public_proto2)
log.debug("key.secret_proto2=%s" % self.secret_proto2) log.debug("ed25519().secret_proto2=%s" % self.secret_proto2)

30
keygen
View File

@ -123,7 +123,7 @@ class keygen:
) )
def _check_args(self, args): def _check_args(self, args):
log.debug("keygen._check_args(%s)" % args) log.debug("keygen()._check_args(%s)" % args)
if self.input is None and not len(self.username): if self.input is None and not len(self.username):
self.parser.error('keygen requires an input file or a username') self.parser.error('keygen requires an input file or a username')
@ -148,7 +148,7 @@ class keygen:
else: else:
log_level='WARNING' log_level='WARNING'
log.basicConfig(format=log_format, datefmt=log_datefmt, level=log_level) log.basicConfig(format=log_format, datefmt=log_datefmt, level=log_level)
log.debug("keygen._cli(%s)" % argv) log.debug("keygen()._cli(%s)" % argv)
self._check_args(args) self._check_args(args)
self._load_config() self._load_config()
@ -157,18 +157,18 @@ class keygen:
return method() return method()
def _invalid_type(self): def _invalid_type(self):
log.debug("keygen._invalid_type()") log.debug("keygen()._invalid_type()")
self.parser.error(f"type {self.type} is not valid.") self.parser.error(f"type {self.type} is not valid.")
def _load_config(self): def _load_config(self):
log.debug("keygen._load_config()") log.debug("keygen()._load_config()")
self.config = configparser.RawConfigParser() self.config = configparser.RawConfigParser()
config_dir = os.path.join(os.environ.get('XDG_CONFIG_HOME', os.path.expanduser('~/.config')), 'dpgpid') config_dir = os.path.join(os.environ.get('XDG_CONFIG_HOME', os.path.expanduser('~/.config')), 'dpgpid')
log.debug("config_dir=%s" % config_dir) log.debug("config_dir=%s" % config_dir)
self.config.read( [config_dir + '/keygen.conf'] ) self.config.read( [config_dir + '/keygen.conf'] )
def _output(self): def _output(self):
log.debug("keygen._output()") log.debug("keygen()._output()")
if self.output is None: if self.output is None:
self._output_text() self._output_text()
else: else:
@ -176,7 +176,7 @@ class keygen:
os.chmod(self.output, 0o600) os.chmod(self.output, 0o600)
def _output_file(self): def _output_file(self):
log.debug("keygen._output_file()") log.debug("keygen()._output_file()")
self.key.to_file(self.output, self.format, self.password) self.key.to_file(self.output, self.format, self.password)
def _output_text(self): def _output_text(self):
@ -187,49 +187,49 @@ class keygen:
print("%s" % self.secret_key) print("%s" % self.secret_key)
def do_b36mf(self): def do_b36mf(self):
log.debug("keygen.do_b36mf()") log.debug("keygen().do_b36mf()")
self.key.to_b36mf() self.key.to_b36mf()
self.public_key = self.key.public_b36mf self.public_key = self.key.public_b36mf
self.secret_key = self.key.secret_b36mf self.secret_key = self.key.secret_b36mf
self._output() self._output()
def do_b58mf(self): def do_b58mf(self):
log.debug("keygen.do_b58mf()") log.debug("keygen().do_b58mf()")
self.key.to_b58mf() self.key.to_b58mf()
self.public_key = self.key.public_b58mf self.public_key = self.key.public_b58mf
self.secret_key = self.key.secret_b58mf self.secret_key = self.key.secret_b58mf
self._output() self._output()
def do_b58mh(self): def do_b58mh(self):
log.debug("keygen.do_b58mh()") log.debug("keygen().do_b58mh()")
self.key.to_b58mh() self.key.to_b58mh()
self.public_key = self.key.public_b58mh self.public_key = self.key.public_b58mh
self.secret_key = self.key.secret_b58mh self.secret_key = self.key.secret_b58mh
self._output() self._output()
def do_b64mh(self): def do_b64mh(self):
log.debug("keygen.do_b64mh()") log.debug("keygen().do_b64mh()")
self.key.to_b64mh() self.key.to_b64mh()
self.public_key = self.key.public_b64mh self.public_key = self.key.public_b64mh
self.secret_key = self.key.secret_b64mh self.secret_key = self.key.secret_b64mh
self._output() self._output()
def do_base58(self): def do_base58(self):
log.debug("keygen.do_base58()") log.debug("keygen().do_base58()")
self.key.to_base58() self.key.to_base58()
self.public_key = self.key.public_base58 self.public_key = self.key.public_base58
self.secret_key = self.key.secret_base58 self.secret_key = self.key.secret_base58
self._output() self._output()
def do_base64(self): def do_base64(self):
log.debug("keygen.do_base64()") log.debug("keygen().do_base64()")
self.key.to_base64() self.key.to_base64()
self.public_key = self.key.public_base64 self.public_key = self.key.public_base64
self.secret_key = self.key.secret_base64 self.secret_key = self.key.secret_base64
self._output() self._output()
def do_duniter(self): def do_duniter(self):
log.debug("keygen.do_duniter()") log.debug("keygen().do_duniter()")
if not self.format: if not self.format:
self.format = 'pubsec' self.format = 'pubsec'
self.key.to_base58() self.key.to_base58()
@ -238,7 +238,7 @@ class keygen:
self._output() self._output()
def do_ipfs(self): def do_ipfs(self):
log.debug("keygen.do_ipfs()") log.debug("keygen().do_ipfs()")
self.key.to_b58mh() self.key.to_b58mh()
self.key.to_b64mh() self.key.to_b64mh()
self.public_key = self.key.public_b58mh self.public_key = self.key.public_b58mh
@ -246,7 +246,7 @@ class keygen:
self._output() self._output()
def do_jwk(self): def do_jwk(self):
log.debug("keygen.do_jwk()") log.debug("keygen().do_jwk()")
self.key.to_jwk() self.key.to_jwk()
self.public_key = self.key.public_jwk self.public_key = self.key.public_jwk
self.secret_key = self.key.secret_jwk self.secret_key = self.key.secret_jwk