Add cesium-messaging code

This commit is contained in:
qo-op 2020-11-30 16:03:47 +01:00
parent 724b85dbeb
commit 8f6fb7e161
16 changed files with 1140 additions and 0 deletions

View File

@ -0,0 +1,3 @@
DUNIKEY²="/home/fred/.ssb/trousseau-2L8vaYix-Fred-gchange-PubSec.dunikey" # Chemin du fichier de trousseau Ḡ1 de l'émetteur, au format PubSec
#POD="https://g1.data.duniter.fr" # Noeud Cecium+ utilisé pour l'envoi du message
POD="https://data.gchange.fr" # Noeud Gchange utilisé pour l'envoi du message

View File

@ -0,0 +1,4 @@
DUNIKEY="" # Chemin de la clé privé Ḡ1 de l'émetteur, au format PubSec
#POD="https://g1.data.duniter.fr" # Adresse du pod Cesium ou Gchange à utiliser
POD="https://g1.data.le-sou.org" # Adresse du pod Cesium de secours
#POD="https://data.gchange.fr" # Adresse du pod ḠChange à utiliser

View File

@ -0,0 +1,62 @@
# Utilisation de la messagerie Cesium+/Gchange
## Réception/Envoi/Suppression de messages
## Installation
Linux:
```
bash setup.sh
```
Autre:
```
Débrouillez-vous.
```
## Utilisation
Renseignez le fichier **.env** (Généré lors de la première tentative d'execution, ou à copier depuis .env.template).
### Lecture des messages
```
./dialog.py read
```
_Options_:
```
-h, --help show this help message and exit
-n NUMBER, --number NUMBER
Affiche les NUMBER derniers messages
-o, --outbox Lit les messages envoyés
```
### Envoi de messages
```
./dialog.py send -d DESTINATAIRE
```
_Options_:
```
-h, --help show this help message and exit
-d DESTINATAIRE, --destinataire DESTINATAIRE
Destinataire du message
-t TITRE, --titre TITRE
Titre du message à envoyer
-m MESSAGE, --message MESSAGE
Message à envoyer
-f FICHIER, --fichier FICHIER
Envoyer le message contenu dans le fichier 'FICHIER'
-o, --outbox Envoi le message sur la boite d'envoi
```
### Suppression de messages
```
./dialog.py delete -i ID
```
_Options_:
```
-h, --help show this help message and exit
-i ID, --id ID ID du message à supprimer
-o, --outbox Suppression d'un message envoyé
```

76
zen/cesium-messaging/dialog.py Executable file
View File

@ -0,0 +1,76 @@
#!/usr/bin/env python3
import argparse, sys, os
from os.path import join, dirname
from shutil import copyfile
from dotenv import load_dotenv
from lib.cesiumMessaging import ReadFromCesium, SendToCesium, DeleteFromCesium, VERSION
# Get variables environment
if not os.path.isfile('.env'):
copyfile(".env.template", ".env")
dotenv_path = join(dirname(__file__), '.env')
load_dotenv(dotenv_path)
dunikey = os.getenv('DUNIKEY')
pod = os.getenv('POD')
if not dunikey or not pod:
sys.stderr.write("Please fill the path of your private key (PubSec), and a Cesium ES address in .env file\n")
sys.exit(1)
# Parse arguments
parser = argparse.ArgumentParser()
parser.add_argument('-v', '--version', action='store_true', help="Affiche la version actuelle du programme")
subparsers = parser.add_subparsers()
read_cmd = subparsers.add_parser('read', help="Lecture des messages")
send_cmd = subparsers.add_parser('send', help="Envoi d'un message")
delete_cmd = subparsers.add_parser('delete', help="Supression d'un message")
if len(sys.argv) <= 1 or not sys.argv[1] in ('read','send','delete','-v','--version'):
sys.stderr.write("Veuillez indiquer une commande valide:\n\n")
parser.print_help()
sys.exit(1)
read_cmd.add_argument('-n', '--number',type=int, default=3, help="Affiche les NUMBER derniers messages")
read_cmd.add_argument('-o', '--outbox', action='store_true', help="Lit les messages envoyés")
send_cmd.add_argument('-d', '--destinataire', required=True, help="Destinataire du message")
send_cmd.add_argument('-t', '--titre', help="Titre du message à envoyer")
send_cmd.add_argument('-m', '--message', help="Message à envoyer")
send_cmd.add_argument('-f', '--fichier', help="Envoyer le message contenu dans le fichier 'FICHIER'")
send_cmd.add_argument('-o', '--outbox', action='store_true', help="Envoi le message sur la boite d'envoi")
delete_cmd.add_argument('-i', '--id', action='append', nargs='+', required=True, help="ID(s) du/des message(s) à supprimer")
delete_cmd.add_argument('-o', '--outbox', action='store_true', help="Suppression d'un message envoyé")
args = parser.parse_args()
if args.version:
print(VERSION)
sys.exit(0)
# Build cesiumMessaging class
if sys.argv[1] == "read":
messages = ReadFromCesium(dunikey, pod)
messages.read(args.number, args.outbox)
elif sys.argv[1] == "send":
if args.fichier:
with open(args.fichier, 'r') as f:
titre = f.readline()
msg = ''.join(f.read().splitlines(True)[0:])
elif args.titre and args.message:
titre = args.titre
msg = args.message
else:
titre = input("Indiquez le titre du message: ")
msg = input("Indiquez le contenu du message: ")
messages = SendToCesium(dunikey, pod, args.destinataire, args.outbox)
messages.send(titre, msg)
elif sys.argv[1] == "delete":
messages = DeleteFromCesium(dunikey, pod, args.outbox)
messages.delete(args.id[0])

View File

@ -0,0 +1,55 @@
#!/usr/bin/env python3
import argparse, sys, os
from os.path import join, dirname
from shutil import copyfile
from dotenv import load_dotenv
from lib.gchange import ReadLikes, SendLikes, UnLikes
# Get variables environment
if not os.path.isfile('.env'):
copyfile(".env.template", ".env")
dotenv_path = join(dirname(__file__), '.env')
load_dotenv(dotenv_path)
dunikey = os.getenv('DUNIKEY')
pod = os.getenv('POD')
if not dunikey or not pod:
sys.stderr.write("Please fill the path of your private key (PubSec), and a Cesium ES address in .env file\n")
sys.exit(1)
# Parse arguments
parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers()
# readLike_cmd = subparsers.add_parser('readlike', help="Lire les likes d'un profile")
like_cmd = subparsers.add_parser('like', help="Voir les likes d'un profile / Liker un profile (option -s NOTE")
unlike_cmd = subparsers.add_parser('unlike', help="Supprimer un like")
if len(sys.argv) <= 1 or not sys.argv[1] in ('like','unlike'):
sys.stderr.write("Veuillez indiquer une commande valide:\n\n")
parser.print_help()
sys.exit(1)
# readLike_cmd.add_argument('-p', '--profile', help="Profile cible")
like_cmd.add_argument('-p', '--profile', help="Profile cible")
like_cmd.add_argument('-s', '--stars', type=int, help="Nombre d'étoile")
unlike_cmd.add_argument('-p', '--profile', help="Profile à déliker")
args = parser.parse_args()
# Build gchange class
if sys.argv[1] == "like":
if args.stars or args.stars == 0:
gchange = SendLikes(dunikey, pod)
gchange.like(args.stars, args.profile)
else:
gchange = ReadLikes(dunikey, pod)
gchange.readLikes(args.profile)
elif sys.argv[1] == "unlike":
gchange = UnLikes(dunikey, pod)
gchange.unLike(args.profile)

View File

@ -0,0 +1,298 @@
#!/usr/bin/env python3
import os, sys, ast, requests, json, base58, base64, time, string, random, re
from lib.natools import fmt, sign, get_privkey, box_decrypt, box_encrypt
from hashlib import sha256
from datetime import datetime
from termcolor import colored
VERSION = "0.1.1"
PUBKEY_REGEX = "(?![OIl])[1-9A-Za-z]{42,45}"
def pp_json(json_thing, sort=True, indents=4):
# Print beautifull JSON
if type(json_thing) is str:
print(json.dumps(json.loads(json_thing), sort_keys=sort, indent=indents))
else:
print(json.dumps(json_thing, sort_keys=sort, indent=indents))
return None
class ReadFromCesium:
def __init__(self, dunikey, pod):
# Get my pubkey from my private key
try:
self.dunikey = dunikey
if not dunikey:
raise ValueError("Dunikey is empty")
except:
sys.stderr.write("Please fill the path to your private key (PubSec)\n")
sys.exit(1)
self.recipient = get_privkey(dunikey, "pubsec").pubkey
self.pod = pod
if not re.match(PUBKEY_REGEX, self.recipient) or len(self.recipient) > 45:
sys.stderr.write("La clé publique n'est pas au bon format.\n")
sys.exit(1)
# Configure JSON document to send
def configDoc(self, nbrMsg, outbox):
boxType = "issuer" if outbox else "recipient"
data = {}
data['sort'] = { "time": "desc" }
data['from'] = 0
data['size'] = nbrMsg
data['_source'] = ['issuer','recipient','title','content','time','nonce','read_signature']
data['query'] = {}
data['query']['bool'] = {}
data['query']['bool']['filter'] = {}
data['query']['bool']['filter']['term'] = {}
data['query']['bool']['filter']['term'][boxType] = self.recipient
document = json.dumps(data)
return document
def sendDocument(self, nbrMsg, outbox):
boxType = "outbox" if outbox else "inbox"
document = self.configDoc(nbrMsg, outbox)
headers = {
'Content-type': 'application/json',
}
# Send JSON document and get JSON result
result = requests.post('{0}/message/{1}/_search'.format(self.pod, boxType), headers=headers, data=document)
if result.status_code == 200:
return result.json()["hits"]
else:
sys.stderr.write("Echec de l'envoi du document de lecture des messages...\n" + result.text)
# Parse JSON result and display messages
def readMessages(self, msgJSON, nbrMsg, outbox):
def decrypt(msg):
msg64 = base64.b64decode(msg)
return box_decrypt(msg64, get_privkey(self.dunikey, "pubsec"), self.issuer, nonce).decode()
# Get terminal size
rows = int(os.popen('stty size', 'r').read().split()[1])
totalMsg = msgJSON["total"]
if nbrMsg > totalMsg:
nbrMsg = totalMsg
if totalMsg == 0:
print(colored("Aucun message à afficher.", 'yellow'))
return True
else:
infoTotal = " Nombre de messages: " + str(nbrMsg) + "/" + str(totalMsg) + " "
print(colored(infoTotal.center(rows, '#'), "yellow"))
for hits in msgJSON["hits"]:
self.idMsg = hits["_id"]
msgSrc = hits["_source"]
self.issuer = msgSrc["issuer"]
nonce = msgSrc["nonce"]
nonce = base58.b58decode(nonce)
self.dateS = msgSrc["time"]
date = datetime.fromtimestamp(self.dateS).strftime(", le %d/%m/%Y à %H:%M ")
if outbox:
startHeader = " À " + msgSrc["recipient"]
else:
startHeader = " De " + self.issuer
headerMsg = startHeader + date + "(ID: {})".format(self.idMsg) + " "
print('-'.center(rows, '-'))
print(colored(headerMsg, "blue").center(rows+9, '-'))
print('-'.center(rows, '-'))
try:
self.title = decrypt(msgSrc["title"])
self.content = decrypt(msgSrc["content"])
except Exception as e:
sys.stderr.write(colored(str(e), 'red') + '\n')
pp_json(hits)
continue
print("Objet: " + self.title)
print(self.content)
# pp_json(hits)
def read(self, nbrMsg, outbox):
jsonMsg = self.sendDocument(nbrMsg, outbox)
self.readMessages(jsonMsg, nbrMsg, outbox)
#################### Sending class ####################
class SendToCesium:
def __init__(self, dunikey, pod, recipient, outbox):
# Get my pubkey from my private key
try:
self.dunikey = dunikey
if not dunikey:
raise ValueError("Dunikey is empty")
except:
sys.stderr.write("Please fill the path to your private key (PubSec)\n")
sys.exit(1)
self.issuer = get_privkey(dunikey, "pubsec").pubkey
self.pod = pod
self.recipient = recipient
self.outbox = outbox
# Generate pseudo-random nonce
nonce=[]
for i in range(32):
nonce.append(random.choice(string.ascii_letters + string.digits))
self.nonce = base64.b64decode(''.join(nonce))
if not re.match(PUBKEY_REGEX, recipient) or len(recipient) > 45:
sys.stderr.write("La clé publique n'est pas au bon format.\n")
sys.exit(1)
def encryptMsg(self, msg):
return fmt["64"](box_encrypt(msg.encode(), get_privkey(self.dunikey, "pubsec"), self.recipient, self.nonce)).decode()
def configDoc(self, title, msg):
b58nonce = base58.b58encode(self.nonce).decode()
# Get current timestamp
timeSent = int(time.time())
# Generate custom JSON
data = {}
data['issuer'] = self.issuer
data['recipient'] = self.recipient
data['title'] = title
data['content'] = msg
data['time'] = timeSent
data['nonce'] = b58nonce
data['version'] = 2
document = json.dumps(data)
# Generate hash of document
hashDoc = sha256(document.encode()).hexdigest().upper()
# Generate signature of document
signature = fmt["64"](sign(hashDoc.encode(), get_privkey(self.dunikey, "pubsec"))[:-len(hashDoc.encode())]).decode()
# Build final document
finalDoc = '{' + '"hash":"{0}","signature":"{1}",'.format(hashDoc, signature) + document[1:]
return finalDoc
def sendDocument(self, document):
boxType = "outbox" if self.outbox else "inbox"
headers = {
'Content-type': 'application/json',
}
# Send JSON document and get result
try:
result = requests.post('{0}/message/{1}?pubkey={2}'.format(self.pod, boxType, self.recipient), headers=headers, data=document)
except Exception as e:
sys.stderr.write("Impossible d'envoyer le message:\n" + str(e))
sys.exit(1)
else:
if result.status_code == 200:
print(colored("Message envoyé avec succès !", "green"))
print("ID: " + result.text)
return result
else:
sys.stderr.write("Erreur inconnue:" + '\n')
print(str(pp_json(result.text)) + '\n')
def send(self, title, msg):
finalDoc = self.configDoc(self.encryptMsg(title), self.encryptMsg(msg)) # Configure JSON document to send
self.sendDocument(finalDoc) # Send final signed document
#################### Deleting class ####################
class DeleteFromCesium:
def __init__(self, dunikey, pod, outbox):
# Get my pubkey from my private key
try:
self.dunikey = dunikey
if not dunikey:
raise ValueError("Dunikey is empty")
except:
sys.stderr.write("Please fill the path to your private key (PubSec)\n")
sys.exit(1)
self.issuer = get_privkey(dunikey, "pubsec").pubkey
self.pod = pod
self.outbox = outbox
def configDoc(self, idMsg):
# Get current timestamp
timeSent = int(time.time())
boxType = "outbox" if self.outbox else "inbox"
# Generate document to customize
data = {}
data['version'] = 2
data['index'] = "message"
data['type'] = boxType
data['id'] = idMsg
data['issuer'] = self.issuer
data['time'] = timeSent
document = json.dumps(data)
# Generate hash of document
hashDoc = sha256(document.encode()).hexdigest().upper()
# Generate signature of document
signature = fmt["64"](sign(hashDoc.encode(), get_privkey(self.dunikey, "pubsec"))[:-len(hashDoc.encode())]).decode()
# Build final document
data = {}
data['hash'] = hashDoc
data['signature'] = signature
signJSON = json.dumps(data)
finalJSON = {**json.loads(signJSON), **json.loads(document)}
finalDoc = json.dumps(finalJSON)
return finalDoc
def sendDocument(self, document, idMsg):
headers = {
'Content-type': 'application/json',
}
# Send JSON document and get result
try:
result = requests.post('{0}/history/delete'.format(self.pod), headers=headers, data=document)
if result.status_code == 404:
raise ValueError("Message introuvable")
elif result.status_code == 403:
raise ValueError("Vous n'êtes pas l'auteur de ce message.")
except Exception as e:
sys.stderr.write(colored("Impossible de supprimer le message {0}:\n".format(idMsg), 'red') + str(e) + "\n")
return False
else:
if result.status_code == 200:
print(colored("Message {0} supprimé avec succès !".format(idMsg), "green"))
return result
else:
sys.stderr.write("Erreur inconnue.")
def delete(self, idsMsgList):
for idMsg in idsMsgList:
finalDoc = self.configDoc(idMsg)
self.sendDocument(finalDoc, idMsg)

View File

@ -0,0 +1,324 @@
#!/usr/bin/env python3
import os, sys, ast, requests, json, base58, base64, time, string, random, re
from lib.natools import fmt, sign, get_privkey, box_decrypt, box_encrypt
from time import sleep
from hashlib import sha256
from datetime import datetime
from termcolor import colored
PUBKEY_REGEX = "(?![OIl])[1-9A-Za-z]{42,45}"
class ReadLikes:
def __init__(self, dunikey, pod):
# Get my pubkey from my private key
try:
self.dunikey = dunikey
if not dunikey:
raise ValueError("Dunikey is empty")
except:
sys.stderr.write("Please fill the path to your private key (PubSec)\n")
sys.exit(1)
self.issuer = get_privkey(dunikey, "pubsec").pubkey
self.pod = pod
if not re.match(PUBKEY_REGEX, self.issuer) or len(self.issuer) > 45:
sys.stderr.write("La clé publique n'est pas au bon format.\n")
sys.exit(1)
# Configure JSON document to send
def configDoc(self, profile):
if not profile: profile = self.issuer
# elif len(profile) < 42:
# print(len(profile))
# gProfile = requests.get('{0}/user/profile/{1}'.format(self.pod, issuer))
# gProfile = json.loads(gProfile.text)['_source']
# pseudo = gProfile['title']
data = {}
data['query'] = {}
data['query']['bool'] = {}
data['query']['bool']['filter'] = [
{'term': {'index': 'user'}},
{'term': {'type': 'profile'}},
{'term': {'id': profile}},
{'term': {'kind': 'STAR'}}
]
# data['query']['bool']['should'] = {'term':{'issuer': self.issuer}}
data['size'] = 5000
data['_source'] = ['issuer','level']
data['aggs'] = {
'level_sum': {
'sum': {
'field': 'level'
}
}
}
return json.dumps(data)
def sendDocument(self, document):
headers = {
'Content-type': 'application/json',
}
# Send JSON document and get JSON result
result = requests.post('{0}/like/record/_search'.format(self.pod), headers=headers, data=document)
if result.status_code == 200:
# print(result.text)
return result.text
else:
sys.stderr.write("Echec de l'envoi du document de lecture des messages...\n" + result.text + '\n')
def parseResult(self, result):
result = json.loads(result)
totalLikes = result['hits']['total']
totalValue = result['aggregations']['level_sum']['value']
if totalLikes:
score = totalValue/totalLikes
else:
score = 0
raw = result['hits']['hits']
finalPrint = {}
finalPrint['likes'] = []
for i in raw:
issuer = i['_source']['issuer']
gProfile = self.getProfile(issuer)
pseudo = gProfile['title']
payTo = gProfile['pubkey']
id = i['_id']
level = i['_source']['level']
if issuer == self.issuer:
finalPrint['yours'] = { 'id' : id, 'pseudo' : pseudo, 'level' : level }
else:
finalPrint['likes'].append({ 'issuer' : issuer, 'pseudo' : pseudo, 'payTo' : payTo, 'level' : level })
finalPrint['score'] = score
return json.dumps(finalPrint)
def getProfile(self, profile):
headers = {
'Content-type': 'application/json',
}
data = {}
data['query'] = {}
data['query']['bool'] = {}
data['query']['bool']['filter'] = [
{'term': {'_index': 'user'}},
{'term': {'_type': 'profile'}},
{'term': {'_id': profile}}
]
data['_source'] = ['title','pubkey']
data = json.dumps(data)
result = requests.post('{0}/user/profile/_search'.format(self.pod), headers=headers, data=data)
result = json.loads(result.text)['hits']['hits'][0]['_source']
return result
def readLikes(self, profile=False):
document = self.configDoc(profile)
result = self.sendDocument(document)
result = self.parseResult(result)
print(result)
return result
#################### Like class ####################
class SendLikes:
def __init__(self, dunikey, pod):
# Get my pubkey from my private key
try:
self.dunikey = dunikey
if not dunikey:
raise ValueError("Dunikey is empty")
except:
sys.stderr.write("Please fill the path to your private key (PubSec)\n")
sys.exit(1)
self.issuer = get_privkey(dunikey, "pubsec").pubkey
self.pod = pod
if not re.match(PUBKEY_REGEX, self.issuer) or len(self.issuer) > 45:
sys.stderr.write("La clé publique n'est pas au bon format.\n")
sys.exit(1)
# Configure JSON document to send
def configDoc(self, profile, likes):
if not profile: profile = self.issuer
if likes not in range(0, 6):
sys.stderr.write(colored('Votre like doit être compris entre 0 et 5.\n', 'red'))
return False
timeSent = int(time.time())
data = {}
data['version'] = 2
data['index'] = "user"
data['type'] = "profile"
data['id'] = profile
data['kind'] = "STAR"
data['level'] = likes
data['time'] = timeSent
data['issuer'] = self.issuer
document = json.dumps(data)
# Generate hash of document
hashDoc = sha256(document.encode()).hexdigest().upper()
# Generate signature of document
signature = fmt["64"](sign(hashDoc.encode(), get_privkey(self.dunikey, "pubsec"))[:-len(hashDoc.encode())]).decode()
# Build final document
data = {}
data['hash'] = hashDoc
data['signature'] = signature
signJSON = json.dumps(data)
finalJSON = {**json.loads(signJSON), **json.loads(document)}
finalDoc = json.dumps(finalJSON)
return finalDoc
def sendDocument(self, document, pubkey):
headers = {
'Content-type': 'application/json',
}
# Send JSON document and get JSON result
result = requests.post('{0}/user/profile/:id/_like'.format(self.pod), headers=headers, data=document)
if result.status_code == 200:
print(colored("Profile liké avec succès !", 'green'))
return result.text
elif result.status_code == 400:
resultJson = json.loads(result.text)
if 'DuplicatedDocumentException' in resultJson['error']:
rmLike = UnLikes(self.dunikey, self.pod)
rmLike.unLike(pubkey, True)
sleep(0.5)
self.sendDocument(document, pubkey)
return resultJson['error']
else:
sys.stderr.write("Echec de l'envoi du document de lecture des messages...\n" + resultJson['error'] + '\n')
else:
resultJson = json.loads(result.text)
sys.stderr.write("Echec de l'envoi du document de lecture des messages...\n" + resultJson['error'] + '\n')
def like(self, stars, profile=False):
document = self.configDoc(profile, stars)
if document:
self.sendDocument(document, profile)
#################### Unlike class ####################
class UnLikes:
def __init__(self, dunikey, pod):
# Get my pubkey from my private key
try:
self.dunikey = dunikey
if not dunikey:
raise ValueError("Dunikey is empty")
except:
sys.stderr.write("Please fill the path to your private key (PubSec)\n")
sys.exit(1)
self.issuer = get_privkey(dunikey, "pubsec").pubkey
self.pod = pod
if not re.match(PUBKEY_REGEX, self.issuer) or len(self.issuer) > 45:
sys.stderr.write("La clé publique n'est pas au bon format.\n")
sys.exit(1)
# Check if you liked this profile
def checkLike(self, pubkey):
readProfileLikes = ReadLikes(self.dunikey, self.pod)
document = readProfileLikes.configDoc(pubkey)
result = readProfileLikes.sendDocument(document)
result = readProfileLikes.parseResult(result)
result = json.loads(result)
if 'yours' in result:
myLike = result['yours']['id']
return myLike
else:
sys.stderr.write("Vous n'avez pas liké ce profile\n")
return False
# Configure JSON document to send
def configDoc(self, idLike):
timeSent = int(time.time())
data = {}
data['version'] = 2
data['index'] = "like"
data['type'] = "record"
data['id'] = idLike
data['issuer'] = self.issuer
data['time'] = timeSent
document = json.dumps(data)
# Generate hash of document
hashDoc = sha256(document.encode()).hexdigest().upper()
# Generate signature of document
signature = fmt["64"](sign(hashDoc.encode(), get_privkey(self.dunikey, "pubsec"))[:-len(hashDoc.encode())]).decode()
# Build final document
data = {}
data['hash'] = hashDoc
data['signature'] = signature
signJSON = json.dumps(data)
finalJSON = {**json.loads(signJSON), **json.loads(document)}
finalDoc = json.dumps(finalJSON)
return finalDoc
def sendDocument(self, document, silent):
headers = {
'Content-type': 'application/json',
}
# Send JSON document and get JSON result
result = requests.post('{0}/history/delete'.format(self.pod), headers=headers, data=document)
if result.status_code == 200:
if not silent:
print(colored("Like supprimé avec succès !", 'green'))
return result.text
else:
sys.stderr.write("Echec de l'envoi du document de lecture des messages...\n" + result.text + '\n')
def unLike(self, pubkey, silent=False):
idLike = self.checkLike(pubkey)
if idLike:
document = self.configDoc(idLike)
self.sendDocument(document, silent)

View File

@ -0,0 +1,297 @@
#!/usr/bin/env python3
"""
CopyLeft 2020 Pascal Engélibert <tuxmain@zettascript.org>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
__version__ = "1.3.1"
import os, sys, duniterpy.key, libnacl, base58, base64, getpass
def getargv(arg:str, default:str="", n:int=1, args:list=sys.argv) -> str:
if arg in args and len(args) > args.index(arg)+n:
return args[args.index(arg)+n]
else:
return default
def read_data(data_path, b=True):
if data_path == "-":
if b:
return sys.stdin.buffer.read()
else:
return sys.stdin.read()
else:
return open(os.path.expanduser(data_path), "rb" if b else "r").read()
def write_data(data, result_path):
if result_path == "-":
os.fdopen(sys.stdout.fileno(), 'wb').write(data)
else:
open(os.path.expanduser(result_path), "wb").write(data)
def encrypt(data, pubkey):
return duniterpy.key.PublicKey(pubkey).encrypt_seal(data)
def decrypt(data, privkey):
return privkey.decrypt_seal(data)
def box_encrypt(data, privkey, pubkey, nonce=None, attach_nonce=False):
signer = libnacl.sign.Signer(privkey.seed)
sk = libnacl.public.SecretKey(libnacl.crypto_sign_ed25519_sk_to_curve25519(signer.sk))
verifier = libnacl.sign.Verifier(base58.b58decode(pubkey).hex())
pk = libnacl.public.PublicKey(libnacl.crypto_sign_ed25519_pk_to_curve25519(verifier.vk))
box = libnacl.public.Box(sk.sk, pk.pk)
data = box.encrypt(data, nonce) if nonce else box.encrypt(data)
return data if attach_nonce else data[24:]
def box_decrypt(data, privkey, pubkey, nonce=None):
signer = libnacl.sign.Signer(privkey.seed)
sk = libnacl.public.SecretKey(libnacl.crypto_sign_ed25519_sk_to_curve25519(signer.sk))
verifier = libnacl.sign.Verifier(base58.b58decode(pubkey).hex())
pk = libnacl.public.PublicKey(libnacl.crypto_sign_ed25519_pk_to_curve25519(verifier.vk))
box = libnacl.public.Box(sk.sk, pk.pk)
return box.decrypt(data, nonce) if nonce else box.decrypt(data)
def sign(data, privkey):
return privkey.sign(data)
def verify(data, pubkey):
try:
ret = libnacl.sign.Verifier(duniterpy.key.PublicKey(pubkey).hex_pk()).verify(data)
sys.stderr.write("Signature OK!\n")
return ret
except ValueError:
sys.stderr.write("Bad signature!\n")
exit(1)
def get_privkey(privkey_path, privkey_format):
if privkey_format == "pubsec":
if privkey_path == "*":
privkey_path = "privkey.pubsec"
return duniterpy.key.SigningKey.from_pubsec_file(privkey_path)
elif privkey_format == "cred":
if privkey_path == "*":
privkey_path = "-"
if privkey_path == "-":
return duniterpy.key.SigningKey.from_credentials(getpass.getpass("Password: "), getpass.getpass("Salt: "))
else:
return duniterpy.key.SigningKey.from_credentials_file(privkey_path)
elif privkey_format == "seedh":
if privkey_path == "*":
privkey_path = "authfile.seedhex"
return duniterpy.key.SigningKey.from_seedhex(read_data(privkey_path, False))
elif privkey_format == "wif":
if privkey_path == "*":
privkey_path = "authfile.wif"
return duniterpy.key.SigningKey.from_wif_or_ewif_file(privkey_path)
elif privkey_format == "wifh":
if privkey_path == "*":
privkey_path = "authfile.wif"
return duniterpy.key.SigningKey.from_wif_or_ewif_hex(privkey_path)
elif privkey_format == "ssb":
if privkey_path == "*":
privkey_path = "secret"
return duniterpy.key.SigningKey.from_ssb_file(privkey_path)
elif privkey_format == "key":
if privkey_path == "*":
privkey_path = "authfile.key"
return duniterpy.key.SigningKey.from_private_key(privkey_path)
print("Error: unknown privkey format")
def fill_pubkey(pubkey, length=32):
while pubkey[0] == 0:
pubkey = pubkey[1:]
return b"\x00"*(length-len(pubkey)) + pubkey
def pubkey_checksum(pubkey, length=32, clength=3):
return base58.b58encode(libnacl.crypto_hash_sha256(libnacl.crypto_hash_sha256(fill_pubkey(base58.b58decode(pubkey), length)))).decode()[:clength]
# returns (pubkey:bytes|None, deprecated_length:bool)
def check_pubkey(pubkey):
if ":" in pubkey:
parts = pubkey.split(":")
if len(parts[1]) < 3 or len(parts[1]) > 32:
return (None, False)
for i in range(32, 0, -1):
if pubkey_checksum(parts[0], i, len(parts[1])) == parts[1]:
return (parts[0], i < 32)
return (None, False)
return (pubkey, False)
fmt = {
"raw": lambda data: data,
"16": lambda data: data.hex().encode(),
"32": lambda data: base64.b32encode(data),
"58": lambda data: base58.b58encode(data),
"64": lambda data: base64.b64encode(data),
"64u": lambda data: base64.urlsafe_b64encode(data),
"85": lambda data: base64.b85encode(data),
}
defmt = {
"raw": lambda data: data,
"16": lambda data: bytes.fromhex(data),
"32": lambda data: base64.b32decode(data),
"58": lambda data: base58.b58decode(data),
"64": lambda data: base64.b64decode(data),
"85": lambda data: base64.b85decode(data),
}
def show_help():
print("""Usage:
python3 natools.py <command> [options]
Commands:
encrypt Encrypt data
decrypt Decrypt data
box-encrypt Encrypt data (NaCl box)
box-decrypt Decrypt data (NaCl box)
sign Sign data
verify Verify data
pubkey Display pubkey
pk Display b58 pubkey shorthand
Options:
-c Display pubkey checksum
-f <fmt> Private key format (default: cred)
key cred pubsec seedh ssb wif wifh
-i <path> Input file path (default: -)
-I <fmt> Input format: raw 16 32 58 64 85 (default: raw)
-k <path> Privkey file path (* for auto) (default: *)
-n <nonce> Nonce (b64, 24 bytes) (for NaCl box)
-N Attach nonce to output (for NaCl box encryption)
--noinc Do not include msg after signature
-o <path> Output file path (default: -)
-O <fmt> Output format: raw 16 32 58 64 64u 85 (default: raw)
-p <str> Pubkey (base58)
--help Show help
--version Show version
--debug Debug mode (display full errors)
Note: "-" means stdin or stdout.
""")
if __name__ == "__main__":
if "--help" in sys.argv:
show_help()
exit()
if "--version" in sys.argv:
print(__version__)
exit()
privkey_format = getargv("-f", "cred")
data_path = getargv("-i", "-")
privkey_path = getargv("-k", "*")
pubkey = getargv("-p")
result_path = getargv("-o", "-")
output_format = getargv("-O", "raw")
input_format = getargv("-I", "raw")
if pubkey:
pubkey, len_deprecated = check_pubkey(pubkey)
if not pubkey:
print("Invalid pubkey checksum! Please check spelling.")
exit(1)
if len(base58.b58decode(pubkey)) > 32:
print("Invalid pubkey: too long!")
exit(1)
if len_deprecated:
print("Warning: valid pubkey checksum, but deprecated format (truncating zeros)")
try:
if sys.argv[1] == "encrypt":
if not pubkey:
print("Please provide pubkey!")
exit(1)
write_data(fmt[output_format](encrypt(defmt[input_format](read_data(data_path)), pubkey)), result_path)
elif sys.argv[1] == "decrypt":
write_data(fmt[output_format](decrypt(defmt[input_format](read_data(data_path)), get_privkey(privkey_path, privkey_format))), result_path)
elif sys.argv[1] == "box-encrypt":
if not pubkey:
print("Please provide pubkey!")
exit(1)
nonce = getargv("-n", None)
if nonce:
nonce = base64.b64decode(nonce)
attach_nonce = "-N" in sys.argv
write_data(fmt[output_format](box_encrypt(defmt[input_format](read_data(data_path)), get_privkey(privkey_path, privkey_format), pubkey, nonce, attach_nonce)), result_path)
elif sys.argv[1] == "box-decrypt":
if not pubkey:
print("Please provide pubkey!")
exit(1)
nonce = getargv("-n", None)
if nonce:
nonce = base64.b64decode(nonce)
write_data(fmt[output_format](box_decrypt(defmt[input_format](read_data(data_path)), get_privkey(privkey_path, privkey_format), pubkey, nonce)), result_path)
elif sys.argv[1] == "sign":
data = defmt[input_format](read_data(data_path))
signed = sign(data, get_privkey(privkey_path, privkey_format))
if "--noinc" in sys.argv:
signed = signed[:len(signed)-len(data)]
write_data(fmt[output_format](signed), result_path)
elif sys.argv[1] == "verify":
if not pubkey:
print("Please provide pubkey!")
exit(1)
write_data(fmt[output_format](verify(defmt[input_format](read_data(data_path)), pubkey)), result_path)
elif sys.argv[1] == "pubkey":
if pubkey:
if "-c" in sys.argv and output_format == "58":
write_data("{}:{}".format(pubkey, pubkey_checksum(pubkey)).encode(), result_path)
else:
write_data(fmt[output_format](base58.b58decode(pubkey)), result_path)
else:
pubkey = get_privkey(privkey_path, privkey_format).pubkey
if "-c" in sys.argv and output_format == "58":
write_data("{}:{}".format(pubkey, pubkey_checksum(pubkey)).encode(), result_path)
else:
write_data(fmt[output_format](base58.b58decode(pubkey)), result_path)
elif sys.argv[1] == "pk":
if not pubkey:
pubkey = get_privkey(privkey_path, privkey_format).pubkey
if "-c" in sys.argv:
print("{}:{}".format(pubkey, pubkey_checksum(pubkey)))
else:
print(pubkey)
else:
show_help()
except Exception as e:
if "--debug" in sys.argv:
0/0 # DEBUG MODE (raise error when handling error to display backtrace)
sys.stderr.write("Error: {}\n".format(e))
show_help()
exit(1)

View File

@ -0,0 +1,6 @@
wheel
base58
pybase64
duniterpy
termcolor
python-dotenv

12
zen/cesium-messaging/setup.sh Executable file
View File

@ -0,0 +1,12 @@
#!/bin/bash
for i in gcc python3-pip python3-setuptools libpq-dev python3-dev python3-wheel; do
if [ $(dpkg-query -W -f='${Status}' $i 2>/dev/null | grep -c "ok installed") -eq 0 ]; then
[[ ! $j ]] && sudo apt update
sudo apt install -y $i
j=1
fi
done
pip3 install -r requirements.txt
chmod u+x dialog.py

View File

@ -0,0 +1,3 @@
DUNIKEY²="/home/fred/.ssb/trousseau-2L8vaYix-Fred-gchange-PubSec.dunikey" # Chemin du fichier de trousseau Ḡ1 de l'émetteur, au format PubSec
POD="https://g1.data.duniter.fr" # Noeud Cecium+ utilisé pour l'envoi du message
#pod="https://data.gchange.fr" # Noeud Gchange utilisé pour l'envoi du message