# sync_thread_range.py (5125B)
"""Run a threaded history sync followed by a head sync against an EVM node.

The script resumes any sync sessions found in the ``state`` directory below
the current working directory (or creates an initial one), runs each of them
with a ``ThreadPoolRangeHistorySyncer``, and finally runs a ``HeadSyncer``.
Every syncer gets the same ``CacheFilter`` instance attached, both as filter
and as block callback.

Environment variables read:
    RPC_PROVIDER / ETH_PROVIDER  default node url (``-p`` overrides)
    CONFINI_DIR                  default config directory (``-c`` overrides)
    CONFINI_ENV_PREFIX           prefix handed to ``confini.Config``
    RPC_AUTHENTICATION           set to ``basic`` to enable HTTP basic auth
    RPC_USERNAME / RPC_PASSWORD  credentials used when basic auth is enabled
"""

# standard imports
import sys
import signal
import argparse
import confini
import logging
import os
import tempfile

# external imports
from chainlib.chain import ChainSpec
from chainlib.eth.connection import EthHTTPConnection
from chainlib.eth.block import block_latest
from chainlib.jsonrpc import IntSequenceGenerator
from hexathon import (
    strip_0x,
    add_0x,
)
from chainsyncer.driver.head import HeadSyncer
#from chainsyncer.driver.history import HistorySyncer
from chainsyncer.driver.threadrange import ThreadPoolRangeHistorySyncer
from chainsyncer.backend.file import FileBackend
from chainsyncer.filter import NoopFilter

# local imports
from eth_monitor.chain import EthChainInterface
from eth_monitor.filters.cache import CacheFilter

logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
#logging.getLogger('leveldir.hex').setLevel(level=logging.DEBUG)
#logging.getLogger('leveldir.numeric').setLevel(level=logging.DEBUG)

# RPC_PROVIDER takes precedence over ETH_PROVIDER; fall back to a local node.
default_eth_provider = os.environ.get('RPC_PROVIDER')
if default_eth_provider is None:
    default_eth_provider = os.environ.get('ETH_PROVIDER', 'http://localhost:8545')

script_dir = os.path.realpath(os.path.dirname(__file__))
exec_dir = os.path.realpath(os.getcwd())
default_config_dir = os.environ.get('CONFINI_DIR', os.path.join(exec_dir, 'config'))

# Number of worker threads handed to ThreadPoolRangeHistorySyncer.
history_thread_count = 8

argparser = argparse.ArgumentParser('master eth events monitor')
argparser.add_argument('-p', '--provider', dest='p', default=default_eth_provider, type=str, help='Web3 provider url (http only)')
argparser.add_argument('-c', type=str, default=default_config_dir, help='config file')
argparser.add_argument('-i', '--chain-spec', dest='i', type=str, default='evm:ethereum:1', help='Chain specification string')
# The help text used to be a copy of the --seq text; the value is the block
# height the initial history sync starts from.
argparser.add_argument('--offset', type=int, default=0, help='Start sync on this block')
argparser.add_argument('--seq', action='store_true', help='Use sequential rpc ids')
argparser.add_argument('--skip-history', action='store_true', dest='skip_history', help='Skip history sync')
argparser.add_argument('-v', action='store_true', help='Be verbose')
argparser.add_argument('-vv', action='store_true', help='Be more verbose')
args = argparser.parse_args(sys.argv[1:])

if args.vv:
    logg.setLevel(logging.DEBUG)
elif args.v:
    logg.setLevel(logging.INFO)

config_dir = args.c
config = confini.Config(config_dir, os.environ.get('CONFINI_ENV_PREFIX'))
config.process()
args_override = {
    'CHAIN_SPEC': getattr(args, 'i'),
}
config.dict_override(args_override, 'cli')
config.add(args.offset, '_SYNC_OFFSET', True)
config.add(args.skip_history, '_NO_HISTORY', True)
logg.debug('config loaded:\n{}'.format(config))

chain_spec = ChainSpec.from_chain_str(args.i)

state_dir = os.path.join(exec_dir, 'state')

# Only created when --seq is given; passed on to the rpc request builder below.
rpc_id_generator = None
if args.seq:
    rpc_id_generator = IntSequenceGenerator()

auth = None
if os.environ.get('RPC_AUTHENTICATION') == 'basic':
    from chainlib.auth import BasicAuth
    auth = BasicAuth(os.environ['RPC_USERNAME'], os.environ['RPC_PASSWORD'])
# Hand the credentials to the connection; previously they were built but unused.
rpc = EthHTTPConnection(args.p, auth=auth)

if __name__ == '__main__':
    # Ask the node for the latest block number; syncing is anchored one past it.
    o = block_latest(id_generator=rpc_id_generator)
    r = rpc.do(o)
    block_offset = int(strip_0x(r), 16) + 1
    logg.debug('current block height {}'.format(block_offset))
    syncers = []

    syncer_backends = FileBackend.resume(chain_spec, block_offset, base_dir=state_dir)

    # A fresh temporary directory is created on every run and handed to the
    # cache filter; it is not removed by this script.
    tmp_dir = tempfile.mkdtemp()
    logg.info('using dir {}'.format(tmp_dir))
    cache_filter = CacheFilter(chain_spec, tmp_dir)
    filters = [
        cache_filter,
    ]

    if len(syncer_backends) == 0:
        initial_block_start = block_offset - 1
        # NOTE(review): --offset defaults to 0, so this branch is always taken
        # and a fresh sync starts at block 0 unless --offset or --skip-history
        # is given. Confirm that this is the intended default.
        if config.get('_SYNC_OFFSET') is not None:
            initial_block_start = config.get('_SYNC_OFFSET')
        initial_block_offset = block_offset
        if config.get('_NO_HISTORY'):
            initial_block_start = block_offset
            initial_block_offset += 1
        syncer_backends.append(FileBackend.initial(chain_spec, initial_block_offset, start_block_height=initial_block_start, base_dir=state_dir))
        logg.info('found no backends to resume, adding initial sync from history start {} end {}'.format(initial_block_start, initial_block_offset))
    else:
        for syncer_backend in syncer_backends:
            logg.info('resuming sync session {}'.format(syncer_backend))

    chain_interface = EthChainInterface()
    for syncer_backend in syncer_backends:
        #syncers.append(HistorySyncer(syncer_backend, chain_interface, block_callback=cache_filter.block_callback))
        syncers.append(ThreadPoolRangeHistorySyncer(history_thread_count, syncer_backend, chain_interface, block_callback=cache_filter.block_callback))

    # The head syncer is appended last, so it runs after all history syncers.
    syncer_backend = FileBackend.live(chain_spec, block_offset+1, base_dir=state_dir)
    syncers.append(HeadSyncer(syncer_backend, chain_interface, block_callback=cache_filter.block_callback))

    # Syncers run one after another, in list order.
    for i, syncer in enumerate(syncers):
        logg.debug('running syncer index {} {}'.format(i, str(syncer)))
        for f in filters:
            syncer.add_filter(f)

        r = syncer.loop(int(config.get('SYNCER_LOOP_INTERVAL')), rpc)
        sys.stderr.write("sync {} done at block {}\n".format(syncer, r))