#!/usr/bin/env python3 # link: https://git.p2p.legal/aya/dpgpid/ # desc: generate did:ipid keys from gpg # Copyleft 2022 Yann Autissier # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . import argparse import configparser import gpg import json import logging as log from SecureBytes import clearmem import os import re import struct import sys import time import warnings sys.path.append(os.path.abspath('../py-ipfs-http-client/')) import ipfshttpclient import key __version__='0.1.0' class dpgpid: def __init__(self): self.parser = argparse.ArgumentParser(description=""" Generate did:ipid keys from gpg. It converts a gpg key to a Decentralized IDentifier.""") self.parser.add_argument( 'action', help="{list,publish}", nargs="?", ) self.parser.add_argument( "-d", "--debug", action="store_true", help="show debug informations", ) self.parser.add_argument( "-q", "--quiet", action="store_true", help="show only errors", ) self.parser.add_argument( "-t", "--type", choices=['dns', 'ipid','key'], default="ipid", dest="type", help="did output format, default: ipid", ) self.parser.add_argument( "-v", "--verbose", action="store_true", help="show more informations", ) self.parser.add_argument( "--version", action="store_true", help="show version and exit", ) self.parser.add_argument( '-u', '--username', dest="username", ) self.parser.add_argument( '-p', '--password', dest="password", ) def _check_args(self, args): log.debug("dpgpid._check_args(%s)" % args) if self.action not in ['list', 'publish']: self.parser.error('dpgpid requires a valid action') def _cleanup(self): log.debug("dpgpid._cleanup()") self.ipfs.close() def _cli(self, argv): args = self.parser.parse_args(argv) vars(self).update(vars(args)) # display version if args.version: version() sys.exit() # define log format log_format='%(asctime)s %(levelname)s: %(message)s' log_datefmt='%Y/%m/%d %H:%M:%S' if args.debug: log_level='DEBUG' elif args.quiet: log_level='ERROR' elif args.verbose: log_level='INFO' else: log_level='WARNING' log.basicConfig(format=log_format, datefmt=log_datefmt, level=log_level) self._check_args(args) self._load_config() self.gpg = gpg.Context(armor=True, offline=True) try: self.ipfs = ipfshttpclient.connect('/ip4/172.18.0.2/tcp/5001/',session=True,timeout=3) except Exception as e: log.error('Unable to load ipfs client: %s' % e) exit(2) method = getattr(self, f'do_{self.type}', self._invalid_type) return method() def _invalid_type(self): log.debug("dpgpid._invalid_type()") self.parser.error(f"type {self.type} is not valid.") def _load_config(self): log.debug("dpgpid._load_config()") self.config = configparser.RawConfigParser() config_dir = os.path.join(os.environ.get('XDG_CONFIG_HOME', os.path.expanduser('~/.config')), 'dpgpid') log.debug("config_dir=%s" % config_dir) self.config.read( [config_dir + '/dpgpid.conf'] ) def did_from_key(self): log.debug("dpgpid.did_from_key()") self.did = json.dumps({ "@context": "/ipfs/QmfS56jDfrXNaS6Xcsp3RJiXd2wyY7smeEAwyTAnL1RhEG", "id": "did:ipid:" + self.key_id, "created": self.key_created_at, "publicKey": [{ "id": "did:ipid:" + self.key_id, "type": "GpgVerificationKey2020", "expires": self.key_expires_at, "publicKeyGpg": self.key_public_key, }, { "id": "did:ipid:" + self.key_id, "type": "JsonWebKey2020", "expires": self.key_expires_at, "publicKeyJwk": self.key_public_jwk, }], }) if self.key_updated_at: self.did.update({"updated": self.key_updated_at,}) log.debug("dpgpid.did=%s" % self.did) def do_list(self): log.debug("dpgpid.do_list()") gpgkeys = list(self.gpg.keylist(pattern=self.username, secret=True)) for gpgkey in gpgkeys: self.key_from_gpg(gpgkey.fpr, self.password) print("did:ipid:%s" % self.key_id) self._cleanup() def do_publish(self): log.debug("dpgpid.do_publish()") gpgkeys = list(self.gpg.keylist(pattern=self.username, secret=True)) if not gpgkeys: log.warning(f"""Unable to find any key matching "{self.username}".""") exit(1) else: gpgkey = gpgkeys[0] log.info(f"""Found key id "{gpgkey.fpr}" matching "{self.username}".""") self.key_from_gpg(gpgkey.fpr, self.password) self.did_from_key() self.did_cid = self.ipfs.add_json(self.did) log.debug('dpgpid.did_cid=%s' % self.did_cid) self.did_ipns = self.ipfs.name.publish(ipfs_path='/ipfs/' + self.did_cid, key=self.key_id)['Name'] log.debug('dpgpid.did_ipns=%s' % self.did_ipns) self._cleanup() def do_key(self): log.debug("dpgpid.do_key()") gpgkeys = list(self.gpg.keylist(pattern=self.username, secret=True)) if not gpgkeys: log.warning(f"""Unable to find any key matching "{self.username}".""") exit(1) else: gpgkey = gpgkeys[0] log.info(f"""Found key id "{gpgkey.fpr}" matching "{self.username}".""") self.key_from_gpg(gpgkey.fpr, self.password) self.did_from_key() self.did_cid = self.ipfs.add_json(self.did) log.debug('dpgpid.did_cid=%s' % self.did_cid) self.did_ipns = self.ipfs.name.publish(ipfs_path='/ipfs/' + self.did_cid, key=self.key_id)['Name'] log.debug('dpgpid.did_ipns=%s' % self.did_ipns) didkit.keyToDID('key', self.key_secret_jwk) self._cleanup() def do_show(self): log.debug("dpgpid.do_show()") gpgkeys = list(self.gpg.keylist(pattern=self.username, secret=True)) if not gpgkeys: log.warning(f"""Unable to find any key matching "{self.username}".""") exit(1) else: gpgkey = gpgkeys[0] log.info(f"""Found key id "{gpgkey.fpr}" matching "{self.username}".""") self.key_from_gpg(gpgkey.fpr, self.password) self.did_from_key() print(self.did) self._cleanup() def ipfs_peerid(self, args): log.debug("dpgpid.ipfs_peerid(%s)" % args) try: self.ipfs_peerid = self.ipfs.id()['ID'] except Exception as e: log.error(f'Unable to get ipfs peer id: {e}') exit() log.debug('dpgpid.ipfs_peerid=%s' % self.ipfs_peerid) def key_from_gpg(self, username, password): log.debug("dpgpid.key_from_gpg(%s, %s)" % (username, password)) try: key = keygen.keygen() key.gpg = gpg.Context(armor=True, offline=True) key.gpg.set_passphrase_cb(key.gpg_passphrase_cb) key.username = username key.password = password key.ed25519_from_gpg() key.pem_pkcs8_from_ed25519() key.libp2p_from_ed25519() key.b58mh_from_libp2p() key.jwk_from_ed25519() self.key_created_at = str(key.pgpy.created) self.key_expires_at = str(key.pgpy.expires_at) self.key_fingerprint = key.gpg_secret_key.fpr self.key_id = key.ed25519_public_b58mh self.key_is_expired = key.gpg_secret_key.expired self.key_is_revoked = key.gpg_secret_key.revoked self.key_public_key = str(key.pgpy.pubkey) self.key_public_jwk = key.ed25519_public_jwk self.key_secret_jwk = key.ed25519_secret_jwk self.key_secret_pem = key.ed25519_secret_pem_pkcs8 self.key_signers = key.pgpy.signers self.key_uids = key.gpg_secret_key.uids self.key_updated_at = key.gpg_secret_key.last_update log.debug("dpgpid.key.created_at=%s" % self.key_created_at) log.debug("dpgpid.key.expires_at=%s" % self.key_expires_at) log.debug("dpgpid.key.fingerprint=%s" % self.key_fingerprint) log.debug("dpgpid.key.id=%s" % self.key_id) log.debug("dpgpid.key.is_expired=%s" % self.key_is_expired) log.debug("dpgpid.key.is_revoked=%s" % self.key_is_revoked) log.debug("dpgpid.key.public_key=%s" % self.key_public_key) log.debug("dpgpid.key.public_jwk=%s" % self.key_public_jwk) log.debug("dpgpid.key.secret_jwk=%s" % self.key_secret_jwk) log.debug("dpgpid.key.signers=%s" % self.key_signers) log.debug("dpgpid.key.uids=%s" % self.key_uids) log.debug("dpgpid.key.updated_at=%s" % self.key_updated_at) except Exception as e: log.error(f'Unable to get key from gpg: {e}') exit(2) def main(argv=None): if argv is None: argv = sys.argv[1:] return dpgpid()._cli(argv) def version(version=__version__): print("%s v%s" % (sys.argv[0],version)) if __name__ == "__main__": sys.exit(main())