eth-monitor

Monitor and cache Ethereum transactions with match filters
git clone git://git.defalsify.org/eth-monitor.git

sync_thread_range.py (5125B)
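Standalone sync script: it walks historical block ranges with thread-pooled syncers and then follows the chain head, passing every block through the transaction cache filter.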


# standard imports
import sys
import signal
import argparse
import confini
import logging
import os

# external imports
from chainlib.chain import ChainSpec
from chainlib.eth.connection import EthHTTPConnection
from chainlib.eth.block import block_latest
from chainlib.jsonrpc import IntSequenceGenerator  # provides sequential rpc ids for the --seq flag below
from hexathon import (
        strip_0x,
        add_0x,
        )
from chainsyncer.driver.head import HeadSyncer
#from chainsyncer.driver.history import HistorySyncer
from chainsyncer.driver.threadrange import ThreadPoolRangeHistorySyncer
from chainsyncer.backend.file import FileBackend
from chainsyncer.filter import NoopFilter

# local imports
from eth_monitor.chain import EthChainInterface
from eth_monitor.filters.cache import CacheFilter

logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
#logging.getLogger('leveldir.hex').setLevel(level=logging.DEBUG)
#logging.getLogger('leveldir.numeric').setLevel(level=logging.DEBUG)

default_eth_provider = os.environ.get('RPC_PROVIDER')
if default_eth_provider is None:
    default_eth_provider = os.environ.get('ETH_PROVIDER', 'http://localhost:8545')

script_dir = os.path.realpath(os.path.dirname(__file__))
exec_dir = os.path.realpath(os.getcwd())
default_config_dir = os.environ.get('CONFINI_DIR', os.path.join(exec_dir, 'config'))

argparser = argparse.ArgumentParser('master eth events monitor')
argparser.add_argument('-p', '--provider', dest='p', default=default_eth_provider, type=str, help='Web3 provider url (http only)')
argparser.add_argument('-c', type=str, default=default_config_dir, help='Config directory')
argparser.add_argument('-i', '--chain-spec', dest='i', type=str, default='evm:ethereum:1', help='Chain specification string')
argparser.add_argument('--offset', type=int, default=0, help='Block height to start history sync from')
argparser.add_argument('--seq', action='store_true', help='Use sequential rpc ids')
argparser.add_argument('--skip-history', action='store_true', dest='skip_history', help='Skip history sync')
argparser.add_argument('-v', action='store_true', help='Be verbose')
argparser.add_argument('-vv', action='store_true', help='Be more verbose')
args = argparser.parse_args(sys.argv[1:])

if args.vv:
    logg.setLevel(logging.DEBUG)
elif args.v:
    logg.setLevel(logging.INFO)

config_dir = args.c
config = confini.Config(config_dir, os.environ.get('CONFINI_ENV_PREFIX'))
config.process()
args_override = {
        'CHAIN_SPEC': getattr(args, 'i'),
        }
config.dict_override(args_override, 'cli')
config.add(args.offset, '_SYNC_OFFSET', True)
config.add(args.skip_history, '_NO_HISTORY', True)
logg.debug('config loaded:\n{}'.format(config))

chain_spec = ChainSpec.from_chain_str(args.i)

state_dir = os.path.join(exec_dir, 'state')

rpc_id_generator = None
if args.seq:
    rpc_id_generator = IntSequenceGenerator()

auth = None
if os.environ.get('RPC_AUTHENTICATION') == 'basic':
    from chainlib.auth import BasicAuth
    auth = BasicAuth(os.environ['RPC_USERNAME'], os.environ['RPC_PASSWORD'])
rpc = EthHTTPConnection(args.p)

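# When run as a script: look up the current chain head, resume any sync
# sessions found on disk (or create an initial one), then drive history
# syncers over past ranges and a head syncer over new blocks.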
if __name__ == '__main__':
    o = block_latest()
    r = rpc.do(o)
    block_offset = int(strip_0x(r), 16) + 1
    logg.debug('current block height {}'.format(block_offset))
    syncers = []

    syncer_backends = FileBackend.resume(chain_spec, block_offset, base_dir=state_dir)

    import tempfile
    tmp_dir = tempfile.mkdtemp()
    logg.info('using dir {}'.format(tmp_dir))
    cache_filter = CacheFilter(chain_spec, tmp_dir)
    filters = [
            cache_filter,
            ]

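    # If no resumable sync sessions exist on disk, create an initial backend
    # starting from --offset (or from the current head when --skip-history is
    # set); otherwise resume the sessions that were found.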
    if len(syncer_backends) == 0:
        initial_block_start = block_offset - 1
        if config.get('_SYNC_OFFSET') is not None:
            initial_block_start = config.get('_SYNC_OFFSET')
        initial_block_offset = block_offset
        if config.get('_NO_HISTORY'):
            initial_block_start = block_offset
            initial_block_offset += 1
        syncer_backends.append(FileBackend.initial(chain_spec, initial_block_offset, start_block_height=initial_block_start, base_dir=state_dir))
        logg.info('found no backends to resume, adding initial sync from history start {} end {}'.format(initial_block_start, initial_block_offset))
    else:
        for syncer_backend in syncer_backends:
            logg.info('resuming sync session {}'.format(syncer_backend))

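    # One thread-pooled range syncer per resumable history backend, with the
    # cache filter's block callback attached.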
    chain_interface = EthChainInterface()
    for syncer_backend in syncer_backends:
        #syncers.append(HistorySyncer(syncer_backend, chain_interface, block_callback=cache_filter.block_callback))
        syncers.append(ThreadPoolRangeHistorySyncer(8, syncer_backend, chain_interface, block_callback=cache_filter.block_callback))

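    # A live backend and head syncer follow the chain from the current head onwards.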
    syncer_backend = FileBackend.live(chain_spec, block_offset+1, base_dir=state_dir)
    syncers.append(HeadSyncer(syncer_backend, chain_interface, block_callback=cache_filter.block_callback))

    i = 0
    for syncer in syncers:
        logg.debug('running syncer index {} {}'.format(i, str(syncer)))
        for f in filters:
            syncer.add_filter(f)

        r = syncer.loop(int(config.get('SYNCER_LOOP_INTERVAL')), rpc)
        sys.stderr.write("sync {} done at block {}\n".format(syncer, r))

        i += 1
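
The script can be run directly once a confini configuration directory defining SYNCER_LOOP_INTERVAL is available; the provider URL, config path and block offset below are illustrative only:

    python sync_thread_range.py -p http://localhost:8545 -c ./config -i evm:ethereum:1 --offset 1000000 -vv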