base.py (6361B)
# standard imports
import os
import logging

# external imports
import eth_tester
import coincurve
from chainlib.encode import TxHexNormalizer
from chainlib.connection import (
    RPCConnection,
    error_parser,
)
from chainlib.eth.address import (
    to_checksum_address,
)
from chainlib.jsonrpc import (
    jsonrpc_response,
    jsonrpc_error,
    jsonrpc_result,
)
from hexathon import (
    unpad,
    add_0x,
    strip_0x,
)
from chainlib.eth.tx import receipt

from funga.eth.signer import EIP155Signer
from funga.eth.encoding import private_key_to_address


logg = logging.getLogger().getChild(__name__)

# Fixed private key that is always added to the test backend and keystore.
test_pk = bytes.fromhex('5087503f0a9cc35b38665955eb830c63f778453dd11b8fa5bd04bc41fd2cc6d6')


class EthTesterSigner(eth_tester.EthereumTester):
    """EthereumTester that keeps a keystore in sync with the backend's accounts.

    :param backend: Backend object handed to ``eth_tester.EthereumTester``; must
        provide ``add_account`` and ``account_keys``.
    :param keystore: Keystore object providing ``import_raw_key``.
    """

    def __init__(self, backend, keystore):
        super(EthTesterSigner, self).__init__(backend)
        logg.debug('accounts {}'.format(self.get_accounts()))

        self.keystore = keystore
        self.backend = backend
        self.backend.add_account(test_pk)
        # Mirror every key known to the backend (including test_pk) into the keystore.
        for pk in self.backend.account_keys:
            pubk = pk.public_key
            address = pubk.to_checksum_address()
            logg.debug('test keystore have pk {} pubk {} addr {}'.format(pk, pk.public_key, address))
            self.keystore.import_raw_key(pk._raw_key)

    def new_account(self):
        """Create a random account and register it in both keystore and backend.

        :returns: 0x-prefixed checksum address of the new account.
        """
        pk = os.urandom(32)
        address = self.keystore.import_raw_key(pk)
        checksum_address = add_0x(to_checksum_address(address))
        self.backend.add_account(pk)
        return checksum_address


