chaind-eth

Queue server for ethereum
Log | Files | Refs | README | LICENSE

commit 7b3cefc2add0e5e8b5e7cf7b13a4769c4cb5ae5b
parent 6a2061553fc3df1c344fe49b01f544c805e70a64
Author: nolash <dev@holbrook.no>
Date:   Tue,  7 Sep 2021 16:46:53 +0200

Add tx processor and erc20/gas token tx muxer

Diffstat:
Mchaind_eth/cli/csv.py | 2+-
Mchaind_eth/cli/process.py | 118++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-----
Achaind_eth/cli/resolver.py | 36++++++++++++++++++++++++++++++++++++
Mchaind_eth/runnable/send.py | 54+++++++++++++++++++++++++++++++++++++++++++++++++-----
4 files changed, 197 insertions(+), 13 deletions(-)

diff --git a/chaind_eth/cli/csv.py b/chaind_eth/cli/csv.py @@ -6,7 +6,7 @@ logg = logging.getLogger(__name__) class CSVProcessor: - def process(self, s): + def load(self, s): contents = [] f = None try: diff --git a/chaind_eth/cli/process.py b/chaind_eth/cli/process.py @@ -1,26 +1,130 @@ +# standard imports +import logging + # external imports from chaind.error import TxSourceError +from chainlib.eth.address import is_checksum_address +from chainlib.eth.tx import unpack +from chainlib.eth.gas import Gas +from hexathon import ( + add_0x, + strip_0x, + ) +from crypto_dev_signer.eth.transaction import EIP155Transaction +from eth_erc20 import ERC20 + +logg = logging.getLogger(__name__) class Processor: - def __init__(self, source): + def __init__(self, sender, signer, source, chain_spec, gas_oracle, nonce_oracle, resolver=None): + self.sender = sender + self.signer = signer self.source = source self.processor = [] self.content = [] + self.token_resolver = resolver + self.cursor = 0 + self.gas_oracle = gas_oracle + self.nonce_oracle = nonce_oracle + self.nonce_start = None + self.gas_limit_start = None + self.gas_price_start = None + self.chain_spec = chain_spec + self.chain_id = chain_spec.chain_id() def add_processor(self, processor): self.processor.append(processor) - def process(self): + def load(self, process=True): for processor in self.processor: - r = processor.process(self.source) - if r != None: - return r - raise TxSourceError() - + self.content = processor.load(self.source) + if self.content != None: + if process: + #try: + self.process() + #except Exception as e: + # raise TxSourceError('invalid source contents: {}'.format(str(e))) + return self.content + raise TxSourceError('unparseable source') + + + # 0: recipient + # 1: amount + # 2: token identifier + def process(self): + txs = [] + for i, r in enumerate(self.content): + logg.debug('processing {}'.format(r)) + if not is_checksum_address(r[0]): + raise ValueError('invalid checksum address {} in record {}'.format(r[0], i)) + self.content[i][0] = add_0x(r[0]) + try: + self.content[i][1] = int(r[1]) + except ValueError: + self.content[i][1] = int(strip_0x(r[1]), 16) + native_token_value = 0 + if self.token_resolver == None: + self.content[i][2] = None + else: + k = r[2] + self.content[i][2] = self.token_resolver.lookup(k) + + if len(self.content[i]) == 3: + self.content[i].append(native_token_value) + + + def __iter__(self): + gas_data = self.gas_oracle.get_gas() + self.gas_price_start = gas_data[0] + self.gas_limit_start = gas_data[1] + self.cursor = 0 + return self + + + def __next__(self): + if self.cursor == len(self.content): + raise StopIteration() + + nonce = self.nonce_oracle.next_nonce() + + token_factory = None + + r = self.content[self.cursor] + logg.debug('rrrr {} '.format(r)) + if r[2] == None: + token_factory = Gas(self.chain_spec, signer=self.signer, gas_oracle=self.gas_oracle, nonce_oracle=self.nonce_oracle) + else: + token_factory = ERC20(self.chain_spec, signer=self.signer, gas_oracle=self.gas_oracle, nonce_oracle=self.nonce_oracle) + + value = 0 + data = '0x' + if isinstance(token_factory, ERC20): + (tx_hash_hex, o) = token_factory.transfer(r[2], self.sender, r[0], r[1]) + logg.debug('tx {}'.format(o)) + # TODO: allow chainlib to return args only + tx = unpack(bytes.fromhex(strip_0x(o['params'][0])), self.chain_spec) + data = tx['data'] + else: + value = r[1] + + tx = { + 'from': self.sender, + 'to': r[0], + 'value': value, + 'data': data, + 'nonce': nonce, + 'gasPrice': self.gas_price_start, + 'gas': self.gas_limit_start, + } + tx_o = EIP155Transaction(tx, nonce, self.chain_id) + tx_bytes = self.signer.sign_transaction_to_wire(tx_o) + self.cursor += 1 + return tx_bytes + def __str__(self): names = [] diff --git a/chaind_eth/cli/resolver.py b/chaind_eth/cli/resolver.py @@ -0,0 +1,36 @@ +# standard imports +import logging + +# external imports +from chainlib.eth.constant import ZERO_ADDRESS + +logg = logging.getLogger(__name__) + + +class DefaultResolver: + + def __init__(self, chain_spec, rpc, sender_address=ZERO_ADDRESS): + self.chain_spec = chain_spec + self.rpc = rpc + self.lookups = [] + self.lookup_pointers = [] + self.cursor = 0 + self.sender_address = sender_address + + + def add_lookup(self, lookup, address): + self.lookups.append(lookup) + self.lookup_pointers.append(address) + + + def lookup(self, k): + for i, lookup in enumerate(self.lookups): + address = self.lookup_pointers[i] + o = lookup.address_of(address, k, sender_address=self.sender_address) + r = self.rpc.do(o) + address = lookup.parse_address_of(r) + if address == ZERO_ADDRESS: + address = None + return address + + raise FileNotFoundError(k) diff --git a/chaind_eth/runnable/send.py b/chaind_eth/runnable/send.py @@ -10,11 +10,16 @@ import stat # external imports import chainlib.eth.cli from chaind import Environment +from chainlib.eth.gas import price +from chainlib.chain import ChainSpec +from hexathon import strip_0x +from eth_token_index.index import TokenUniqueSymbolIndex # local imports from chaind_eth.cli.process import Processor from chaind_eth.cli.csv import CSVProcessor from chaind.error import TxSourceError +from chaind_eth.cli.resolver import DefaultResolver logging.basicConfig(level=logging.WARNING) logg = logging.getLogger() @@ -26,19 +31,27 @@ config_dir = os.path.join(script_dir, '..', 'data', 'config') arg_flags = chainlib.eth.cli.argflag_std_write argparser = chainlib.eth.cli.ArgumentParser(arg_flags) argparser.add_argument('--socket', dest='socket', type=str, help='Socket to send transactions to') +argparser.add_argument('--token-index', dest='token_index', type=str, help='Token resolver index') argparser.add_positional('source', required=False, type=str, help='Transaction source file') args = argparser.parse_args() - - extra_args = { 'socket': None, 'source': None, + 'token_index': None, } env = Environment(domain='eth', env=os.environ) config = chainlib.eth.cli.Config.from_args(args, arg_flags, extra_args=extra_args, base_config_dir=config_dir) +wallet = chainlib.eth.cli.Wallet() +wallet.from_config(config) + +rpc = chainlib.eth.cli.Rpc(wallet=wallet) +conn = rpc.connect_by_config(config) + +chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC')) + class OpMode(enum.Enum): STDOUT = 'standard output' UNIX = 'unix socket' @@ -67,14 +80,45 @@ if config.get('_SOURCE') == None: sys.exit(1) +class TokenIndexLookupAdapter(TokenUniqueSymbolIndex): + + def __init__(self, sender, address, chain_spec, signer=None, gas_oracle=None, nonce_oracle=None): + super(TokenIndexLookupAdapter, self).__init__(chain_spec, signer=signer, gas_oracle=gas_oracle, nonce_oracle=nonce_oracle) + self.index_address = address + self.sender = sender + + + def resolve(self, v): + return self.address_of(self.index_address, v, sender_address=sender) + + def main(): - processor = Processor(config.get('_SOURCE')) + signer = rpc.get_signer() + + token_resolver = DefaultResolver(chain_spec, conn, sender_address=rpc.get_sender_address()) + token_index_lookup = TokenUniqueSymbolIndex(chain_spec, signer=signer, gas_oracle=rpc.get_gas_oracle(), nonce_oracle=rpc.get_nonce_oracle()) + token_resolver.add_lookup(token_index_lookup, config.get('_TOKEN_INDEX')) + + processor = Processor(wallet.get_signer_address(), wallet.get_signer(), config.get('_SOURCE'), chain_spec, rpc.get_gas_oracle(), rpc.get_nonce_oracle(), resolver=token_resolver) processor.add_processor(CSVProcessor()) + + sends = None try: - r = processor.process() + sends = processor.load() except TxSourceError as e: - sys.stderr.write('source still unknown after trying processors: {}\n'.format(str(processor))) + sys.stderr.write('processing error: {}. processors: {}\n'.format(str(e), str(processor))) sys.exit(1) + + tx_iter = iter(processor) + while True: + tx = None + try: + tx_bytes = next(tx_iter) + except StopIteration: + break + print(tx_bytes.hex()) + + if __name__ == '__main__': main()