keyfile.py (5335B)
1 # standard imports 2 import os 3 import hashlib 4 import logging 5 import json 6 import uuid 7 8 # external imports 9 import coincurve 10 from Crypto.Cipher import AES 11 from Crypto.Util import Counter 12 import sha3 13 14 # local imports 15 from funga.error import ( 16 DecryptError, 17 KeyfileError, 18 ) 19 from funga.eth.encoding import private_key_to_address 20 21 logg = logging.getLogger(__name__) 22 23 algo_keywords = [ 24 'aes-128-ctr', 25 ] 26 hash_keywords = [ 27 'scrypt', 28 'pbkdf2' 29 ] 30 31 default_scrypt_kdfparams = { 32 'dklen': 32, 33 'n': 1 << 18, 34 'p': 1, 35 'r': 8, 36 'salt': os.urandom(32).hex(), 37 } 38 39 default_pbkdf2_kdfparams = { 40 'c': 100000, 41 'dklen': 32, 42 'prf': 'sha256', 43 'salt': os.urandom(32).hex(), 44 } 45 46 def to_mac(mac_key, ciphertext_bytes): 47 h = sha3.keccak_256() 48 h.update(mac_key) 49 h.update(ciphertext_bytes) 50 return h.digest() 51 52 53 class Hashes: 54 55 @staticmethod 56 def from_scrypt(kdfparams=default_scrypt_kdfparams, passphrase=''): 57 dklen = int(kdfparams['dklen']) 58 n = int(kdfparams['n']) 59 p = int(kdfparams['p']) 60 r = int(kdfparams['r']) 61 salt = bytes.fromhex(kdfparams['salt']) 62 63 return hashlib.scrypt(passphrase.encode('utf-8'), salt=salt, n=n, p=p, r=r, maxmem=1024 * 1024 * 1024, 64 dklen=dklen) 65 66 @staticmethod 67 def from_pbkdf2(kdfparams=default_pbkdf2_kdfparams, passphrase=''): 68 if kdfparams['prf'] == 'hmac-sha256': 69 kdfparams['prf'].replace('hmac-sha256','sha256') 70 71 derived_key = hashlib.pbkdf2_hmac( 72 hash_name='sha256', 73 password=passphrase.encode('utf-8'), 74 salt=bytes.fromhex(kdfparams['salt']), 75 iterations=int(kdfparams['c']), 76 dklen=int(kdfparams['dklen']) 77 ) 78 return derived_key 79 80 81 class Ciphers: 82 aes_128_block_size = 1 << 7 83 aes_iv_len = 16 84 85 @staticmethod 86 def decrypt_aes_128_ctr(ciphertext, decryption_key, iv): 87 ctr = Counter.new(Ciphers.aes_128_block_size, initial_value=iv) 88 cipher = AES.new(decryption_key, AES.MODE_CTR, counter=ctr) 89 plaintext = cipher.decrypt(ciphertext) 90 return plaintext 91 92 @staticmethod 93 def encrypt_aes_128_ctr(plaintext, encryption_key, iv): 94 ctr = Counter.new(Ciphers.aes_128_block_size, initial_value=iv) 95 cipher = AES.new(encryption_key, AES.MODE_CTR, counter=ctr) 96 ciphertext = cipher.encrypt(plaintext) 97 return ciphertext 98 99 100 def to_dict(private_key_bytes, kdf='scrypt', passphrase=''): 101 private_key = coincurve.PrivateKey(secret=private_key_bytes) 102 103 if kdf == 'scrypt': 104 encryption_key = Hashes.from_scrypt(passphrase=passphrase) 105 kdfparams = default_scrypt_kdfparams 106 107 elif kdf == 'pbkdf2': 108 encryption_key = Hashes.from_pbkdf2(passphrase=passphrase) 109 kdfparams = pbkdf2_kdfparams 110 111 else: 112 raise NotImplementedError("KDF not implemented: {0}".format(kdf)) 113 114 address_hex = private_key_to_address(private_key) 115 iv_bytes = os.urandom(Ciphers.aes_iv_len) 116 iv = int.from_bytes(iv_bytes, 'big') 117 ciphertext_bytes = Ciphers.encrypt_aes_128_ctr(private_key.secret, encryption_key[:16], iv) 118 119 mac = to_mac(encryption_key[16:], ciphertext_bytes) 120 121 crypto_dict = { 122 'cipher': 'aes-128-ctr', 123 'ciphertext': ciphertext_bytes.hex(), 124 'cipherparams': { 125 'iv': iv_bytes.hex(), 126 }, 127 'kdf': kdf, 128 'kdfparams': kdfparams, 129 'mac': mac.hex(), 130 } 131 132 uu = uuid.uuid1() 133 o = { 134 'address': address_hex, 135 'version': 3, 136 'crypto': crypto_dict, 137 'id': str(uu), 138 } 139 return o 140 141 142 def from_dict(o, passphrase=''): 143 cipher = o['crypto']['cipher'] 144 if cipher not in algo_keywords: 145 raise NotImplementedError('cipher "{}" not implemented'.format(cipher)) 146 147 kdf = o['crypto']['kdf'] 148 if kdf not in hash_keywords: 149 raise NotImplementedError('kdf "{}" not implemented'.format(kdf)) 150 151 m = getattr(Hashes, 'from_{}'.format(kdf.replace('-', '_'))) 152 decryption_key = m(o['crypto']['kdfparams'], passphrase) 153 154 control_mac = bytes.fromhex(o['crypto']['mac']) 155 iv_bytes = bytes.fromhex(o['crypto']['cipherparams']['iv']) 156 iv = int.from_bytes(iv_bytes, "big") 157 ciphertext_bytes = bytes.fromhex(o['crypto']['ciphertext']) 158 159 # check mac 160 calculated_mac = to_mac(decryption_key[16:], ciphertext_bytes) 161 if control_mac != calculated_mac: 162 raise DecryptError('mac mismatch when decrypting passphrase') 163 164 m = getattr(Ciphers, 'decrypt_{}'.format(cipher.replace('-', '_'))) 165 166 try: 167 pk = m(ciphertext_bytes, decryption_key[:16], iv) 168 except AssertionError as e: 169 raise DecryptError('could not decrypt keyfile: {}'.format(e)) 170 171 return pk 172 173 174 def from_file(filepath, passphrase=''): 175 f = open(filepath, 'r') 176 try: 177 o = json.load(f) 178 except json.decoder.JSONDecodeError as e: 179 f.close() 180 raise KeyfileError(e) 181 f.close() 182 183 return from_dict(o, passphrase) 184 185 186 def from_some(v, passphrase=''): 187 if isinstance(v, bytes): 188 v = v.decode('utf-8') 189 190 if isinstance(v, str): 191 try: 192 return from_file(v, passphrase) 193 except Exception: 194 logg.debug('keyfile parse as file fail') 195 pass 196 v = json.loads(v) 197 198 return from_dict(v, passphrase)