chaind-eth

Queue server for ethereum
Log | Files | Refs | README | LICENSE

commit 9c325d416a3b8fa710dfede2e4fa5b741a3bf87f
parent 441cb0040471331154ef5cdb0ba00d1ea5086783
Author: lash <dev@holbrook.no>
Date:   Sun, 10 Apr 2022 09:33:10 +0000

WIP implement tasker (replacement for server.py)

Diffstat:
Achaind/eth/runnable/old/resend.py | 108+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Achaind/eth/runnable/old/retry.py | 113+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Achaind/eth/runnable/old/send.py | 152+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Achaind/eth/runnable/old/server.py | 215+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Achaind/eth/runnable/old/syncer.py | 141+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Achaind/eth/runnable/tasker.py | 128+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mrequirements.txt | 2++
7 files changed, 859 insertions(+), 0 deletions(-)

diff --git a/chaind/eth/runnable/old/resend.py b/chaind/eth/runnable/old/resend.py @@ -0,0 +1,108 @@ +# standard imports +import os +import logging +import sys +import datetime +import enum +import re +import stat + +# external imports +import chainlib.eth.cli +from chaind import Environment +from chainlib.eth.gas import price +from chainlib.chain import ChainSpec +from hexathon import strip_0x +from eth_token_index.index import TokenUniqueSymbolIndex + +# local imports +from chaind_eth.cli.retry import Retrier +from chaind.error import TxSourceError +from chaind_eth.cli.output import ( + Outputter, + OpMode, + ) + +logging.basicConfig(level=logging.WARNING) +logg = logging.getLogger() + +script_dir = os.path.dirname(os.path.realpath(__file__)) +config_dir = os.path.join(script_dir, '..', 'data', 'config') + + +arg_flags = chainlib.eth.cli.argflag_std_write +argparser = chainlib.eth.cli.ArgumentParser(arg_flags) +argparser.add_argument('--socket', dest='socket', type=str, help='Socket to send transactions to') +argparser.add_positional('source', required=False, type=str, help='Transaction source file') +args = argparser.parse_args() + +extra_args = { + 'socket': None, + 'source': None, + } + +env = Environment(domain='eth', env=os.environ) +config = chainlib.eth.cli.Config.from_args(args, arg_flags, extra_args=extra_args, base_config_dir=config_dir) + +wallet = chainlib.eth.cli.Wallet() +wallet.from_config(config) + +rpc = chainlib.eth.cli.Rpc(wallet=wallet) +conn = rpc.connect_by_config(config) + +chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC')) + + +mode = OpMode.STDOUT +re_unix = r'^ipc://(/.+)' +m = re.match(re_unix, config.get('_SOCKET', '')) +if m != None: + config.add(m.group(1), '_SOCKET', exists_ok=True) + r = 0 + try: + stat_info = os.stat(config.get('_SOCKET')) + if not stat.S_ISSOCK(stat_info.st_mode): + r = 1 + except FileNotFoundError: + r = 1 + + if r > 0: + sys.stderr.write('{} is not a socket\n'.format(config.get('_SOCKET'))) + sys.exit(1) + + mode = OpMode.UNIX + +logg.info('using mode {}'.format(mode.value)) + +if config.get('_SOURCE') == None: + sys.stderr.write('source data missing\n') + sys.exit(1) + + +def main(): + signer = rpc.get_signer() + + # TODO: make resolvers pluggable + processor = Retrier(wallet.get_signer_address(), wallet.get_signer(), config.get('_SOURCE'), chain_spec, rpc.get_gas_oracle()) + + sends = None + try: + sends = processor.load() + except TxSourceError as e: + sys.stderr.write('processing error: {}. processors: {}\n'.format(str(e), str(processor))) + sys.exit(1) + + tx_iter = iter(processor) + out = Outputter(mode) + while True: + tx = None + try: + tx_bytes = next(tx_iter) + except StopIteration: + break + tx_hex = tx_bytes.hex() + print(out.do(tx_hex, socket=config.get('_SOCKET'))) + + +if __name__ == '__main__': + main() diff --git a/chaind/eth/runnable/old/retry.py b/chaind/eth/runnable/old/retry.py @@ -0,0 +1,113 @@ +# SPDX-License-Identifier: GPL-3.0-or-later + +# standard imports +import os +import logging +import sys +import datetime + +# external imports +from hexathon import ( + add_0x, + strip_0x, + ) +from chaind import Environment +import chainlib.eth.cli +from chainlib.chain import ChainSpec +from chainqueue.db import dsn_from_config +from chainqueue.sql.backend import SQLBackend +from chainqueue.enum import StatusBits +from chaind.sql.session import SessionIndex +from chainqueue.adapters.eth import EthAdapter +from chainlib.eth.gas import price +from chainlib.eth.connection import EthHTTPConnection +from crypto_dev_signer.eth.transaction import EIP155Transaction + +DEFAULT_GAS_FACTOR = 1.1 + + +logging.basicConfig(level=logging.WARNING) +logg = logging.getLogger() + +script_dir = os.path.dirname(os.path.realpath(__file__)) +config_dir = os.path.join(script_dir, '..', 'data', 'config') + +arg_flags = chainlib.eth.cli.argflag_std_write +argparser = chainlib.eth.cli.ArgumentParser(arg_flags) +argparser.add_argument('--backend', type=str, default='sql', help='Backend to use (currently only "sql")') +argparser.add_positional('session_id', required=False, type=str, help='Ethereum address of recipient') +args = argparser.parse_args() +extra_args = { + 'backend': None, + 'session_id': 'SESSION_ID', + } + +env = Environment(domain='eth', env=os.environ) + +config = chainlib.eth.cli.Config.from_args(args, arg_flags, extra_args=extra_args, base_config_dir=config_dir) + +if config.get('SESSION_DATA_DIR') == None: + config.add(env.data_dir, 'SESSION_DATA_DIR', exists_ok=True) + +chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC')) + +tx_getter = None +session_method = None +if config.get('_BACKEND') == 'sql': + from chainqueue.sql.query import get_tx_cache as tx_getter + from chainqueue.runnable.sql import setup_backend + from chainqueue.db.models.base import SessionBase + setup_backend(config, debug=config.true('DATABASE_DEBUG')) + session_method = SessionBase.create_session +else: + raise NotImplementedError('backend {} not implemented'.format(config.get('_BACKEND'))) + +if config.get('DATABASE_ENGINE') == 'sqlite': + config.add(os.path.join(config.get('SESSION_DATA_DIR'), config.get('DATABASE_NAME') + '.sqlite'), 'DATABASE_NAME', exists_ok=True) + +wallet = chainlib.eth.cli.Wallet() +wallet.from_config(config) + +rpc = chainlib.eth.cli.Rpc(wallet=wallet) +conn = rpc.connect_by_config(config) + +dsn = dsn_from_config(config) +backend = SQLBackend(dsn, debug=config.true('DATABASE_DEBUG'), error_parser=rpc.error_parser) +session_index_backend = SessionIndex(config.get('SESSION_ID')) +adapter = EthAdapter(backend, session_index_backend=session_index_backend) + + +def main(): + before = datetime.datetime.utcnow() - adapter.pending_retry_threshold + txs = session_index_backend.get(chain_spec, adapter, status=StatusBits.IN_NETWORK, not_status=StatusBits.FINAL | StatusBits.OBSOLETE, before=before) + + o = price() + r = conn.do(o, error_parser=rpc.error_parser) + gas_price = strip_0x(r) + try: + gas_price = int(gas_price, 16) + except ValueError: + gas_price = int(gas_price) + logg.info('got current gas price {}'.format(gas_price)) + + signer = rpc.get_signer() + + db_session = adapter.create_session() + for tx_hash in txs: + tx_bytes = bytes.fromhex(strip_0x(txs[tx_hash])) + tx = adapter.translate(tx_bytes, chain_spec) + tx_gas_price = int(tx['gasPrice']) + if tx_gas_price < gas_price: + tx['gasPrice'] = gas_price + else: + tx['gasPrice'] = int(tx['gasPrice'] * DEFAULT_GAS_FACTOR) + tx_obj = EIP155Transaction(tx, tx['nonce'], chain_spec.chain_id()) + new_tx_bytes = signer.sign_transaction_to_wire(tx_obj) + logg.debug('add tx {} with gas price changed from {} to {}: {}'.format(tx_hash, tx_gas_price, tx['gasPrice'], new_tx_bytes.hex())) + adapter.add(new_tx_bytes, chain_spec, session=db_session) + + db_session.close() + + +if __name__ == '__main__': + main() diff --git a/chaind/eth/runnable/old/send.py b/chaind/eth/runnable/old/send.py @@ -0,0 +1,152 @@ +# standard imports +import os +import logging +import sys +import datetime +import enum +import re +import stat +import socket + +# external imports +import chainlib.eth.cli +from chaind import Environment +from chainlib.eth.gas import price +from chainlib.chain import ChainSpec +from hexathon import strip_0x + +# local imports +from chaind_eth.cli.process import Processor +from chaind_eth.cli.csv import CSVProcessor +from chaind.error import TxSourceError +from chaind_eth.cli.resolver import ( + DefaultResolver, + LookNoop, + TokenIndexLookup, + ) + +logging.basicConfig(level=logging.WARNING) +logg = logging.getLogger() + +script_dir = os.path.dirname(os.path.realpath(__file__)) +config_dir = os.path.join(script_dir, '..', 'data', 'config') + + +arg_flags = chainlib.eth.cli.argflag_std_write +argparser = chainlib.eth.cli.ArgumentParser(arg_flags) +argparser.add_argument('--socket', dest='socket', type=str, help='Socket to send transactions to') +argparser.add_argument('--token-index', dest='token_index', type=str, help='Token resolver index') +argparser.add_positional('source', required=False, type=str, help='Transaction source file') +args = argparser.parse_args() + +extra_args = { + 'socket': None, + 'source': None, + 'token_index': None, + } + +env = Environment(domain='eth', env=os.environ) +config = chainlib.eth.cli.Config.from_args(args, arg_flags, extra_args=extra_args, base_config_dir=config_dir) + +wallet = chainlib.eth.cli.Wallet() +wallet.from_config(config) + +rpc = chainlib.eth.cli.Rpc(wallet=wallet) +conn = rpc.connect_by_config(config) + +chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC')) + +class OpMode(enum.Enum): + STDOUT = 'standard_output' + UNIX = 'unix_socket' +mode = OpMode.STDOUT + +re_unix = r'^ipc://(/.+)' +m = re.match(re_unix, config.get('_SOCKET', '')) +if m != None: + config.add(m.group(1), '_SOCKET', exists_ok=True) + r = 0 + try: + stat_info = os.stat(config.get('_SOCKET')) + if not stat.S_ISSOCK(stat_info.st_mode): + r = 1 + except FileNotFoundError: + r = 1 + + if r > 0: + sys.stderr.write('{} is not a socket\n'.format(config.get('_SOCKET'))) + sys.exit(1) + + mode = OpMode.UNIX + +logg.info('using mode {}'.format(mode.value)) + +if config.get('_SOURCE') == None: + sys.stderr.write('source data missing\n') + sys.exit(1) + + +class Outputter: + + def __init__(self, mode): + self.out = getattr(self, 'do_' + mode.value) + + + def do(self, hx): + return self.out(hx) + + + def do_standard_output(self, hx): + #sys.stdout.write(hx + '\n') + return hx + + + def do_unix_socket(self, hx): + s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + s.connect(config.get('_SOCKET')) + s.send(hx.encode('utf-8')) + r = s.recv(64+4) + logg.debug('r {}'.format(r)) + s.close() + return r[4:].decode('utf-8') + + +def main(): + signer = rpc.get_signer() + + + # TODO: make resolvers pluggable + token_resolver = DefaultResolver(chain_spec, conn, sender_address=rpc.get_sender_address()) + + noop_lookup = LookNoop(check=not config.true('_UNSAFE')) + token_resolver.add_lookup(noop_lookup, 'noop') + + if config.get('_TOKEN_INDEX') != None: + token_index_lookup = TokenIndexLookup(chain_spec, signer, rpc.get_gas_oracle(), rpc.get_nonce_oracle(), config.get('_TOKEN_INDEX')) + token_resolver.add_lookup(token_index_lookup, reverse=config.get('_TOKEN_INDEX')) + + processor = Processor(wallet.get_signer_address(), wallet.get_signer(), config.get('_SOURCE'), chain_spec, rpc.get_gas_oracle(), rpc.get_nonce_oracle(), resolver=token_resolver) + processor.add_processor(CSVProcessor()) + + sends = None + try: + sends = processor.load() + except TxSourceError as e: + sys.stderr.write('processing error: {}. processors: {}\n'.format(str(e), str(processor))) + sys.exit(1) + + + tx_iter = iter(processor) + out = Outputter(mode) + while True: + tx = None + try: + tx_bytes = next(tx_iter) + except StopIteration: + break + tx_hex = tx_bytes.hex() + print(out.do(tx_hex)) + + +if __name__ == '__main__': + main() diff --git a/chaind/eth/runnable/old/server.py b/chaind/eth/runnable/old/server.py @@ -0,0 +1,215 @@ +# standard imports +import sys +import time +import socket +import signal +import os +import logging +import stat +import argparse + +# external imports +import chainlib.eth.cli +from chaind import Environment +from hexathon import strip_0x +from chainlib.chain import ChainSpec +from chainlib.eth.connection import EthHTTPConnection +from chainqueue.sql.backend import SQLBackend +from chainlib.error import JSONRPCException +from chainqueue.db import dsn_from_config +from chaind.sql.session import SessionIndex + +# local imports +from chaind_eth.dispatch import Dispatcher +from chainqueue.adapters.eth import EthAdapter + +logging.basicConfig(level=logging.WARNING) +logg = logging.getLogger() + +script_dir = os.path.dirname(os.path.realpath(__file__)) +config_dir = os.path.join(script_dir, '..', 'data', 'config') + +env = Environment(domain='eth', env=os.environ) + +arg_flags = chainlib.eth.cli.argflag_std_read +argparser = chainlib.eth.cli.ArgumentParser(arg_flags) +argparser.add_argument('--data-dir', type=str, help='data directory') +argparser.add_argument('--runtime-dir', type=str, help='runtime directory') +argparser.add_argument('--session-id', dest='session_id', type=str, help='session identifier') +argparser.add_argument('--dispatch-delay', dest='dispatch_delay', type=float, help='socket timeout before processing queue') +args = argparser.parse_args() +extra_args = { + 'runtime_dir': 'SESSION_RUNTIME_DIR', + 'data_dir': 'SESSION_DATA_DIR', + 'session_id': 'SESSION_ID', + 'dispatch_delay': 'SESSION_DISPATCH_DELAY', + } +#config = chainlib.eth.cli.Config.from_args(args, arg_flags, default_config_dir=config_dir, extend_base_config_dir=config_dir) +config = chainlib.eth.cli.Config.from_args(args, arg_flags, extra_args=extra_args, base_config_dir=config_dir) + +logg.debug('session id {} {}'.format(type(config.get('SESSION_ID')), config.get('SESSION_ID'))) +if config.get('SESSION_ID') == None: + config.add(env.session, 'SESSION_ID', exists_ok=True) +if config.get('SESSION_RUNTIME_DIR') == None: + config.add(env.runtime_dir, 'SESSION_RUNTIME_DIR', exists_ok=True) +if config.get('SESSION_DATA_DIR') == None: + config.add(env.data_dir, 'SESSION_DATA_DIR', exists_ok=True) +if not config.get('SESSION_SOCKET_PATH'): + socket_path = os.path.join(config.get('SESSION_RUNTIME_DIR'), config.get('SESSION_ID'), 'chaind.sock') + config.add(socket_path, 'SESSION_SOCKET_PATH', True) + +if config.get('DATABASE_ENGINE') == 'sqlite': + #config.add(os.path.join(config.get('SESSION_DATA_DIR'), config.get('DATABASE_NAME') + '.sqlite'), 'DATABASE_NAME', exists_ok=True) + config.add(os.path.join(config.get('SESSION_DATA_DIR'), config.get('DATABASE_NAME') + '.sqlite'), 'DATABASE_NAME', exists_ok=True) + +config.censor('PASSWORD', 'DATABASE') +logg.debug('config loaded:\n{}'.format(config)) + + +# verify setup +try: + os.stat(config.get('DATABASE_NAME')) +except FileNotFoundError: + sys.stderr.write('database file {} not found. please run database migration script first\n'.format(config.get('DATABASE_NAME'))) + sys.exit(1) + + +class SessionController: + + def __init__(self, config): + self.dead = False + os.makedirs(os.path.dirname(config.get('SESSION_SOCKET_PATH')), exist_ok=True) + try: + os.unlink(config.get('SESSION_SOCKET_PATH')) + except FileNotFoundError: + pass + + self.srv = socket.socket(family=socket.AF_UNIX, type=socket.SOCK_STREAM) + self.srv.bind(config.get('SESSION_SOCKET_PATH')) + self.srv.listen(2) + self.srv.settimeout(float(config.get('SESSION_DISPATCH_DELAY'))) + + + def shutdown(self, signo, frame): + if self.dead: + return + self.dead = True + if signo != None: + logg.info('closing on {}'.format(signo)) + else: + logg.info('explicit shutdown') + sockname = self.srv.getsockname() + self.srv.close() + try: + os.unlink(sockname) + except FileNotFoundError: + logg.warning('socket file {} already gone'.format(sockname)) + + + def get_connection(self): + return self.srv.accept() + + +ctrl = SessionController(config) + +signal.signal(signal.SIGINT, ctrl.shutdown) +signal.signal(signal.SIGTERM, ctrl.shutdown) + +chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC')) + +rpc = chainlib.eth.cli.Rpc() +conn = rpc.connect_by_config(config) + +dsn = dsn_from_config(config) +backend = SQLBackend(dsn, error_parser=rpc.error_parser, debug=config.true('DATABASE_DEBUG')) +session_index_backend = SessionIndex(config.get('SESSION_ID')) +adapter = EthAdapter(backend, session_index_backend=session_index_backend) + + +def process_outgoing(chain_spec, adapter, rpc, limit=100): + dispatcher = Dispatcher(chain_spec, adapter, limit=limit) + session = adapter.create_session() + r = dispatcher.process(rpc, session) + session.close() + return r + + +def main(): + while True: + srvs = None + try: + logg.debug('getting connection') + (srvs, srvs_addr) = ctrl.get_connection() + except OSError as e: + try: + fi = os.stat(config.get('SESSION_SOCKET_PATH')) + except FileNotFoundError: + logg.error('socket is gone') + break + if not stat.S_ISSOCK(fi.st_mode): + logg.error('entity on socket path is not a socket') + break + if srvs == None: + logg.debug('timeout (remote socket is none)') + r = process_outgoing(chain_spec, adapter, conn) + if r > 0: + ctrl.srv.settimeout(0.1) + else: + ctrl.srv.settimeout(4.0) + continue + ctrl.srv.settimeout(0.1) + srvs.settimeout(0.1) + data_in = None + try: + data_in = srvs.recv(1048576) + except BlockingIOError as e: + logg.debug('block io error: {}'.format(e)) + continue + + data = None + try: + data_in_str = data_in.decode('utf-8') + data_hex = strip_0x(data_in_str.rstrip()) + data = bytes.fromhex(data_hex) + except ValueError: + logg.error('invalid input "{}"'.format(data_in_str)) + continue + + logg.debug('recv {} bytes'.format(len(data))) + session = backend.create_session() + tx_hash = None + signed_tx = None + try: + tx_hash = adapter.add(data_hex, chain_spec, session=session) + except ValueError as e: + try: + signed_tx = adapter.get(data_hex, chain_spec, session=session) + except ValueError as e: + logg.error('invalid input: {}'.format(e)) + + if tx_hash != None: + session.commit() + try: + r = int(0).to_bytes(4, byteorder='big') + r += strip_0x(tx_hash).encode('utf-8') + srvs.send(r) + logg.debug('{} bytes sent'.format(r)) + except BrokenPipeError: + logg.debug('they just hung up. how rude.') + elif signed_tx != None: + r = int(0).to_bytes(4, byteorder='big') + r += strip_0x(signed_tx).encode('utf-8') + try: + r = srvs.send(r) + except BrokenPipeError: + logg.debug('they just hung up. how useless.') + else: + r = srvs.send(int(1).to_bytes(4, byteorder='big')) + + session.close() + srvs.close() + + ctrl.shutdown(None, None) + +if __name__ == '__main__': + main() diff --git a/chaind/eth/runnable/old/syncer.py b/chaind/eth/runnable/old/syncer.py @@ -0,0 +1,141 @@ +# standard imports +import sys +import time +import socket +import signal +import os +import logging +import stat +import argparse +import uuid + +# external imports +import chainlib.eth.cli +from chaind import Environment +import confini +from hexathon import strip_0x +from chainlib.chain import ChainSpec +from chainlib.eth.connection import EthHTTPConnection +from chainlib.eth.block import block_latest +from chainsyncer.driver.head import HeadSyncer +from chainsyncer.driver.history import HistorySyncer +from chainsyncer.db import dsn_from_config +from chainsyncer.db.models.base import SessionBase +from chainsyncer.backend.sql import SQLBackend +from chainsyncer.error import SyncDone + +# local imports +from chaind_eth.filter import StateFilter +from chaind_eth.chain import EthChainInterface + +logging.basicConfig(level=logging.WARNING) +logg = logging.getLogger() + +script_dir = os.path.dirname(os.path.realpath(__file__)) +config_dir = os.path.join(script_dir, '..', 'data', 'config') + +env = Environment(domain='eth', env=os.environ) + +arg_flags = chainlib.eth.cli.argflag_std_read +argparser = chainlib.eth.cli.ArgumentParser(arg_flags) +argparser.add_argument('--data-dir', type=str, help='data directory') +argparser.add_argument('--runtime-dir', type=str, help='runtime directory') +argparser.add_argument('--session-id', dest='session_id', type=str, help='session identifier') +argparser.add_argument('--offset', default=0, type=int, help='block height to sync history from') +args = argparser.parse_args() +extra_args = { + 'runtime_dir': 'SESSION_RUNTIME_DIR', + 'data_dir': 'SESSION_DATA_DIR', + 'session_id': 'SESSION_ID', + 'offset': 'SYNCER_HISTORY_START', + } +#config = chainlib.eth.cli.Config.from_args(args, arg_flags, default_config_dir=config_dir, extend_base_config_dir=config_dir) +config = chainlib.eth.cli.Config.from_args(args, arg_flags, extra_args=extra_args, base_config_dir=[config_dir, os.path.join(config_dir, 'syncer')]) + +logg.debug('session id {} {}'.format(type(config.get('SESSION_ID')), config.get('SESSION_ID'))) +if config.get('SESSION_ID') == None: + config.add(env.session, 'SESSION_ID', exists_ok=True) +if config.get('SESSION_RUNTIME_DIR') == None: + config.add(env.runtime_dir, 'SESSION_RUNTIME_DIR', exists_ok=True) +if config.get('SESSION_DATA_DIR') == None: + config.add(env.data_dir, 'SESSION_DATA_DIR', exists_ok=True) +if not config.get('SESSION_SOCKET_PATH'): + socket_path = os.path.join(config.get('SESSION_RUNTIME_DIR'), config.get('SESSION_ID'), 'chaind.sock') + config.add(socket_path, 'SESSION_SOCKET_PATH', True) + +if config.get('DATABASE_ENGINE') == 'sqlite': + config.add(os.path.join(config.get('SESSION_DATA_DIR'), config.get('DATABASE_NAME') + '.sqlite'), 'DATABASE_NAME', exists_ok=True) + +config.censor('PASSWORD', 'DATABASE') +logg.debug('config loaded:\n{}'.format(config)) + + +chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC')) + +dsn = dsn_from_config(config) +logg.debug('dns {}'.format(dsn)) +SQLBackend.setup(dsn, debug=config.true('DATABASE_DEBUG')) +rpc = EthHTTPConnection(url=config.get('RPC_PROVIDER'), chain_spec=chain_spec) + +def register_filter_tags(filters, session): + for f in filters: + tag = f.tag() + try: + add_tag(session, tag[0], domain=tag[1]) + session.commit() + logg.info('added tag name "{}" domain "{}"'.format(tag[0], tag[1])) + except sqlalchemy.exc.IntegrityError: + session.rollback() + logg.debug('already have tag name "{}" domain "{}"'.format(tag[0], tag[1])) + + +def main(): + o = block_latest() + r = rpc.do(o) + block_offset = int(strip_0x(r), 16) + 1 + + syncers = [] + + syncer_backends = SQLBackend.resume(chain_spec, block_offset) + + if len(syncer_backends) == 0: + initial_block_start = config.get('SYNCER_HISTORY_START', 0) + if isinstance(initial_block_start, str): + initial_block_start = int(initial_block_start) + initial_block_offset = block_offset + if config.true('SYNCER_SKIP_HISTORY'): + initial_block_start = block_offset + initial_block_offset += 1 + syncer_backends.append(SQLBackend.initial(chain_spec, initial_block_offset, start_block_height=initial_block_start)) + logg.info('found no backends to resume, adding initial sync from history start {} end {}'.format(initial_block_start, initial_block_offset)) + else: + for syncer_backend in syncer_backends: + logg.info('resuming sync session {}'.format(syncer_backend)) + + chain_interface = EthChainInterface() + for syncer_backend in syncer_backends: + syncers.append(HistorySyncer(syncer_backend, chain_interface)) + + syncer_backend = SQLBackend.live(chain_spec, block_offset+1) + syncers.append(HeadSyncer(syncer_backend, chain_interface)) + + state_filter = StateFilter(chain_spec) + filters = [ + state_filter, + ] + + i = 0 + for syncer in syncers: + logg.debug('running syncer index {}'.format(i)) + for f in filters: + syncer.add_filter(f) + r = syncer.loop(int(config.get('SYNCER_LOOP_INTERVAL')), rpc) + sys.stderr.write("sync {} done at block {}\n".format(syncer, r)) + + i += 1 + + sys.exit(0) + + +if __name__ == '__main__': + main() diff --git a/chaind/eth/runnable/tasker.py b/chaind/eth/runnable/tasker.py @@ -0,0 +1,128 @@ +# standard imports +import os +import logging +import signal + +# external imports +import chainlib.eth.cli +from chaind.session import SessionController +from chaind.setup import Environment +from chaind.error import ( + NothingToDoError, + ClientGoneError, + ClientBlockError, + ClientInputError, + ) +from chainqueue import ( + Store, + Status, + ) +from chainqueue.store.fs import ( + IndexStore, + CounterStore, + ) +from chainqueue.cache import CacheTokenTx +from chainlib.encode import TxHexNormalizer +from chainlib.chain import ChainSpec +from chaind.adapters.fs import ChaindFsAdapter + +# local imports +from chaind.eth.dispatch import EthDispatcher +from chaind.eth.cache import EthCacheTx + +logging.basicConfig(level=logging.WARNING) +logg = logging.getLogger() + +script_dir = os.path.dirname(os.path.realpath(__file__)) +config_dir = os.path.join(script_dir, '..', 'data', 'config') + +env = Environment(domain='eth', env=os.environ) + +arg_flags = chainlib.eth.cli.argflag_std_read +argparser = chainlib.eth.cli.ArgumentParser(arg_flags) +argparser.add_argument('--data-dir', type=str, help='data directory') +argparser.add_argument('--runtime-dir', type=str, help='runtime directory') +argparser.add_argument('--session-id', dest='session_id', type=str, help='session identifier') +argparser.add_argument('--dispatch-delay', dest='dispatch_delay', type=float, help='socket timeout before processing queue') +args = argparser.parse_args() +extra_args = { + 'runtime_dir': 'SESSION_RUNTIME_DIR', + 'data_dir': 'SESSION_DATA_DIR', + 'session_id': 'SESSION_ID', + 'dispatch_delay': 'SESSION_DISPATCH_DELAY', + } +#config = chainlib.eth.cli.Config.from_args(args, arg_flags, default_config_dir=config_dir, extend_base_config_dir=config_dir) +config = chainlib.eth.cli.Config.from_args(args, arg_flags, extra_args=extra_args, base_config_dir=config_dir) + +logg.debug('session id {} {}'.format(type(config.get('SESSION_ID')), config.get('SESSION_ID'))) +if config.get('SESSION_ID') == None: + config.add(env.session, 'SESSION_ID', exists_ok=True) +if config.get('SESSION_RUNTIME_DIR') == None: + config.add(env.runtime_dir, 'SESSION_RUNTIME_DIR', exists_ok=True) +if config.get('SESSION_DATA_DIR') == None: + config.add(env.data_dir, 'SESSION_DATA_DIR', exists_ok=True) +if not config.get('SESSION_SOCKET_PATH'): + socket_path = os.path.join(config.get('SESSION_RUNTIME_DIR'), config.get('SESSION_ID'), 'chaind.sock') + config.add(socket_path, 'SESSION_SOCKET_PATH', True) + +if config.get('DATABASE_ENGINE') == 'sqlite': + #config.add(os.path.join(config.get('SESSION_DATA_DIR'), config.get('DATABASE_NAME') + '.sqlite'), 'DATABASE_NAME', exists_ok=True) + config.add(os.path.join(config.get('SESSION_DATA_DIR'), config.get('DATABASE_NAME') + '.sqlite'), 'DATABASE_NAME', exists_ok=True) + +config.censor('PASSWORD', 'DATABASE') +logg.debug('config loaded:\n{}'.format(config)) + +def process_outgoing(chain_spec, adapter, rpc, limit=100): + upcoming = adapter.upcoming() + logg.info('process {} {} {}'.format(chain_spec, adapter, rpc)) + logg.info('upcoming {}'.format(upcoming)) + i = 0 + for tx_hash in upcoming: + if adapter.dispatch(tx_hash): + i += 1 + return i + +chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC')) + +rpc = chainlib.eth.cli.Rpc() +conn = rpc.connect_by_config(config) + +tx_normalizer = TxHexNormalizer().tx_hash +token_cache_store = CacheTokenTx(chain_spec, normalizer=tx_normalizer) +dispatcher = EthDispatcher(conn) +queue_adapter = ChaindFsAdapter( + chain_spec, + config.get('SESSION_DATA_DIR'), + EthCacheTx, + dispatcher, + ) + +ctrl = SessionController(config, queue_adapter, process_outgoing) +signal.signal(signal.SIGINT, ctrl.shutdown) +signal.signal(signal.SIGTERM, ctrl.shutdown) + + +def main(): + while True: + r = None + try: + r = ctrl.get() + except ClientGoneError: + break + except ClientBlockError: + continue + except ClientInputError: + continue + except NothingToDoError: + pass + + if r == None: + ctrl.process(conn) + continue + + tx_hash = queue_adapter.put(r.hex()) + queue_adapter.enqueue(tx_hash) + + +if __name__ == '__main__': + main() diff --git a/requirements.txt b/requirements.txt @@ -1,4 +1,6 @@ #chaind~=0.1.0 hexathon~=0.1.5 chainlib-eth>=0.1.0b3,<=0.1.0 +pyxdg~=0.27 #eth-token-index~=0.2.4 +shep~=0.1.1