dpgpid/dpgpid

285 lines
10 KiB
Python
Executable File

#!/usr/bin/env python3
# link: https://git.p2p.legal/aya/dpgpid/
# desc: generate did:ipid keys from gpg
# Copyleft 2022 Yann Autissier <aya@asycn.io>
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import argparse
import configparser
import gpg
import json
import logging as log
from SecureBytes import clearmem
import os
import re
import struct
import sys
import time
import warnings
sys.path.append(os.path.abspath('../py-ipfs-http-client/'))
import ipfshttpclient
import key
__version__='0.1.0'
class dpgpid:
def __init__(self):
self.parser = argparse.ArgumentParser(description="""
Generate did:ipid keys from gpg.
It converts a gpg key to a Decentralized IDentifier.""")
self.parser.add_argument(
'action',
help="{list,publish}",
nargs="?",
)
self.parser.add_argument(
"-d",
"--debug",
action="store_true",
help="show debug informations",
)
self.parser.add_argument(
"-q",
"--quiet",
action="store_true",
help="show only errors",
)
self.parser.add_argument(
"-t",
"--type",
choices=['dns', 'ipid','key'],
default="ipid",
dest="type",
help="did output format, default: ipid",
)
self.parser.add_argument(
"-v",
"--verbose",
action="store_true",
help="show more informations",
)
self.parser.add_argument(
"--version",
action="store_true",
help="show version and exit",
)
self.parser.add_argument(
'-u',
'--username',
dest="username",
)
self.parser.add_argument(
'-p',
'--password',
dest="password",
)
def _check_args(self, args):
log.debug("dpgpid._check_args(%s)" % args)
if self.action not in ['list', 'publish']:
self.parser.error('dpgpid requires a valid action')
def _cleanup(self):
log.debug("dpgpid._cleanup()")
self.ipfs.close()
def _cli(self, argv):
args = self.parser.parse_args(argv)
vars(self).update(vars(args))
# display version
if args.version:
version()
sys.exit()
# define log format
log_format='%(asctime)s %(levelname)s: %(message)s'
log_datefmt='%Y/%m/%d %H:%M:%S'
if args.debug:
log_level='DEBUG'
elif args.quiet:
log_level='ERROR'
elif args.verbose:
log_level='INFO'
else:
log_level='WARNING'
log.basicConfig(format=log_format, datefmt=log_datefmt, level=log_level)
self._check_args(args)
self._load_config()
self.gpg = gpg.Context(armor=True, offline=True)
try:
self.ipfs = ipfshttpclient.connect('/ip4/172.18.0.2/tcp/5001/',session=True,timeout=3)
except Exception as e:
log.error('Unable to load ipfs client: %s' % e)
exit(2)
method = getattr(self, f'do_{self.type}', self._invalid_type)
return method()
def _invalid_type(self):
log.debug("dpgpid._invalid_type()")
self.parser.error(f"type {self.type} is not valid.")
def _load_config(self):
log.debug("dpgpid._load_config()")
self.config = configparser.RawConfigParser()
config_dir = os.path.join(os.environ.get('XDG_CONFIG_HOME', os.path.expanduser('~/.config')), 'dpgpid')
log.debug("config_dir=%s" % config_dir)
self.config.read( [config_dir + '/dpgpid.conf'] )
def did_from_key(self):
log.debug("dpgpid.did_from_key()")
self.did = json.dumps({
"@context": "/ipfs/QmfS56jDfrXNaS6Xcsp3RJiXd2wyY7smeEAwyTAnL1RhEG",
"id": "did:ipid:" + self.key_id,
"created": self.key_created_at,
"publicKey": [{
"id": "did:ipid:" + self.key_id,
"type": "GpgVerificationKey2020",
"expires": self.key_expires_at,
"publicKeyGpg": self.key_public_key,
}, {
"id": "did:ipid:" + self.key_id,
"type": "JsonWebKey2020",
"expires": self.key_expires_at,
"publicKeyJwk": self.key_public_jwk,
}],
})
if self.key_updated_at:
self.did.update({"updated": self.key_updated_at,})
log.debug("dpgpid.did=%s" % self.did)
def do_list(self):
log.debug("dpgpid.do_list()")
gpgkeys = list(self.gpg.keylist(pattern=self.username, secret=True))
for gpgkey in gpgkeys:
self.key_from_gpg(gpgkey.fpr, self.password)
print("did:ipid:%s" % self.key_id)
self._cleanup()
def do_publish(self):
log.debug("dpgpid.do_publish()")
gpgkeys = list(self.gpg.keylist(pattern=self.username, secret=True))
if not gpgkeys:
log.warning(f"""Unable to find any key matching "{self.username}".""")
exit(1)
else:
gpgkey = gpgkeys[0]
log.info(f"""Found key id "{gpgkey.fpr}" matching "{self.username}".""")
self.key_from_gpg(gpgkey.fpr, self.password)
self.did_from_key()
self.did_cid = self.ipfs.add_json(self.did)
log.debug('dpgpid.did_cid=%s' % self.did_cid)
self.did_ipns = self.ipfs.name.publish(ipfs_path='/ipfs/' + self.did_cid, key=self.key_id)['Name']
log.debug('dpgpid.did_ipns=%s' % self.did_ipns)
self._cleanup()
def do_key(self):
log.debug("dpgpid.do_key()")
gpgkeys = list(self.gpg.keylist(pattern=self.username, secret=True))
if not gpgkeys:
log.warning(f"""Unable to find any key matching "{self.username}".""")
exit(1)
else:
gpgkey = gpgkeys[0]
log.info(f"""Found key id "{gpgkey.fpr}" matching "{self.username}".""")
self.key_from_gpg(gpgkey.fpr, self.password)
self.did_from_key()
self.did_cid = self.ipfs.add_json(self.did)
log.debug('dpgpid.did_cid=%s' % self.did_cid)
self.did_ipns = self.ipfs.name.publish(ipfs_path='/ipfs/' + self.did_cid, key=self.key_id)['Name']
log.debug('dpgpid.did_ipns=%s' % self.did_ipns)
didkit.keyToDID('key', self.key_secret_jwk)
self._cleanup()
def do_show(self):
log.debug("dpgpid.do_show()")
gpgkeys = list(self.gpg.keylist(pattern=self.username, secret=True))
if not gpgkeys:
log.warning(f"""Unable to find any key matching "{self.username}".""")
exit(1)
else:
gpgkey = gpgkeys[0]
log.info(f"""Found key id "{gpgkey.fpr}" matching "{self.username}".""")
self.key_from_gpg(gpgkey.fpr, self.password)
self.did_from_key()
print(self.did)
self._cleanup()
def ipfs_peerid(self, args):
log.debug("dpgpid.ipfs_peerid(%s)" % args)
try:
self.ipfs_peerid = self.ipfs.id()['ID']
except Exception as e:
log.error(f'Unable to get ipfs peer id: {e}')
exit()
log.debug('dpgpid.ipfs_peerid=%s' % self.ipfs_peerid)
def key_from_gpg(self, username, password):
log.debug("dpgpid.key_from_gpg(%s, %s)" % (username, password))
try:
key = keygen.keygen()
key.gpg = gpg.Context(armor=True, offline=True)
key.gpg.set_passphrase_cb(key.gpg_passphrase_cb)
key.username = username
key.password = password
key.ed25519_from_gpg()
key.pem_pkcs8_from_ed25519()
key.libp2p_from_ed25519()
key.b58mh_from_libp2p()
key.jwk_from_ed25519()
self.key_created_at = str(key.pgpy.created)
self.key_expires_at = str(key.pgpy.expires_at)
self.key_fingerprint = key.gpg_secret_key.fpr
self.key_id = key.ed25519_public_b58mh
self.key_is_expired = key.gpg_secret_key.expired
self.key_is_revoked = key.gpg_secret_key.revoked
self.key_public_key = str(key.pgpy.pubkey)
self.key_public_jwk = key.ed25519_public_jwk
self.key_secret_jwk = key.ed25519_secret_jwk
self.key_secret_pem = key.ed25519_secret_pem_pkcs8
self.key_signers = key.pgpy.signers
self.key_uids = key.gpg_secret_key.uids
self.key_updated_at = key.gpg_secret_key.last_update
log.debug("dpgpid.key.created_at=%s" % self.key_created_at)
log.debug("dpgpid.key.expires_at=%s" % self.key_expires_at)
log.debug("dpgpid.key.fingerprint=%s" % self.key_fingerprint)
log.debug("dpgpid.key.id=%s" % self.key_id)
log.debug("dpgpid.key.is_expired=%s" % self.key_is_expired)
log.debug("dpgpid.key.is_revoked=%s" % self.key_is_revoked)
log.debug("dpgpid.key.public_key=%s" % self.key_public_key)
log.debug("dpgpid.key.public_jwk=%s" % self.key_public_jwk)
log.debug("dpgpid.key.secret_jwk=%s" % self.key_secret_jwk)
log.debug("dpgpid.key.signers=%s" % self.key_signers)
log.debug("dpgpid.key.uids=%s" % self.key_uids)
log.debug("dpgpid.key.updated_at=%s" % self.key_updated_at)
except Exception as e:
log.error(f'Unable to get key from gpg: {e}')
exit(2)
def main(argv=None):
if argv is None:
argv = sys.argv[1:]
return dpgpid()._cli(argv)
def version(version=__version__):
print("%s v%s" % (sys.argv[0],version))
if __name__ == "__main__":
sys.exit(main())