defaultsigner.py (3210B)
# standard imports
import logging

# external imports
import sha3
import coincurve
from hexathon import int_to_minbytes

# local imports
from funga.signer import Signer
from funga.eth.encoding import chain_id_to_v
from funga.eth.message import to_personal_message
from funga.eth.message import to_validator_message
from funga.eth.message import to_typed_message

logg = logging.getLogger(__name__)


class EIP155Signer(Signer):
    """Sign transactions and messages with keys obtained from a key getter.

    Private keys are looked up per call through ``self.keyGetter.get(address,
    password)`` and handed to coincurve for recoverable signing.

    :param keyGetter: Object passed on to the ``Signer`` base class; it must
        provide ``get(address, password)`` returning the private key secret.
    """

    def __init__(self, keyGetter):
        super().__init__(keyGetter)


    def sign_transaction(self, tx, password=None):
        """Sign the keccak-256 digest of the serialized transaction.

        :param tx: Transaction object providing ``rlp_serialize()`` and a
            ``sender`` attribute identifying the signing key.
        :param password: Password forwarded to the key getter, if any.
        :returns: Signature as returned by ``sign_pure``.
        """
        serialized = tx.rlp_serialize()
        h = sha3.keccak_256()
        h.update(serialized)
        return self.sign_pure(tx.sender, h.digest(), password)


    def sign_transaction_to_rlp(self, tx, password=None):
        """Sign the transaction, apply the signature to it and serialize it.

        Note that ``tx`` is modified through ``tx.apply_signature``.

        :param tx: Transaction object; in addition to what
            ``sign_transaction`` needs it must provide ``v`` (bytes) and
            ``apply_signature(chain_id, signature)``.
        :param password: Password forwarded to the key getter, if any.
        :returns: Result of ``tx.rlp_serialize()`` after the signature has
            been applied.
        """
        # The chain id must be read before signing: it is taken from the
        # big-endian bytes in tx.v, which apply_signature presumably
        # overwrites with the final v value.
        chain_id = int.from_bytes(tx.v, byteorder='big')
        signature = self.sign_transaction(tx, password)
        tx.apply_signature(chain_id, signature)
        return tx.rlp_serialize()


    def sign_transaction_to_wire(self, tx, password=None):
        """Alias of ``sign_transaction_to_rlp``.

        :param tx: Transaction object, see ``sign_transaction_to_rlp``.
        :param password: Password forwarded to the key getter, if any.
        :returns: Same value as ``sign_transaction_to_rlp``.
        """
        return self.sign_transaction_to_rlp(tx, password=password)


    def sign_ethereum_message(self, address, message, password=None):
        """Sign a message after wrapping it with ``to_personal_message``.

        :param address: Address whose key is used for signing.
        :param message: Message to sign; ``bytes`` are used as they are,
            a ``str`` is interpreted as hex and decoded with
            ``bytes.fromhex``.
        :param password: Password forwarded to the key getter, if any.
        :raises ValueError: If ``message`` is neither ``str`` nor ``bytes``,
            or if a ``str`` message is not valid hex.
        :returns: Signature as returned by ``sign_pure``.
        """
        if isinstance(message, str):
            logg.debug('signing message in "str" format: %s', message)
            message = bytes.fromhex(message)
        elif isinstance(message, bytes):
            logg.debug('signing message in "bytes" format: %s', message.hex())
        else:
            type_name = type(message).__name__
            logg.debug('unhandled format %s', type_name)
            raise ValueError('message must be type str or bytes, received {}'.format(type_name))

        message_to_sign = to_personal_message(message, digest=True)
        return self.sign_pure(address, message_to_sign, password)


    def sign_validator_message(self, address, validator, message, password=None):
        """Sign a message after wrapping it with ``to_validator_message``.

        :param address: Address whose key is used for signing.
        :param validator: Validator value passed to ``to_validator_message``.
        :param message: Message passed to ``to_validator_message``.
        :param password: Password forwarded to the key getter, if any.
        :returns: Signature as returned by ``sign_pure``.
        """
        message_to_sign = to_validator_message(message, validator, digest=True)
        return self.sign_pure(address, message_to_sign, password)


    def sign_typed_message(self, address, domain, message, password=None):
        """Sign a message after wrapping it with ``to_typed_message``.

        :param address: Address whose key is used for signing.
        :param domain: Domain value passed to ``to_typed_message``.
        :param message: Message passed to ``to_typed_message``.
        :param password: Password forwarded to the key getter, if any.
        :returns: Signature as returned by ``sign_pure``.
        """
        message_to_sign = to_typed_message(message, domain, digest=True)
        return self.sign_pure(address, message_to_sign, password)


    # TODO: generic sign should be moved to non-eth context
    def sign_pure(self, address, message, password=None):
        """Sign ``message`` as-is, without hashing or wrapping it.

        :param address: Address whose key is fetched from the key getter.
        :param message: Bytes to sign. No hasher is applied
            (``hasher=None``), so this is expected to already be a digest
            (presumably 32 bytes; confirm against coincurve).
        :param password: Password forwarded to the key getter, if any.
        :returns: Recoverable signature produced by coincurve's
            ``PrivateKey.sign_recoverable``.
        """
        secret = self.keyGetter.get(address, password)
        private_key = coincurve.PrivateKey(secret=secret)
        return private_key.sign_recoverable(hasher=None, message=message)


    def sign_message(self, address, message, password=None, dialect='eth'):
        """Sign a message using the given dialect.

        :param address: Address whose key is used for signing.
        :param message: Message to sign.
        :param password: Password forwarded to the key getter, if any.
        :param dialect: ``None`` signs the message as-is via ``sign_pure``;
            ``'eth'`` (default) signs via ``sign_ethereum_message``.
        :raises ValueError: If ``dialect`` is not one of the above.
        :returns: Signature as returned by the selected signing method.
        """
        if dialect is None:
            return self.sign_pure(address, message, password=password)
        if dialect == 'eth':
            return self.sign_ethereum_message(address, message, password=password)
        raise ValueError('Unknown message sign dialect "{}"'.format(dialect))