class TestRPCConnection(RPCConnection):
    """RPC connection that dispatches JSON-RPC requests to an in-process backend.

    Each supported JSON-RPC method is implemented as a method of the same name
    on this class, taking the request's ``params`` value as its only argument.

    :param location: Passed unchanged to ``RPCConnection``.
    :param backend: Object serving the chain queries (block, transaction,
        balance, nonce, code, call, new_account, keystore).
    :param signer: Object providing ``sign_ethereum_message``,
        ``sign_transaction`` and ``sign_transaction_to_wire``.
    """

    def __init__(self, location, backend, signer):
        super(TestRPCConnection, self).__init__(location)
        self.backend = backend
        self.signer = signer

    def do(self, o, error_parser=error_parser):
        """Execute a JSON-RPC request object against the handler methods.

        :param o: Request mapping with ``method``, ``params`` and ``id`` keys.
        :param error_parser: Passed to ``jsonrpc_result`` together with the
            response.
        :raises ValueError: If no attribute named ``o['method']`` exists on
            this object.
        :returns: Whatever ``jsonrpc_result`` returns for the response.
        """
        logg.debug('testrpc do {}'.format(o))
        # A default is required here: without it getattr raises AttributeError
        # and the explicit "unhandled method" error below is never reached.
        m = getattr(self, o['method'], None)
        if m is None:
            raise ValueError('unhandled method {}'.format(o['method']))
        try:
            result = m(o['params'])
            logg.debug('result {}'.format(result))
            r = jsonrpc_response(o['id'], result)
        except Exception as e:
            # Handler failures are reported as JSON-RPC errors, not raised directly.
            logg.exception(e)
            r = jsonrpc_error(o['id'], message=str(e))
        return jsonrpc_result(r, error_parser)

    def wait(self, tx_hash_hex):
        """Build a request with ``receipt(tx_hash_hex)`` and execute it via ``do``.

        :param tx_hash_hex: Transaction hash, passed unchanged to ``receipt``.
        :returns: Result of ``do`` for that request.
        """
        o = receipt(tx_hash_hex)
        return self.do(o)

    def eth_blockNumber(self, p):
        """Return the ``number`` field of the backend's ``'latest'`` block."""
        block = self.backend.get_block_by_number('latest')
        return block['number']

    def eth_getBlockByNumber(self, p):
        """Return the block whose number is given as a hex string in ``p[0]``."""
        b = bytes.fromhex(strip_0x(p[0]))
        n = int.from_bytes(b, 'big')
        block = self.backend.get_block_by_number(n)
        return block

    def eth_getBlockByHash(self, p):
        """Return the block identified by the hash in ``p[0]``."""
        block = self.backend.get_block_by_hash(p[0])
        return block

    def eth_getTransactionByBlock(self, p):
        """Return the transaction at index ``p[1]`` of the block with hash ``p[0]``.

        The index is parsed as a hex string; if ``int(p[1], 16)`` raises
        ``TypeError`` (e.g. it is already an integer) it is converted with
        ``int(p[1])`` instead.
        """
        block = self.eth_getBlockByHash(p)
        try:
            tx_index = int(p[1], 16)
        except TypeError:
            tx_index = int(p[1])
        tx_hash = block['transactions'][tx_index]
        tx = self.eth_getTransactionByHash([tx_hash])
        return tx

    def eth_getBalance(self, p):
        """Return the balance of ``p[0]`` as a 0x-prefixed hex string."""
        balance = self.backend.get_balance(p[0])
        hx = balance.to_bytes(32, 'big').hex()
        return add_0x(unpad(hx))

    def eth_getTransactionCount(self, p):
        """Return the nonce of ``p[0]`` as a 0x-prefixed hex string.

        The nonce is serialized through a 4-byte big-endian buffer, so values
        that do not fit in 4 bytes raise ``OverflowError``.
        """
        nonce = self.backend.get_nonce(p[0])
        hx = nonce.to_bytes(4, 'big').hex()
        return add_0x(unpad(hx))

    def eth_getTransactionByHash(self, p):
        """Return the transaction identified by the hash in ``p[0]``."""
        tx = self.backend.get_transaction_by_hash(p[0])
        return tx

    def eth_getTransactionByBlockHashAndIndex(self, p):
        """Alias for ``eth_getTransactionByBlock`` (``p`` is block hash, index)."""
        return self.eth_getTransactionByBlock(p)

    def eth_getTransactionReceipt(self, p):
        """Return the receipt for the transaction hash in ``p[0]``.

        The block number is made available under both the ``block_number`` and
        ``blockNumber`` keys: whichever is present is copied to the other, with
        ``block_number`` taking precedence when it is set.
        """
        rcpt = self.backend.get_transaction_receipt(p[0])
        if rcpt.get('block_number') is None:
            rcpt['block_number'] = rcpt['blockNumber']
        else:
            rcpt['blockNumber'] = rcpt['block_number']
        return rcpt

    def eth_getCode(self, p):
        """Return the backend's ``get_code`` result for the address in ``p[0]``."""
        r = self.backend.get_code(p[0])
        return r

    def eth_call(self, p):
        """Convert the call object in ``p[0]`` and execute it on the backend."""
        tx_ethtester = to_ethtester_call(p[0])
        r = self.backend.call(tx_ethtester)
        return r

    def eth_gasPrice(self, p):
        """Return a fixed gas price of 1000000000 as a hex string."""
        return hex(1000000000)

    def personal_newAccount(self, passphrase):
        """Create a new account through the backend; the argument is not used."""
        a = self.backend.new_account()
        return a

    def eth_sign(self, p):
        """Sign with the signer's ``sign_ethereum_message``.

        ``p[0]`` and ``p[1]`` are stripped of any 0x prefix and passed through
        in that order (presumably address and message -- confirm against the
        signer implementation).
        """
        r = self.signer.sign_ethereum_message(strip_0x(p[0]), strip_0x(p[1]))
        return r

    def eth_sendRawTransaction(self, p):
        """Submit the raw transaction in ``p[0]`` to the backend."""
        r = self.backend.send_raw_transaction(p[0])
        return r

    def eth_signTransaction(self, p):
        """Not implemented.

        A working version would need to build an EIP155Transaction from the
        transaction dict in ``p[0]`` (using its ``nonce`` and ``chainId``) and
        sign it to wire format with the passphrase in ``p[1]``.

        :raises NotImplementedError: Always.
        """
        raise NotImplementedError('needs transaction deserializer for EIP155Transaction')

    def __verify_signer(self, tx, passphrase=''):
        """Check that the keystore's key for ``tx.sender`` derives that address.

        ``passphrase`` is accepted for signature symmetry but is not used.

        :raises AssertionError: If the address derived from the stored private
            key differs from ``tx.sender`` after normalization.
        """
        pk_bytes = self.backend.keystore.get(tx.sender)
        pk = coincurve.PrivateKey(secret=pk_bytes)
        result_address = private_key_to_address(pk)
        tx_normalize = TxHexNormalizer()
        # NOTE(review): this check is an assert, so it is skipped under python -O.
        assert tx_normalize.wallet_address(strip_0x(result_address)) == tx_normalize.wallet_address(strip_0x(tx.sender))

    def sign_transaction(self, tx, passphrase=''):
        """Verify the sender's key, then sign ``tx`` with the signer."""
        self.__verify_signer(tx, passphrase)
        return self.signer.sign_transaction(tx, passphrase)

    def sign_transaction_to_wire(self, tx, passphrase=''):
        """Verify the sender's key, then sign ``tx`` to wire format."""
        self.__verify_signer(tx, passphrase)
        return self.signer.sign_transaction_to_wire(tx, passphrase)

    def disconnect(self):
        """No-op; this class defines nothing to tear down."""
        pass


def to_ethtester_call(tx):
    """Translate a JSON-RPC call object into the dict passed to the backend.

    Empty-string ``gas`` and ``gasPrice`` values are treated as ``'0x00'``.
    The input mapping is left unmodified.

    :param tx: Mapping with ``to``, ``from``, ``gas``, ``gasPrice`` and
        ``data`` keys; ``gas`` and ``gasPrice`` are hex strings.
    :raises KeyError: If any of those keys is missing.
    :returns: New dict with keys ``to``, ``from``, ``gas``, ``gas_price`` and
        ``data``, where ``gas`` and ``gas_price`` are integers.
    """
    # Resolve defaults into locals instead of writing them back into the
    # caller's dict.
    gas = '0x00' if tx['gas'] == '' else tx['gas']
    gas_price = '0x00' if tx['gasPrice'] == '' else tx['gasPrice']

    return {
        'to': tx['to'],
        'from': tx['from'],
        'gas': int(gas, 16),
        'gas_price': int(gas_price, 16),
        'data': tx['data'],
    }