commit 401c5ed1bf14e71a8223d8d32d690e65c43f6155
parent f353930f449611fd31cf9ad507cfb91548f45232
Author: nolash <dev@holbrook.no>
Date: Sun, 10 Oct 2021 18:11:35 +0200
Factor out eth code
Diffstat:
31 files changed, 1 insertion(+), 1809 deletions(-)
diff --git a/config/config.ini b/config/config.ini
@@ -1,3 +0,0 @@
-[signer]
-secret = deadbeef
-socket_path = ipc:///tmp/crypto-dev-signer/jsonrpc.ipc
diff --git a/config/database.ini b/config/database.ini
@@ -1,6 +0,0 @@
-[database]
-NAME=cic-signer
-USER=postgres
-PASSWORD=
-HOST=localhost
-PORT=5432
diff --git a/funga/eth/__init__.py b/funga/eth/__init__.py
diff --git a/funga/eth/cli/__init__.py b/funga/eth/cli/__init__.py
diff --git a/funga/eth/cli/handle.py b/funga/eth/cli/handle.py
@@ -1,115 +0,0 @@
-# standard imports
-import json
-import logging
-
-# external imports
-from jsonrpc.exceptions import (
- JSONRPCServerError,
- JSONRPCParseError,
- JSONRPCInvalidParams,
- )
-from hexathon import add_0x
-
-# local imports
-from funga.eth.transaction import EIP155Transaction
-from funga.error import (
- UnknownAccountError,
- SignerError,
- )
-from funga.eth.cli.jsonrpc import jsonrpc_ok
-from .jsonrpc import (
- jsonrpc_error,
- is_valid_json,
- )
-
-logg = logging.getLogger(__name__)
-
-
-class SignRequestHandler:
-
- keystore = None
- signer = None
-
- def process_input(self, j):
- rpc_id = j['id']
- m = j['method']
- p = j['params']
- return (rpc_id, getattr(self, m)(p))
-
-
- def handle_jsonrpc(self, d):
- j = None
- try:
- j = json.loads(d)
- is_valid_json(j)
- logg.debug('{}'.format(d.decode('utf-8')))
- except Exception as e:
- logg.exception('input error {}'.format(e))
- j = json.dumps(jsonrpc_error(None, JSONRPCParseError)).encode('utf-8')
- raise SignerError(j)
-
- try:
- (rpc_id, r) = self.process_input(j)
- r = jsonrpc_ok(rpc_id, r)
- j = json.dumps(r).encode('utf-8')
- except ValueError as e:
- # TODO: handle cases to give better error context to caller
- logg.exception('process error {}'.format(e))
- j = json.dumps(jsonrpc_error(j['id'], JSONRPCServerError)).encode('utf-8')
- raise SignerError(j)
- except UnknownAccountError as e:
- logg.exception('process unknown account error {}'.format(e))
- j = json.dumps(jsonrpc_error(j['id'], JSONRPCServerError)).encode('utf-8')
- raise SignerError(j)
-
- return j
-
-
- def personal_newAccount(self, p):
- password = p
- if p.__class__.__name__ != 'str':
- if p.__class__.__name__ != 'list':
- e = JSONRPCInvalidParams()
- e.data = 'parameter must be list containing one string'
- raise ValueError(e)
- logg.error('foo {}'.format(p))
- if len(p) != 1:
- e = JSONRPCInvalidParams()
- e.data = 'parameter must be list containing one string'
- raise ValueError(e)
- if p[0].__class__.__name__ != 'str':
- e = JSONRPCInvalidParams()
- e.data = 'parameter must be list containing one string'
- raise ValueError(e)
- password = p[0]
-
- r = self.keystore.new(password)
-
- return add_0x(r)
-
-
- # TODO: move to translation module ("personal" rpc namespace is node-specific)
- def personal_signTransaction(self, p):
- logg.debug('got {} to sign'.format(p[0]))
- t = EIP155Transaction(p[0], p[0]['nonce'], p[0]['chainId'])
- raw_signed_tx = self.signer.sign_transaction_to_rlp(t, p[1])
- o = {
- 'raw': '0x' + raw_signed_tx.hex(),
- 'tx': t.serialize(),
- }
- return o
-
-
- def eth_signTransaction(self, tx):
- o = self.personal_signTransaction([tx[0], ''])
- return o['raw']
-
-
- def eth_sign(self, p):
- logg.debug('got message {} to sign'.format(p[1]))
- message_type = type(p[1]).__name__
- if message_type != 'str':
- raise ValueError('invalid message format, must be {}, not {}'.format(message_type))
- z = self.signer.sign_ethereum_message(p[0], p[1][2:])
- return add_0x(z.hex())
-
diff --git a/funga/eth/cli/http.py b/funga/eth/cli/http.py
@@ -1,85 +0,0 @@
-# standard imports
-import logging
-
-# external imports
-from http.server import (
- HTTPServer,
- BaseHTTPRequestHandler,
- )
-
-# local imports
-from .handle import SignRequestHandler
-from crypto_dev_signer.error import SignerError
-
-logg = logging.getLogger(__name__)
-
-
-def start_server_http(spec):
- httpd = HTTPServer(spec, HTTPSignRequestHandler)
- logg.debug('starting http server {}'.format(spec))
- httpd.serve_forever()
-
-
-class HTTPSignRequestHandler(SignRequestHandler, BaseHTTPRequestHandler):
-
- def do_POST(self):
- if self.headers.get('Content-Type') != 'application/json':
- self.send_response(400, 'me read json only')
- self.end_headers()
- return
-
- try:
- if 'application/json' not in self.headers.get('Accept').split(','):
- self.send_response(400, 'me json only speak')
- self.end_headers()
- return
- except AttributeError:
- pass
-
- l = self.headers.get('Content-Length')
- try:
- l = int(l)
- except ValueError:
- self.send_response(400, 'content length must be integer')
- self.end_headers()
- return
- if l > 4096:
- self.send_response(400, 'too much information')
- self.end_headers()
- return
- if l < 0:
- self.send_response(400, 'you are too negative')
- self.end_headers()
- return
-
- b = b''
- c = 0
- while c < l:
- d = self.rfile.read(l-c)
- if d == None:
- break
- b += d
- c += len(d)
- if c > 4096:
- self.send_response(413, 'i should slap you around for lying about your size')
- self.end_headers()
- return
-
- try:
- r = self.handle_jsonrpc(d)
- except SignerError as e:
- r = e.to_jsonrpc()
-
- l = len(r)
- self.send_response(200, 'You are the Keymaster')
- self.send_header('Content-Length', str(l))
- self.send_header('Cache-Control', 'no-cache')
- self.send_header('Content-Type', 'application/json')
- self.end_headers()
-
- c = 0
- while c < l:
- n = self.wfile.write(r[c:])
- c += n
-
-
diff --git a/funga/eth/cli/jsonrpc.py b/funga/eth/cli/jsonrpc.py
@@ -1,30 +0,0 @@
-# local imports
-from funga.error import UnknownAccountError
-
-
-def jsonrpc_error(rpc_id, err):
- return {
- 'jsonrpc': '2.0',
- 'id': rpc_id,
- 'error': {
- 'code': err.CODE,
- 'message': err.MESSAGE,
- },
- }
-
-
-def jsonrpc_ok(rpc_id, response):
- return {
- 'jsonrpc': '2.0',
- 'id': rpc_id,
- 'result': response,
- }
-
-
-def is_valid_json(j):
- if j.get('id') == 'None':
- raise ValueError('id missing')
- return True
-
-
-
diff --git a/funga/eth/cli/socket.py b/funga/eth/cli/socket.py
@@ -1,67 +0,0 @@
-# standard imports
-import os
-import logging
-import socket
-import stat
-
-# local imports
-from crypto_dev_signer.error import SignerError
-from .handle import SignRequestHandler
-
-logg = logging.getLogger(__name__)
-
-
-class SocketHandler:
-
- def __init__(self):
- self.handler = SignRequestHandler()
-
-
- def process(self, csock):
- d = csock.recv(4096)
-
- r = None
- try:
- r = self.handler.handle_jsonrpc(d)
- except SignerError as e:
- r = e.to_jsonrpc()
-
- csock.send(r)
-
-
-def start_server_socket(s):
- s.listen(10)
- logg.debug('server started')
- handler = SocketHandler()
- while True:
- (csock, caddr) = s.accept()
- handler.process(csock)
- csock.close()
- s.close()
- os.unlink(socket_path)
-
-
-def start_server_tcp(spec):
- s = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
- s.bind(spec)
- logg.debug('created tcp socket {}'.format(spec))
- start_server_socket(s)
-
-
-def start_server_unix(socket_path):
- socket_dir = os.path.dirname(socket_path)
- try:
- fi = os.stat(socket_dir)
- if not stat.S_ISDIR:
- RuntimeError('socket path {} is not a directory'.format(socket_dir))
- except FileNotFoundError:
- os.mkdir(socket_dir)
-
- try:
- os.unlink(socket_path)
- except FileNotFoundError:
- pass
- s = socket.socket(family = socket.AF_UNIX, type = socket.SOCK_STREAM)
- s.bind(socket_path)
- logg.debug('created unix ipc socket {}'.format(socket_path))
- start_server_socket(s)
diff --git a/funga/eth/encoding.py b/funga/eth/encoding.py
@@ -1,91 +0,0 @@
-# standard imports
-import logging
-
-# external imports
-import coincurve
-import sha3
-from hexathon import (
- strip_0x,
- uniform,
- )
-
-logg = logging.getLogger(__name__)
-
-
-def private_key_from_bytes(b):
- return coincurve.PrivateKey(secret=b)
-
-
-def public_key_bytes_to_address(pubk_bytes, result_format='hex'):
- h = sha3.keccak_256()
- logg.debug('public key bytes {}'.format(pubk_bytes.hex()))
- h.update(pubk_bytes[1:])
- z = h.digest()[12:]
- if result_format == 'hex':
- return to_checksum_address(z[:20].hex())
- elif result_format == 'bytes':
- return z[:20]
- raise ValueError('invalid result format "{}"'.format(result_format))
-
-
-def public_key_to_address(pubk, result_format='hex'):
- pubk_bytes = pubk.format(compressed=False)
- return public_key_bytes_to_address(pubk_bytes, result_format='hex')
-
-
-def private_key_to_address(pk, result_format='hex'):
- pubk = coincurve.PublicKey.from_secret(pk.secret)
- #logg.debug('secret {} '.format(pk.secret.hex()))
- return public_key_to_address(pubk, result_format)
-
-
-def is_address(address_hex):
- try:
- address_hex = strip_0x(address_hex)
- except ValueError:
- return False
- return len(address_hex) == 40
-
-
-def is_checksum_address(address_hex):
- hx = None
- try:
- hx = to_checksum(address_hex)
- except ValueError:
- return False
- return hx == strip_0x(address_hex)
-
-
-def to_checksum_address(address_hex):
- address_hex = strip_0x(address_hex)
- address_hex = uniform(address_hex)
- if len(address_hex) != 40:
- raise ValueError('Invalid address length')
- h = sha3.keccak_256()
- h.update(address_hex.encode('utf-8'))
- z = h.digest()
-
- #checksum_address_hex = '0x'
- checksum_address_hex = ''
-
- for (i, c) in enumerate(address_hex):
- if c in '1234567890':
- checksum_address_hex += c
- elif c in 'abcdef':
- if z[int(i / 2)] & (0x80 >> ((i % 2) * 4)) > 1:
- checksum_address_hex += c.upper()
- else:
- checksum_address_hex += c
-
- return checksum_address_hex
-
-to_checksum = to_checksum_address
-
-ethereum_recid_modifier = 35
-
-def chain_id_to_v(chain_id, signature):
- v = signature[64]
- return (chain_id * 2) + ethereum_recid_modifier + v
-
-def chainv_to_v(chain_id, v):
- return v - ethereum_recid_modifier - (chain_id * 2)
diff --git a/funga/eth/helper/__init__.py b/funga/eth/helper/__init__.py
@@ -1 +0,0 @@
-from .tx import EthTxExecutor
diff --git a/funga/eth/helper/tx.py b/funga/eth/helper/tx.py
@@ -1,58 +0,0 @@
-# standard imports
-import logging
-
-# local imports
-from crypto_dev_signer.helper import TxExecutor
-from crypto_dev_signer.error import NetworkError
-
-logg = logging.getLogger()
-logging.getLogger('web3').setLevel(logging.CRITICAL)
-logging.getLogger('urllib3').setLevel(logging.CRITICAL)
-
-
-class EthTxExecutor(TxExecutor):
-
- def __init__(self, w3, sender, signer, chain_id, verifier=None, block=False):
- self.w3 = w3
- nonce = self.w3.eth.getTransactionCount(sender, 'pending')
- super(EthTxExecutor, self).__init__(sender, signer, self.translator, self.dispatcher, self.reporter, nonce, chain_id, self.fee_helper, self.fee_price_helper, verifier, block)
-
-
- def fee_helper(self, tx):
- estimate = self.w3.eth.estimateGas(tx)
- if estimate < 21000:
- estimate = 21000
- logg.debug('estimate {} {}'.format(tx, estimate))
- return estimate
-
-
- def fee_price_helper(self):
- return self.w3.eth.gasPrice
-
-
- def dispatcher(self, tx):
- error_object = None
- try:
- tx_hash = self.w3.eth.sendRawTransaction(tx)
- except ValueError as e:
- error_object = e.args[0]
- logg.error('node could not intepret rlp {}'.format(tx))
- if error_object != None:
- raise NetworkError(error_object)
- return tx_hash
-
-
- def reporter(self, tx):
- return self.w3.eth.getTransactionReceipt(tx)
-
-
- def translator(self, tx):
- if tx.get('feePrice') != None:
- tx['gasPrice'] = tx['feePrice']
- del tx['feePrice']
-
- if tx.get('feeUnits') != None:
- tx['gas'] = tx['feeUnits']
- del tx['feeUnits']
-
- return tx
diff --git a/funga/eth/keystore/__init__.py b/funga/eth/keystore/__init__.py
@@ -1,8 +0,0 @@
-# third-party imports
-#from eth_keys import KeyAPI
-#from eth_keys.backends import NativeECCBackend
-
-#keyapi = KeyAPI(NativeECCBackend)
-
-#from .postgres import ReferenceKeystore
-#from .dict import DictKeystore
diff --git a/funga/eth/keystore/dict.py b/funga/eth/keystore/dict.py
@@ -1,45 +0,0 @@
-# standard imports
-import logging
-
-# external imports
-from hexathon import (
- strip_0x,
- add_0x,
- )
-
-# local imports
-#from . import keyapi
-from funga.error import UnknownAccountError
-from .interface import EthKeystore
-from funga.eth.encoding import private_key_to_address
-
-logg = logging.getLogger(__name__)
-
-
-class DictKeystore(EthKeystore):
-
- def __init__(self):
- super(DictKeystore, self).__init__()
- self.keys = {}
-
-
- def get(self, address, password=None):
- address_key = strip_0x(address).lower()
- if password != None:
- logg.debug('password ignored as dictkeystore doesnt do encryption')
- try:
- return self.keys[address_key]
- except KeyError:
- raise UnknownAccountError(address_key)
-
-
- def list(self):
- return list(self.keys.keys())
-
-
- def import_key(self, pk, password=None):
- address_hex = private_key_to_address(pk)
- address_hex_clean = strip_0x(address_hex).lower()
- self.keys[address_hex_clean] = pk.secret
- logg.debug('added key {}'.format(address_hex))
- return add_0x(address_hex)
diff --git a/funga/eth/keystore/interface.py b/funga/eth/keystore/interface.py
@@ -1,50 +0,0 @@
-# standard imports
-import os
-import json
-import logging
-
-# local imports
-from funga.eth.keystore import keyfile
-from funga.eth.encoding import private_key_from_bytes
-from funga.keystore import Keystore
-
-logg = logging.getLogger(__name__)
-
-
-def native_keygen(*args, **kwargs):
- return os.urandom(32)
-
-
-class EthKeystore(Keystore):
-
- def __init__(self, private_key_generator=native_keygen):
- super(EthKeystore, self).__init__(private_key_generator, private_key_from_bytes, keyfile.from_some)
-
-
- def new(self, password=None):
- b = self.private_key_generator()
- return self.import_raw_key(b, password=password)
-
-
- def import_raw_key(self, b, password=None):
- pk = private_key_from_bytes(b)
- return self.import_key(pk, password)
-
-
- def import_key(self, pk, password=None):
- raise NotImplementedError
-
-
- def import_keystore_data(self, keystore_content, password=''):
- if type(keystore_content).__name__ == 'str':
- keystore_content = json.loads(keystore_content)
- elif type(keystore_content).__name__ == 'bytes':
- logg.debug('bytes {}'.format(keystore_content))
- keystore_content = json.loads(keystore_content.decode('utf-8'))
- private_key = keyfile.from_dict(keystore_content, password.encode('utf-8'))
- return self.import_raw_key(private_key, password)
-
-
- def import_keystore_file(self, keystore_file, password=''):
- private_key = keyfile.from_file(keystore_file, password)
- return self.import_raw_key(private_key)
diff --git a/funga/eth/keystore/keyfile.py b/funga/eth/keystore/keyfile.py
@@ -1,173 +0,0 @@
-# standard imports
-import os
-import hashlib
-import logging
-import json
-import uuid
-
-# external imports
-import coincurve
-from Crypto.Cipher import AES
-from Crypto.Util import Counter
-import sha3
-
-# local imports
-from funga.error import (
- DecryptError,
- KeyfileError,
- )
-from funga.eth.encoding import private_key_to_address
-
-logg = logging.getLogger(__name__)
-
-algo_keywords = [
- 'aes-128-ctr',
- ]
-hash_keywords = [
- 'scrypt'
- ]
-
-default_kdfparams = {
- 'dklen': 32,
- 'n': 1 << 18,
- 'p': 1,
- 'r': 8,
- 'salt': os.urandom(32).hex(),
- }
-
-
-def to_mac(mac_key, ciphertext_bytes):
- h = sha3.keccak_256()
- h.update(mac_key)
- h.update(ciphertext_bytes)
- return h.digest()
-
-
-class Hashes:
-
- @staticmethod
- def from_scrypt(kdfparams=default_kdfparams, passphrase=''):
- dklen = int(kdfparams['dklen'])
- n = int(kdfparams['n'])
- p = int(kdfparams['p'])
- r = int(kdfparams['r'])
- salt = bytes.fromhex(kdfparams['salt'])
-
- return hashlib.scrypt(passphrase.encode('utf-8'), salt=salt,n=n, p=p, r=r, maxmem=1024*1024*1024, dklen=dklen)
-
-
-class Ciphers:
-
- aes_128_block_size = 1 << 7
- aes_iv_len = 16
-
- @staticmethod
- def decrypt_aes_128_ctr(ciphertext, decryption_key, iv):
- ctr = Counter.new(Ciphers.aes_128_block_size, initial_value=iv)
- cipher = AES.new(decryption_key, AES.MODE_CTR, counter=ctr)
- plaintext = cipher.decrypt(ciphertext)
- return plaintext
-
-
- @staticmethod
- def encrypt_aes_128_ctr(plaintext, encryption_key, iv):
- ctr = Counter.new(Ciphers.aes_128_block_size, initial_value=iv)
- cipher = AES.new(encryption_key, AES.MODE_CTR, counter=ctr)
- ciphertext = cipher.encrypt(plaintext)
- return ciphertext
-
-
-def to_dict(private_key_bytes, passphrase=''):
-
- private_key = coincurve.PrivateKey(secret=private_key_bytes)
-
- encryption_key = Hashes.from_scrypt(passphrase=passphrase)
-
- address_hex = private_key_to_address(private_key)
- iv_bytes = os.urandom(Ciphers.aes_iv_len)
- iv = int.from_bytes(iv_bytes, 'big')
- ciphertext_bytes = Ciphers.encrypt_aes_128_ctr(private_key.secret, encryption_key[:16], iv)
-
- mac = to_mac(encryption_key[16:], ciphertext_bytes)
-
- crypto_dict = {
- 'cipher': 'aes-128-ctr',
- 'ciphertext': ciphertext_bytes.hex(),
- 'cipherparams': {
- 'iv': iv_bytes.hex(),
- },
- 'kdf': 'scrypt',
- 'kdfparams': default_kdfparams,
- 'mac': mac.hex(),
- }
-
- uu = uuid.uuid1()
- o = {
- 'address': address_hex,
- 'version': 3,
- 'crypto': crypto_dict,
- 'id': str(uu),
- }
- return o
-
-
-def from_dict(o, passphrase=''):
-
- cipher = o['crypto']['cipher']
- if cipher not in algo_keywords:
- raise NotImplementedError('cipher "{}" not implemented'.format(cipher))
-
- kdf = o['crypto']['kdf']
- if kdf not in hash_keywords:
- raise NotImplementedError('kdf "{}" not implemented'.format(kdf))
-
- m = getattr(Hashes, 'from_{}'.format(kdf.replace('-', '_')))
- decryption_key = m(o['crypto']['kdfparams'], passphrase)
-
- control_mac = bytes.fromhex(o['crypto']['mac'])
- iv_bytes = bytes.fromhex(o['crypto']['cipherparams']['iv'])
- iv = int.from_bytes(iv_bytes, "big")
- ciphertext_bytes = bytes.fromhex(o['crypto']['ciphertext'])
-
- # check mac
- calculated_mac = to_mac(decryption_key[16:], ciphertext_bytes)
- if control_mac != calculated_mac:
- raise DecryptError('mac mismatch when decrypting passphrase')
-
- m = getattr(Ciphers, 'decrypt_{}'.format(cipher.replace('-', '_')))
-
- try:
- pk = m(ciphertext_bytes, decryption_key[:16], iv)
- except AssertionError as e:
- raise DecryptError('could not decrypt keyfile: {}'.format(e))
- logg.debug('bar')
-
- return pk
-
-
-def from_file(filepath, passphrase=''):
-
- f = open(filepath, 'r')
- try:
- o = json.load(f)
- except json.decoder.JSONDecodeError as e:
- f.close()
- raise KeyfileError(e)
- f.close()
-
- return from_dict(o, passphrase)
-
-
-def from_some(v, passphrase=''):
- if isinstance(v, bytes):
- v = v.decode('utf-8')
-
- if isinstance(v, str):
- try:
- return from_file(v, passphrase)
- except Exception:
- logg.debug('keyfile parse as file fail')
- pass
- v = json.loads(v)
-
- return from_dict(v, passphrase)
diff --git a/funga/eth/keystore/sql.py b/funga/eth/keystore/sql.py
@@ -1,108 +0,0 @@
-# standard imports
-import logging
-import base64
-
-# external imports
-from cryptography.fernet import Fernet
-#import psycopg2
-from sqlalchemy import create_engine, text
-from sqlalchemy.orm import sessionmaker
-import sha3
-from hexathon import (
- strip_0x,
- add_0x,
- )
-
-# local imports
-from .interface import EthKeystore
-#from . import keyapi
-from funga.error import UnknownAccountError
-from funga.eth.encoding import private_key_to_address
-
-logg = logging.getLogger(__name__)
-
-
-def to_bytes(x):
- return x.encode('utf-8')
-
-
-class SQLKeystore(EthKeystore):
-
- schema = [
- """CREATE TABLE IF NOT EXISTS ethereum (
- id SERIAL NOT NULL PRIMARY KEY,
- key_ciphertext VARCHAR(256) NOT NULL,
- wallet_address_hex CHAR(40) NOT NULL
- );
-""",
- """CREATE UNIQUE INDEX IF NOT EXISTS ethereum_address_idx ON ethereum ( wallet_address_hex );
-""",
- ]
-
- def __init__(self, dsn, **kwargs):
- super(SQLKeystore, self).__init__()
- logg.debug('starting db session with dsn {}'.format(dsn))
- self.db_engine = create_engine(dsn)
- self.db_session = sessionmaker(bind=self.db_engine)()
- for s in self.schema:
- self.db_session.execute(s)
- self.db_session.commit()
- self.symmetric_key = kwargs.get('symmetric_key')
-
-
- def __del__(self):
- logg.debug('closing db session')
- self.db_session.close()
-
-
- def get(self, address, password=None):
- safe_address = strip_0x(address).lower()
- s = text('SELECT key_ciphertext FROM ethereum WHERE wallet_address_hex = :a')
- r = self.db_session.execute(s, {
- 'a': safe_address,
- },
- )
- try:
- k = r.first()[0]
- except TypeError:
- self.db_session.rollback()
- raise UnknownAccountError(safe_address)
- self.db_session.commit()
- a = self._decrypt(k, password)
- return a
-
-
- def import_key(self, pk, password=None):
- address_hex = private_key_to_address(pk)
- address_hex_clean = strip_0x(address_hex).lower()
-
- c = self._encrypt(pk.secret, password)
- s = text('INSERT INTO ethereum (wallet_address_hex, key_ciphertext) VALUES (:a, :c)') #%s, %s)')
- self.db_session.execute(s, {
- 'a': address_hex_clean,
- 'c': c.decode('utf-8'),
- },
- )
- self.db_session.commit()
- logg.info('added private key for address {}'.format(address_hex_clean))
- return add_0x(address_hex)
-
-
- def _encrypt(self, private_key, password):
- f = self._generate_encryption_engine(password)
- return f.encrypt(private_key)
-
-
- def _generate_encryption_engine(self, password):
- h = sha3.keccak_256()
- h.update(self.symmetric_key)
- if password != None:
- password_bytes = to_bytes(password)
- h.update(password_bytes)
- g = h.digest()
- return Fernet(base64.b64encode(g))
-
-
- def _decrypt(self, c, password):
- f = self._generate_encryption_engine(password)
- return f.decrypt(c.encode('utf-8'))
diff --git a/funga/eth/runnable/keyfile.py b/funga/eth/runnable/keyfile.py
@@ -1,87 +0,0 @@
-# standard imports
-import os
-import logging
-import sys
-import json
-import argparse
-import getpass
-
-# external impors
-import coincurve
-from hexathon import strip_0x
-
-# local imports
-from funga.error import DecryptError
-from funga.eth.keystore.keyfile import (
- from_file,
- to_dict,
- )
-from funga.eth.encoding import (
- private_key_to_address,
- private_key_from_bytes,
- )
-
-
-logging.basicConfig(level=logging.WARNING)
-logg = logging.getLogger()
-
-argparser = argparse.ArgumentParser()
-argparser.add_argument('-d', '--decrypt', dest='d', type=str, help='decrypt file')
-argparser.add_argument('--private-key', dest='private_key', action='store_true', help='output private key instead of address')
-argparser.add_argument('-z', action='store_true', help='zero-length password')
-argparser.add_argument('-k', type=str, help='load key from file')
-argparser.add_argument('-v', action='store_true', help='be verbose')
-args = argparser.parse_args()
-
-if args.v:
- logg.setLevel(logging.DEBUG)
-
-mode = 'create'
-secret = False
-if args.d:
- mode = 'decrypt'
- if args.private_key:
- secret = True
-
-pk_hex = os.environ.get('PRIVATE_KEY')
-if args.k != None:
- f = open(args.k, 'r')
- pk_hex = f.read(66)
- f.close()
-
-def main():
- global pk_hex
-
- passphrase = os.environ.get('PASSPHRASE')
- if args.z:
- passphrase = ''
- r = None
- if mode == 'decrypt':
- if passphrase == None:
- passphrase = getpass.getpass('decryption phrase: ')
- try:
- r = from_file(args.d, passphrase).hex()
- except DecryptError:
- sys.stderr.write('Invalid passphrase\n')
- sys.exit(1)
- if not secret:
- pk = private_key_from_bytes(bytes.fromhex(r))
- r = private_key_to_address(pk)
- elif mode == 'create':
- if passphrase == None:
- passphrase = getpass.getpass('encryption phrase: ')
- pk_bytes = None
- if pk_hex != None:
- pk_hex = strip_0x(pk_hex)
- pk_bytes = bytes.fromhex(pk_hex)
- else:
- pk_bytes = os.urandom(32)
- pk = coincurve.PrivateKey(secret=pk_bytes)
- o = to_dict(pk_bytes, passphrase)
- r = json.dumps(o)
-
- print(r)
-
-
-if __name__ == '__main__':
- main()
diff --git a/funga/eth/runnable/signer.py b/funga/eth/runnable/signer.py
@@ -1,122 +0,0 @@
-# standard imports
-import re
-import os
-import sys
-import json
-import logging
-import argparse
-from urllib.parse import urlparse
-
-# external imports
-import confini
-from jsonrpc.exceptions import *
-
-# local imports
-from crypto_dev_signer.eth.signer import ReferenceSigner
-from crypto_dev_signer.keystore.reference import ReferenceKeystore
-from crypto_dev_signer.cli.handle import SignRequestHandler
-
-logging.basicConfig(level=logging.WARNING)
-logg = logging.getLogger()
-
-config_dir = '.'
-
-db = None
-signer = None
-session = None
-chainId = 8995
-socket_path = '/run/crypto-dev-signer/jsonrpc.ipc'
-
-argparser = argparse.ArgumentParser()
-argparser.add_argument('-c', type=str, default=config_dir, help='config file')
-argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
-argparser.add_argument('-i', type=int, help='default chain id for EIP155')
-argparser.add_argument('-s', type=str, help='socket path')
-argparser.add_argument('-v', action='store_true', help='be verbose')
-argparser.add_argument('-vv', action='store_true', help='be more verbose')
-args = argparser.parse_args()
-
-if args.vv:
- logging.getLogger().setLevel(logging.DEBUG)
-elif args.v:
- logging.getLogger().setLevel(logging.INFO)
-
-config = confini.Config(args.c, args.env_prefix)
-config.process()
-config.censor('PASSWORD', 'DATABASE')
-config.censor('SECRET', 'SIGNER')
-logg.debug('config loaded from {}:\n{}'.format(config_dir, config))
-
-if args.i:
- chainId = args.i
-if args.s:
- socket_url = urlparse(args.s)
-elif config.get('SIGNER_SOCKET_PATH'):
- socket_url = urlparse(config.get('SIGNER_SOCKET_PATH'))
-
-
-# connect to database
-dsn = 'postgresql://{}:{}@{}:{}/{}'.format(
- config.get('DATABASE_USER'),
- config.get('DATABASE_PASSWORD'),
- config.get('DATABASE_HOST'),
- config.get('DATABASE_PORT'),
- config.get('DATABASE_NAME'),
- )
-
-logg.info('using dsn {}'.format(dsn))
-logg.info('using socket {}'.format(config.get('SIGNER_SOCKET_PATH')))
-
-re_http = r'^http'
-re_tcp = r'^tcp'
-re_unix = r'^ipc'
-
-class MissingSecretError(Exception):
- pass
-
-
-def main():
-
- secret_hex = config.get('SIGNER_SECRET')
- if secret_hex == None:
- raise MissingSecretError('please provide a valid hex value for the SIGNER_SECRET configuration variable')
-
- secret = bytes.fromhex(secret_hex)
- kw = {
- 'symmetric_key': secret,
- }
- SignRequestHandler.keystore = ReferenceKeystore(dsn, **kw)
- SignRequestHandler.signer = ReferenceSigner(SignRequestHandler.keystore)
-
- arg = None
- try:
- arg = json.loads(sys.argv[1])
- except:
- logg.info('no json rpc command detected, starting socket server {}'.format(socket_url))
- scheme = 'ipc'
- if socket_url.scheme != '':
- scheme = socket_url.scheme
- if re.match(re_tcp, socket_url.scheme):
- from crypto_dev_signer.cli.socket import start_server_tcp
- socket_spec = socket_url.netloc.split(':')
- host = socket_spec[0]
- port = int(socket_spec[1])
- start_server_tcp((host, port))
- elif re.match(re_http, socket_url.scheme):
- from crypto_dev_signer.cli.http import start_server_http
- socket_spec = socket_url.netloc.split(':')
- host = socket_spec[0]
- port = int(socket_spec[1])
- start_server_http((host, port))
- else:
- from crypto_dev_signer.cli.socket import start_server_unix
- start_server_unix(socket_url.path)
- sys.exit(0)
-
- (rpc_id, response) = process_input(arg)
- r = jsonrpc_ok(rpc_id, response)
- sys.stdout.write(json.dumps(r))
-
-
-if __name__ == '__main__':
- main()
diff --git a/funga/eth/signer/__init__.py b/funga/eth/signer/__init__.py
@@ -1 +0,0 @@
-from funga.eth.signer.defaultsigner import EIP155Signer
diff --git a/funga/eth/signer/defaultsigner.py b/funga/eth/signer/defaultsigner.py
@@ -1,79 +0,0 @@
-# standard imports
-import logging
-
-# external imports
-import sha3
-import coincurve
-from hexathon import int_to_minbytes
-
-# local imports
-from funga.signer import Signer
-from funga.eth.encoding import chain_id_to_v
-
-logg = logging.getLogger(__name__)
-
-
-class EIP155Signer(Signer):
-
- def __init__(self, keyGetter):
- super(EIP155Signer, self).__init__(keyGetter)
-
-
- def sign_transaction(self, tx, password=None):
- s = tx.rlp_serialize()
- h = sha3.keccak_256()
- h.update(s)
- message_to_sign = h.digest()
- z = self.sign_pure(tx.sender, message_to_sign, password)
-
- return z
-
-
- def sign_transaction_to_rlp(self, tx, password=None):
- chain_id = int.from_bytes(tx.v, byteorder='big')
- sig = self.sign_transaction(tx, password)
- tx.apply_signature(chain_id, sig)
- return tx.rlp_serialize()
-
-
- def sign_transaction_to_wire(self, tx, password=None):
- return self.sign_transaction_to_rlp(tx, password=password)
-
-
- def sign_ethereum_message(self, address, message, password=None):
-
- #k = keys.PrivateKey(self.keyGetter.get(address, password))
- #z = keys.ecdsa_sign(message_hash=g, private_key=k)
- if type(message).__name__ == 'str':
- logg.debug('signing message in "str" format: {}'.format(message))
- #z = k.sign_msg(bytes.fromhex(message))
- message = bytes.fromhex(message)
- elif type(message).__name__ == 'bytes':
- logg.debug('signing message in "bytes" format: {}'.format(message.hex()))
- #z = k.sign_msg(message)
- else:
- logg.debug('unhandled format {}'.format(type(message).__name__))
- raise ValueError('message must be type str or bytes, received {}'.format(type(message).__name__))
-
- ethereumed_message_header = b'\x19' + 'Ethereum Signed Message:\n{}'.format(len(message)).encode('utf-8')
- h = sha3.keccak_256()
- h.update(ethereumed_message_header + message)
- message_to_sign = h.digest()
-
- z = self.sign_pure(address, message_to_sign, password)
- return z
-
-
- # TODO: generic sign should be moved to non-eth context
- def sign_pure(self, address, message, password=None):
- pk = coincurve.PrivateKey(secret=self.keyGetter.get(address, password))
- z = pk.sign_recoverable(hasher=None, message=message)
- return z
-
-
- def sign_message(self, address, message, password=None, dialect='eth'):
- if dialect == None:
- return self.sign_pure(address, message, password=password)
- elif dialect == 'eth':
- return self.sign_ethereum_message(address, message, password=password)
- raise ValueError('Unknown message sign dialect "{}"'.format(dialect))
diff --git a/funga/eth/transaction.py b/funga/eth/transaction.py
@@ -1,172 +0,0 @@
-# standard imports
-import logging
-import binascii
-import re
-
-# external imports
-#from rlp import encode as rlp_encode
-from hexathon import (
- strip_0x,
- add_0x,
- int_to_minbytes,
- )
-
-# local imports
-from funga.eth.encoding import chain_id_to_v
-#from crypto_dev_signer.eth.rlp import rlp_encode
-import rlp
-
-logg = logging.getLogger(__name__)
-
-rlp_encode = rlp.encode
-
-class Transaction:
-
- def rlp_serialize(self):
- raise NotImplementedError
-
- def serialize(self):
- raise NotImplementedError
-
-
-class EIP155Transaction:
-
- def __init__(self, tx, nonce_in, chainId_in=1):
- to = b''
- data = b''
- if tx.get('to') != None:
- to = bytes.fromhex(strip_0x(tx['to'], allow_empty=True))
- if tx.get('data') != None:
- data = bytes.fromhex(strip_0x(tx['data'], allow_empty=True))
-
- gas_price = None
- start_gas = None
- value = None
- nonce = None
- chainId = None
-
- # TODO: go directly from hex to bytes
- try:
- gas_price = int(tx['gasPrice'])
- byts = ((gas_price.bit_length()-1)/8)+1
- gas_price = gas_price.to_bytes(int(byts), 'big')
- except ValueError:
- gas_price = bytes.fromhex(strip_0x(tx['gasPrice'], allow_empty=True))
-
- try:
- start_gas = int(tx['gas'])
- byts = ((start_gas.bit_length()-1)/8)+1
- start_gas = start_gas.to_bytes(int(byts), 'big')
- except ValueError:
- start_gas = bytes.fromhex(strip_0x(tx['gas'], allow_empty=True))
-
- try:
- value = int(tx['value'])
- byts = ((value.bit_length()-1)/8)+1
- value = value.to_bytes(int(byts), 'big')
- except ValueError:
- value = bytes.fromhex(strip_0x(tx['value'], allow_empty=True))
-
- try:
- nonce = int(nonce_in)
- byts = ((nonce.bit_length()-1)/8)+1
- nonce = nonce.to_bytes(int(byts), 'big')
- except ValueError:
- nonce = bytes.fromhex(strip_0x(nonce_in, allow_empty=True))
-
- try:
- chainId = int(chainId_in)
- byts = ((chainId.bit_length()-1)/8)+1
- chainId = chainId.to_bytes(int(byts), 'big')
- except ValueError:
- chainId = bytes.fromhex(strip_0x(chainId_in, allow_empty=True))
-
- self.nonce = nonce
- self.gas_price = gas_price
- self.start_gas = start_gas
- self.to = to
- self.value = value
- self.data = data
- self.v = chainId
- self.r = b''
- self.s = b''
- self.sender = strip_0x(tx['from'])
-
-
- def canonical_order(self):
- s = [
- self.nonce,
- self.gas_price,
- self.start_gas,
- self.to,
- self.value,
- self.data,
- self.v,
- self.r,
- self.s,
- ]
-
- return s
-
-
- def bytes_serialize(self):
- s = self.canonical_order()
- b = b''
- for e in s:
- b += e
- return b
-
-
- def rlp_serialize(self):
- s = self.canonical_order()
- return rlp_encode(s)
-
-
- def serialize(self):
- tx = {
- 'nonce': add_0x(self.nonce.hex(), allow_empty=True),
- 'gasPrice': add_0x(self.gas_price.hex()),
- 'gas': add_0x(self.start_gas.hex()),
- 'value': add_0x(self.value.hex(), allow_empty=True),
- 'data': add_0x(self.data.hex(), allow_empty=True),
- 'v': add_0x(self.v.hex(), allow_empty=True),
- 'r': add_0x(self.r.hex(), allow_empty=True),
- 's': add_0x(self.s.hex(), allow_empty=True),
- }
- if self.to == None or len(self.to) == 0:
- tx['to'] = None
- else:
- tx['to'] = add_0x(self.to.hex())
-
- if tx['data'] == '':
- tx['data'] = '0x'
-
- if tx['value'] == '':
- tx['value'] = '0x00'
-
- if tx['nonce'] == '':
- tx['nonce'] = '0x00'
-
- return tx
-
-
- def apply_signature(self, chain_id, signature, v=None):
- if len(self.r + self.s) > 0:
- raise AttributeError('signature already set')
- if len(signature) < 65:
- raise ValueError('invalid signature length')
- if v == None:
- v = chain_id_to_v(chain_id, signature)
- self.v = int_to_minbytes(v)
- self.r = signature[:32]
- self.s = signature[32:64]
-
- for i in range(len(self.r)):
- if self.r[i] > 0:
- self.r = self.r[i:]
- break
-
- for i in range(len(self.s)):
- if self.s[i] > 0:
- self.s = self.s[i:]
- break
diff --git a/funga/eth/web3ext/__init__.py b/funga/eth/web3ext/__init__.py
@@ -1,29 +0,0 @@
-import logging
-import re
-
-from web3 import Web3 as Web3super
-from web3 import WebsocketProvider, HTTPProvider
-from .middleware import PlatformMiddleware
-
-re_websocket = re.compile('^wss?://')
-re_http = re.compile('^https?://')
-
-logg = logging.getLogger(__file__)
-
-
-def create_middleware(ipcpath):
- PlatformMiddleware.ipcaddr = ipcpath
- return PlatformMiddleware
-
-
-# overrides the original Web3 constructor
-#def Web3(blockchain_provider='ws://localhost:8546', ipcpath=None):
-def Web3(provider, ipcpath=None):
- w3 = Web3super(provider)
-
- if ipcpath != None:
- logg.info('using signer middleware with ipc {}'.format(ipcpath))
- w3.middleware_onion.add(create_middleware(ipcpath))
-
- w3.eth.personal = w3.geth.personal
- return w3
diff --git a/funga/eth/web3ext/middleware.py b/funga/eth/web3ext/middleware.py
@@ -1,116 +0,0 @@
-# standard imports
-import logging
-import re
-import socket
-import uuid
-import json
-
-logg = logging.getLogger(__file__)
-
-
-def jsonrpc_request(method, params):
- uu = uuid.uuid4()
- return {
- "jsonrpc": "2.0",
- "id": str(uu),
- "method": method,
- "params": params,
- }
-
-class PlatformMiddleware:
-
- # id for the request is not available, meaning we cannot easily short-circuit
- # hack workaround
- id_seq = -1
- re_personal = re.compile('^personal_.*')
- ipcaddr = None
-
-
- def __init__(self, make_request, w3):
- self.w3 = w3
- self.make_request = make_request
- if self.ipcaddr == None:
- raise AttributeError('ipcaddr not set')
-
-
- # TODO: understand what format input params come in
- # single entry input gives a tuple on params, wtf...
- # dict input comes as [{}] and fails if not passed on as an array
- @staticmethod
- def _translate_params(params):
- #if params.__class__.__name__ == 'tuple':
- # r = []
- # for p in params:
- # r.append(p)
- # return r
-
- if params.__class__.__name__ == 'list' and len(params) > 0:
- return params[0]
-
- return params
-
-
- # TODO: DRY
- def __call__(self, method, suspect_params):
-
- self.id_seq += 1
- logg.debug('in middleware method {} params {} ipcpath {}'.format(method, suspect_params, self.ipcaddr))
-
- if self.re_personal.match(method) != None:
- params = PlatformMiddleware._translate_params(suspect_params)
- # multiple providers is removed in web3.py 5.12.0
- # https://github.com/ethereum/web3.py/issues/1701
- # thus we need a workaround to use the same web3 instance
- s = socket.socket(family=socket.AF_UNIX, type=socket.SOCK_STREAM, proto=0)
- ipc_provider_workaround = s.connect(self.ipcaddr)
-
- logg.info('redirecting method {} params {} original params {}'.format(method, params, suspect_params))
- o = jsonrpc_request(method, params[0])
- j = json.dumps(o)
- logg.debug('send {}'.format(j))
- s.send(j.encode('utf-8'))
- r = s.recv(4096)
- s.close()
- logg.debug('got recv {}'.format(str(r)))
- jr = json.loads(r)
- jr['id'] = self.id_seq
- #return str(json.dumps(jr))
- return jr
-
- elif method == 'eth_signTransaction':
- params = PlatformMiddleware._translate_params(suspect_params)
- s = socket.socket(family=socket.AF_UNIX, type=socket.SOCK_STREAM, proto=0)
- ipc_provider_workaround = s.connect(self.ipcaddr)
- logg.info('redirecting method {} params {} original params {}'.format(method, params, suspect_params))
- o = jsonrpc_request(method, params[0])
- j = json.dumps(o)
- logg.debug('send {}'.format(j))
- s.send(j.encode('utf-8'))
- r = s.recv(4096)
- s.close()
- logg.debug('got recv {}'.format(str(r)))
- jr = json.loads(r)
- jr['id'] = self.id_seq
- #return str(json.dumps(jr))
- return jr
-
- elif method == 'eth_sign':
- params = PlatformMiddleware._translate_params(suspect_params)
- s = socket.socket(family=socket.AF_UNIX, type=socket.SOCK_STREAM, proto=0)
- ipc_provider_workaround = s.connect(self.ipcaddr)
- logg.info('redirecting method {} params {} original params {}'.format(method, params, suspect_params))
- o = jsonrpc_request(method, params)
- j = json.dumps(o)
- logg.debug('send {}'.format(j))
- s.send(j.encode('utf-8'))
- r = s.recv(4096)
- s.close()
- logg.debug('got recv {}'.format(str(r)))
- jr = json.loads(r)
- jr['id'] = self.id_seq
- return jr
-
-
-
- r = self.make_request(method, suspect_params)
- return r
diff --git a/requirements.txt b/requirements.txt
@@ -1,8 +0,0 @@
-cryptography==3.2.1
-pysha3==1.0.2
-rlp==2.0.1
-json-rpc==1.13.0
-confini>=0.3.6rc3,<0.5.0
-coincurve==15.0.0
-hexathon~=0.0.1a7
-pycryptodome==3.10.1
diff --git a/setup.py b/setup.py
@@ -13,15 +13,6 @@ while True:
requirements.append(l.rstrip())
f.close()
-sql_requirements = []
-f = open('sql_requirements.txt', 'r')
-while True:
- l = f.readline()
- if l == '':
- break
- sql_requirements.append(l.rstrip())
-f.close()
-
test_requirements = []
f = open('test_requirements.txt', 'r')
while True:
@@ -39,24 +30,10 @@ setup(
author_email="dev@holbrook.no",
packages=[
'funga',
- 'funga.eth.signer',
- 'funga.eth',
- 'funga.eth.cli',
- 'funga.eth.keystore',
- 'funga.eth.runnable',
],
install_requires=requirements,
- extras_require={
- 'sql': sql_requirements,
- },
tests_require=test_requirements,
long_description=long_description,
long_description_content_type='text/markdown',
- entry_points = {
- 'console_scripts': [
- 'crypto-dev-daemon=funga.runnable.signer:main',
- 'eth-keyfile=funga.runnable.keyfile:main',
- ],
- },
- url='https://gitlab.com/chaintool/funga',
+ url='https://gitlab.com/chaintool/funga',
)
diff --git a/sql_requirements.txt b/sql_requirements.txt
@@ -1,2 +0,0 @@
-psycopg2==2.8.6
-sqlalchemy==1.3.20
diff --git a/tests/test_cli.py b/tests/test_cli.py
@@ -1,88 +0,0 @@
-# standard imports
-import unittest
-import logging
-import os
-
-# external imports
-from hexathon import strip_0x
-
-# local imports
-from funga.eth.signer import EIP155Signer
-from funga.eth.keystore.dict import DictKeystore
-from funga.eth.cli.handle import SignRequestHandler
-from funga.eth.transaction import EIP155Transaction
-
-logging.basicConfig(level=logging.DEBUG)
-logg = logging.getLogger()
-
-script_dir = os.path.dirname(os.path.realpath(__file__))
-data_dir = os.path.join(script_dir, 'testdata')
-
-class TestCli(unittest.TestCase):
-
- def setUp(self):
- #pk = bytes.fromhex('5087503f0a9cc35b38665955eb830c63f778453dd11b8fa5bd04bc41fd2cc6d6')
- #pk_getter = pkGetter(pk)
- self.keystore = DictKeystore()
- SignRequestHandler.keystore = self.keystore
- self.signer = EIP155Signer(self.keystore)
- SignRequestHandler.signer = self.signer
- self.handler = SignRequestHandler()
-
-
- def test_new_account(self):
- q = {
- 'id': 0,
- 'method': 'personal_newAccount',
- 'params': [''],
- }
- (rpc_id, result) = self.handler.process_input(q)
- self.assertTrue(self.keystore.get(result))
-
-
- def test_sign_tx(self):
- keystore_file = os.path.join(data_dir, 'UTC--2021-01-08T18-37-01.187235289Z--00a329c0648769a73afac7f9381e08fb43dbea72')
- sender = self.keystore.import_keystore_file(keystore_file)
- tx_hexs = {
- 'nonce': '0x',
- 'from': sender,
- 'gasPrice': "0x04a817c800",
- 'gas': "0x5208",
- 'to': '0x3535353535353535353535353535353535353535',
- 'value': "0x03e8",
- 'data': "0xdeadbeef",
- 'chainId': 8995,
- }
- tx = EIP155Transaction(tx_hexs, 42, 8995)
- tx_s = tx.serialize()
-
- # TODO: move to serialization wrapper for tests
- tx_s['chainId'] = tx_s['v']
- tx_s['from'] = sender
-
- # eth_signTransaction wraps personal_signTransaction, so here we test both already
- q = {
- 'id': 0,
- 'method': 'eth_signTransaction',
- 'params': [tx_s],
- }
- (rpc_id, result) = self.handler.process_input(q)
- logg.debug('result {}'.format(result))
-
- self.assertEqual(strip_0x(result), 'f86c2a8504a817c8008252089435353535353535353535353535353535353535358203e884deadbeef82466aa0b7c1bbf52f736ada30fe253c7484176f44d6fd097a9720dc85ae5bbc7f060e54a07afee2563b0cf6d00333df51cc62b0d13c63108b2bce54ce2ad24e26ce7b4f25')
-
- def test_sign_msg(self):
- keystore_file = os.path.join(data_dir, 'UTC--2021-01-08T18-37-01.187235289Z--00a329c0648769a73afac7f9381e08fb43dbea72')
- sender = self.keystore.import_keystore_file(keystore_file)
- q = {
- 'id': 0,
- 'method': 'eth_sign',
- 'params': [sender, '0xdeadbeef'],
- }
- (rpc_id, result) = self.handler.process_input(q)
- logg.debug('result msg {}'.format(result))
- self.assertEqual(strip_0x(result), '50320dda75190a121b7b5979de66edadafd02bdfbe4f6d49552e79c01410d2464aae35e385c0e5b61663ff7b44ef65fa0ac7ad8a57472cf405db399b9dba3e1600')
-
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/tests/test_keystore_dict.py b/tests/test_keystore_dict.py
@@ -1,63 +0,0 @@
-#!/usr/bin/python
-
-# standard imports
-import unittest
-import logging
-import base64
-import os
-
-# external imports
-from hexathon import (
- strip_0x,
- add_0x,
- )
-
-# local imports
-from funga.error import UnknownAccountError
-from funga.eth.keystore.dict import DictKeystore
-from funga.eth.signer import EIP155Signer
-
-logging.basicConfig(level=logging.DEBUG)
-logg = logging.getLogger()
-
-script_dir = os.path.realpath(os.path.dirname(__file__))
-
-
-class TestDict(unittest.TestCase):
-
- address_hex = None
- db = None
-
- def setUp(self):
- self.db = DictKeystore()
-
- keystore_filepath = os.path.join(script_dir, 'testdata', 'UTC--2021-01-08T18-37-01.187235289Z--00a329c0648769a73afac7f9381e08fb43dbea72')
-
- address_hex = self.db.import_keystore_file(keystore_filepath, '')
- self.address_hex = add_0x(address_hex)
-
-
- def tearDown(self):
- pass
-
-
- def test_get_key(self):
- logg.debug('getting {}'.format(strip_0x(self.address_hex)))
- pk = self.db.get(strip_0x(self.address_hex), '')
-
- self.assertEqual(self.address_hex.lower(), '0x00a329c0648769a73afac7f9381e08fb43dbea72')
-
- bogus_account = os.urandom(20).hex()
- with self.assertRaises(UnknownAccountError):
- self.db.get(bogus_account, '')
-
-
- def test_sign_message(self):
- s = EIP155Signer(self.db)
- z = s.sign_ethereum_message(strip_0x(self.address_hex), b'foo')
- logg.debug('zzz {}'.format(str(z)))
-
-
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/tests/test_keystore_reference.py b/tests/test_keystore_reference.py
@@ -1,64 +0,0 @@
-#!/usr/bin/python
-
-# standard imports
-import unittest
-import logging
-import base64
-import os
-
-# external imports
-import psycopg2
-from psycopg2 import sql
-from cryptography.fernet import Fernet, InvalidToken
-
-# local imports
-from funga.eth.keystore.sql import SQLKeystore
-from funga.error import UnknownAccountError
-
-logging.basicConfig(level=logging.DEBUG)
-logg = logging.getLogger()
-
-
-class TestDatabase(unittest.TestCase):
-
- conn = None
- cur = None
- symkey = None
- address_hex = None
- db = None
-
- def setUp(self):
- logg.debug('setup')
- # arbitrary value
- symkey_hex = 'E92431CAEE69313A7BE9E443C4ABEED9BF8157E9A13553B4D5D6E7D51B5021D9'
- self.symkey = bytes.fromhex(symkey_hex)
- self.address_hex = '9FA61f0E52A5C51b43f0d32404625BC436bb7041'
-
- kw = {
- 'symmetric_key': self.symkey,
- }
- self.db = SQLKeystore('postgres+psycopg2://postgres@localhost:5432/signer_test', **kw)
- self.address_hex = self.db.new('foo')
- #self.address_hex = add_0x(address_hex)
-
-
- def tearDown(self):
- self.db.db_session.execute('DROP INDEX ethereum_address_idx;')
- self.db.db_session.execute('DROP TABLE ethereum;')
- self.db.db_session.commit()
-
-
-
- def test_get_key(self):
- logg.debug('getting {}'.format(self.address_hex))
- self.db.get(self.address_hex, 'foo')
- with self.assertRaises(InvalidToken):
- self.db.get(self.address_hex, 'bar')
-
- bogus_account = '0x' + os.urandom(20).hex()
- with self.assertRaises(UnknownAccountError):
- self.db.get(bogus_account, 'bar')
-
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/tests/test_sign.py b/tests/test_sign.py
@@ -1,99 +0,0 @@
-# standard imports
-import unittest
-import logging
-import json
-
-from rlp import encode as rlp_encode
-
-from funga.eth.signer import EIP155Signer
-from funga.eth.transaction import EIP155Transaction
-
-logging.basicConfig(level=logging.DEBUG)
-logg = logging.getLogger()
-
-
-tx_ints = {
- 'nonce': 0,
- 'from': "0xEB014f8c8B418Db6b45774c326A0E64C78914dC0",
- 'gasPrice': "20000000000",
- 'gas': "21000",
- 'to': '0x3535353535353535353535353535353535353535',
- 'value': "1000",
- 'data': "deadbeef",
-}
-
-tx_hexs = {
- 'nonce': '0x0',
- 'from': "0xEB014f8c8B418Db6b45774c326A0E64C78914dC0",
- 'gasPrice': "0x4a817c800",
- 'gas': "0x5208",
- 'to': '0x3535353535353535353535353535353535353535',
- 'value': "0x3e8",
- 'data': "deadbeef",
-}
-
-class pkGetter:
-
- def __init__(self, pk):
- self.pk = pk
-
- def get(self, address, password=None):
- return self.pk
-
-
-class TestSign(unittest.TestCase):
-
- pk = None
- nonce = -1
- pk_getter = None
-
-
- def getNonce(self):
- self.nonce += 1
- return self.nonce
-
-
- def setUp(self):
- self.pk = bytes.fromhex('5087503f0a9cc35b38665955eb830c63f778453dd11b8fa5bd04bc41fd2cc6d6')
- self.pk_getter = pkGetter(self.pk)
-
-
- def tearDown(self):
- logg.info('teardown empty')
-
-
-
- # TODO: verify rlp tx output
- def test_serialize_transaction(self):
- t = EIP155Transaction(tx_ints, 0)
- self.assertRegex(t.__class__.__name__, "Transaction")
- s = t.serialize()
- self.assertDictEqual(s, {'nonce': '0x', 'gasPrice': '0x04a817c800', 'gas': '0x5208', 'to': '0x3535353535353535353535353535353535353535', 'value': '0x03e8', 'data': '0xdeadbeef', 'v': '0x01', 'r': '0x', 's': '0x'})
- r = t.rlp_serialize()
- self.assertEqual(r.hex(), 'ea808504a817c8008252089435353535353535353535353535353535353535358203e884deadbeef018080')
-
- t = EIP155Transaction(tx_hexs, 0)
- self.assertRegex(t.__class__.__name__, "Transaction")
- s = t.serialize()
- #o = json.loads(s)
- self.assertDictEqual(s, {'nonce': '0x', 'gasPrice': '0x04a817c800', 'gas': '0x5208', 'to': '0x3535353535353535353535353535353535353535', 'value': '0x03e8', 'data': '0xdeadbeef', 'v': '0x01', 'r': '0x', 's': '0x'})
- r = t.rlp_serialize()
- self.assertEqual(r.hex(), 'ea808504a817c8008252089435353535353535353535353535353535353535358203e884deadbeef018080')
-
-
-
- def test_sign_transaction(self):
- t = EIP155Transaction(tx_ints, 461, 8995)
- s = EIP155Signer(self.pk_getter)
- z = s.sign_transaction(t)
-
-
- def test_sign_message(self):
- s = EIP155Signer(self.pk_getter)
- z = s.sign_ethereum_message(tx_ints['from'], '666f6f')
- z = s.sign_ethereum_message(tx_ints['from'], b'foo')
- logg.debug('zzz {}'.format(str(z)))
-
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/tests/test_socket.py b/tests/test_socket.py
@@ -1,15 +0,0 @@
-# standard imports
-import unittest
-import logging
-
-logg = logging.getLogger(__name__)
-
-
-class SocketTest(unittest.TestCase):
-
- def test_placeholder_warning(self):
- logg.warning('socket tests are missing! :/')
-
-
-if __name__ == '__main__':
- unittest.main()