funga-eth

Ethereum implementation of the funga keystore and signer
Info | Log | Files | Refs | README | LICENSE

keyfile.py (5335B)


      1 # standard imports
      2 import os
      3 import hashlib
      4 import logging
      5 import json
      6 import uuid
      7 
      8 # external imports
      9 import coincurve
     10 from Crypto.Cipher import AES
     11 from Crypto.Util import Counter
     12 import sha3
     13 
     14 # local imports
     15 from funga.error import (
     16     DecryptError,
     17     KeyfileError,
     18 )
     19 from funga.eth.encoding import private_key_to_address
     20 
     21 logg = logging.getLogger(__name__)
     22 
     23 algo_keywords = [
     24     'aes-128-ctr',
     25 ]
     26 hash_keywords = [
     27     'scrypt',
     28     'pbkdf2'
     29 ]
     30 
     31 default_scrypt_kdfparams = {
     32     'dklen': 32,
     33     'n': 1 << 18,
     34     'p': 1,
     35     'r': 8,
     36     'salt': os.urandom(32).hex(),
     37 }
     38 
     39 default_pbkdf2_kdfparams = {
     40     'c': 100000,
     41     'dklen': 32,
     42     'prf': 'sha256',
     43     'salt': os.urandom(32).hex(),
     44 }
     45 
     46 def to_mac(mac_key, ciphertext_bytes):
     47     h = sha3.keccak_256()
     48     h.update(mac_key)
     49     h.update(ciphertext_bytes)
     50     return h.digest()
     51 
     52 
     53 class Hashes:
     54 
     55     @staticmethod
     56     def from_scrypt(kdfparams=default_scrypt_kdfparams, passphrase=''):
     57         dklen = int(kdfparams['dklen'])
     58         n = int(kdfparams['n'])
     59         p = int(kdfparams['p'])
     60         r = int(kdfparams['r'])
     61         salt = bytes.fromhex(kdfparams['salt'])
     62 
     63         return hashlib.scrypt(passphrase.encode('utf-8'), salt=salt, n=n, p=p, r=r, maxmem=1024 * 1024 * 1024,
     64                               dklen=dklen)
     65 
     66     @staticmethod
     67     def from_pbkdf2(kdfparams=default_pbkdf2_kdfparams, passphrase=''):
     68         if kdfparams['prf'] == 'hmac-sha256':
     69             kdfparams['prf'].replace('hmac-sha256','sha256')
     70 
     71         derived_key = hashlib.pbkdf2_hmac(
     72             hash_name='sha256',
     73             password=passphrase.encode('utf-8'),
     74             salt=bytes.fromhex(kdfparams['salt']),
     75             iterations=int(kdfparams['c']),
     76             dklen=int(kdfparams['dklen'])
     77         )
     78         return derived_key
     79 
     80 
     81 class Ciphers:
     82     aes_128_block_size = 1 << 7
     83     aes_iv_len = 16
     84 
     85     @staticmethod
     86     def decrypt_aes_128_ctr(ciphertext, decryption_key, iv):
     87         ctr = Counter.new(Ciphers.aes_128_block_size, initial_value=iv)
     88         cipher = AES.new(decryption_key, AES.MODE_CTR, counter=ctr)
     89         plaintext = cipher.decrypt(ciphertext)
     90         return plaintext
     91 
     92     @staticmethod
     93     def encrypt_aes_128_ctr(plaintext, encryption_key, iv):
     94         ctr = Counter.new(Ciphers.aes_128_block_size, initial_value=iv)
     95         cipher = AES.new(encryption_key, AES.MODE_CTR, counter=ctr)
     96         ciphertext = cipher.encrypt(plaintext)
     97         return ciphertext
     98 
     99 
    100 def to_dict(private_key_bytes, kdf='scrypt', passphrase=''):
    101     private_key = coincurve.PrivateKey(secret=private_key_bytes)
    102 
    103     if kdf == 'scrypt':
    104         encryption_key = Hashes.from_scrypt(passphrase=passphrase)
    105         kdfparams = default_scrypt_kdfparams
    106 
    107     elif kdf == 'pbkdf2':
    108         encryption_key = Hashes.from_pbkdf2(passphrase=passphrase)
    109         kdfparams = pbkdf2_kdfparams
    110 
    111     else:
    112         raise NotImplementedError("KDF not implemented: {0}".format(kdf))
    113 
    114     address_hex = private_key_to_address(private_key)
    115     iv_bytes = os.urandom(Ciphers.aes_iv_len)
    116     iv = int.from_bytes(iv_bytes, 'big')
    117     ciphertext_bytes = Ciphers.encrypt_aes_128_ctr(private_key.secret, encryption_key[:16], iv)
    118 
    119     mac = to_mac(encryption_key[16:], ciphertext_bytes)
    120 
    121     crypto_dict = {
    122         'cipher': 'aes-128-ctr',
    123         'ciphertext': ciphertext_bytes.hex(),
    124         'cipherparams': {
    125             'iv': iv_bytes.hex(),
    126         },
    127         'kdf': kdf,
    128         'kdfparams': kdfparams,
    129         'mac': mac.hex(),
    130     }
    131 
    132     uu = uuid.uuid1()
    133     o = {
    134         'address': address_hex,
    135         'version': 3,
    136         'crypto': crypto_dict,
    137         'id': str(uu),
    138     }
    139     return o
    140 
    141 
    142 def from_dict(o, passphrase=''):
    143     cipher = o['crypto']['cipher']
    144     if cipher not in algo_keywords:
    145         raise NotImplementedError('cipher "{}" not implemented'.format(cipher))
    146 
    147     kdf = o['crypto']['kdf']
    148     if kdf not in hash_keywords:
    149         raise NotImplementedError('kdf "{}" not implemented'.format(kdf))
    150 
    151     m = getattr(Hashes, 'from_{}'.format(kdf.replace('-', '_')))
    152     decryption_key = m(o['crypto']['kdfparams'], passphrase)
    153 
    154     control_mac = bytes.fromhex(o['crypto']['mac'])
    155     iv_bytes = bytes.fromhex(o['crypto']['cipherparams']['iv'])
    156     iv = int.from_bytes(iv_bytes, "big")
    157     ciphertext_bytes = bytes.fromhex(o['crypto']['ciphertext'])
    158 
    159     # check mac
    160     calculated_mac = to_mac(decryption_key[16:], ciphertext_bytes)
    161     if control_mac != calculated_mac:
    162         raise DecryptError('mac mismatch when decrypting passphrase')
    163 
    164     m = getattr(Ciphers, 'decrypt_{}'.format(cipher.replace('-', '_')))
    165 
    166     try:
    167         pk = m(ciphertext_bytes, decryption_key[:16], iv)
    168     except AssertionError as e:
    169         raise DecryptError('could not decrypt keyfile: {}'.format(e))
    170 
    171     return pk
    172 
    173 
    174 def from_file(filepath, passphrase=''):
    175     f = open(filepath, 'r')
    176     try:
    177         o = json.load(f)
    178     except json.decoder.JSONDecodeError as e:
    179         f.close()
    180         raise KeyfileError(e)
    181     f.close()
    182 
    183     return from_dict(o, passphrase)
    184 
    185 
    186 def from_some(v, passphrase=''):
    187     if isinstance(v, bytes):
    188         v = v.decode('utf-8')
    189 
    190     if isinstance(v, str):
    191         try:
    192             return from_file(v, passphrase)
    193         except Exception:
    194             logg.debug('keyfile parse as file fail')
    195             pass
    196         v = json.loads(v)
    197 
    198     return from_dict(v, passphrase)