syncer.py (5257B)
1 # standard imports 2 import sys 3 import time 4 import socket 5 import signal 6 import os 7 import logging 8 import stat 9 import argparse 10 import uuid 11 12 # external imports 13 import chainlib.eth.cli 14 from chaind import Environment 15 import confini 16 from hexathon import strip_0x 17 from chainlib.chain import ChainSpec 18 from chainlib.eth.connection import EthHTTPConnection 19 from chainlib.eth.block import block_latest 20 from chainsyncer.driver.head import HeadSyncer 21 from chainsyncer.driver.history import HistorySyncer 22 from chainsyncer.db import dsn_from_config 23 from chainsyncer.db.models.base import SessionBase 24 from chainsyncer.backend.sql import SQLBackend 25 from chainsyncer.error import SyncDone 26 27 # local imports 28 from chaind_eth.filter import StateFilter 29 from chaind_eth.chain import EthChainInterface 30 31 logging.basicConfig(level=logging.WARNING) 32 logg = logging.getLogger() 33 34 script_dir = os.path.dirname(os.path.realpath(__file__)) 35 config_dir = os.path.join(script_dir, '..', 'data', 'config') 36 37 env = Environment(domain='eth', env=os.environ) 38 39 arg_flags = chainlib.eth.cli.argflag_std_read 40 argparser = chainlib.eth.cli.ArgumentParser(arg_flags) 41 argparser.add_argument('--data-dir', type=str, help='data directory') 42 argparser.add_argument('--runtime-dir', type=str, help='runtime directory') 43 argparser.add_argument('--session-id', dest='session_id', type=str, help='session identifier') 44 argparser.add_argument('--offset', default=0, type=int, help='block height to sync history from') 45 args = argparser.parse_args() 46 extra_args = { 47 'runtime_dir': 'SESSION_RUNTIME_DIR', 48 'data_dir': 'SESSION_DATA_DIR', 49 'session_id': 'SESSION_ID', 50 'offset': 'SYNCER_HISTORY_START', 51 } 52 #config = chainlib.eth.cli.Config.from_args(args, arg_flags, default_config_dir=config_dir, extend_base_config_dir=config_dir) 53 config = chainlib.eth.cli.Config.from_args(args, arg_flags, extra_args=extra_args, base_config_dir=[config_dir, os.path.join(config_dir, 'syncer')]) 54 55 logg.debug('session id {} {}'.format(type(config.get('SESSION_ID')), config.get('SESSION_ID'))) 56 if config.get('SESSION_ID') == None: 57 config.add(env.session, 'SESSION_ID', exists_ok=True) 58 if config.get('SESSION_RUNTIME_DIR') == None: 59 config.add(env.runtime_dir, 'SESSION_RUNTIME_DIR', exists_ok=True) 60 if config.get('SESSION_DATA_DIR') == None: 61 config.add(env.data_dir, 'SESSION_DATA_DIR', exists_ok=True) 62 if not config.get('SESSION_SOCKET_PATH'): 63 socket_path = os.path.join(config.get('SESSION_RUNTIME_DIR'), config.get('SESSION_ID'), 'chaind.sock') 64 config.add(socket_path, 'SESSION_SOCKET_PATH', True) 65 66 if config.get('DATABASE_ENGINE') == 'sqlite': 67 config.add(os.path.join(config.get('SESSION_DATA_DIR'), config.get('DATABASE_NAME') + '.sqlite'), 'DATABASE_NAME', exists_ok=True) 68 69 config.censor('PASSWORD', 'DATABASE') 70 logg.debug('config loaded:\n{}'.format(config)) 71 72 73 chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC')) 74 75 dsn = dsn_from_config(config) 76 logg.debug('dns {}'.format(dsn)) 77 SQLBackend.setup(dsn, debug=config.true('DATABASE_DEBUG')) 78 rpc = EthHTTPConnection(url=config.get('RPC_PROVIDER'), chain_spec=chain_spec) 79 80 def register_filter_tags(filters, session): 81 for f in filters: 82 tag = f.tag() 83 try: 84 add_tag(session, tag[0], domain=tag[1]) 85 session.commit() 86 logg.info('added tag name "{}" domain "{}"'.format(tag[0], tag[1])) 87 except sqlalchemy.exc.IntegrityError: 88 session.rollback() 89 logg.debug('already have tag name "{}" domain "{}"'.format(tag[0], tag[1])) 90 91 92 def main(): 93 o = block_latest() 94 r = rpc.do(o) 95 block_offset = int(strip_0x(r), 16) + 1 96 97 syncers = [] 98 99 syncer_backends = SQLBackend.resume(chain_spec, block_offset) 100 101 if len(syncer_backends) == 0: 102 initial_block_start = config.get('SYNCER_HISTORY_START', 0) 103 if isinstance(initial_block_start, str): 104 initial_block_start = int(initial_block_start) 105 initial_block_offset = block_offset 106 if config.true('SYNCER_SKIP_HISTORY'): 107 initial_block_start = block_offset 108 initial_block_offset += 1 109 syncer_backends.append(SQLBackend.initial(chain_spec, initial_block_offset, start_block_height=initial_block_start)) 110 logg.info('found no backends to resume, adding initial sync from history start {} end {}'.format(initial_block_start, initial_block_offset)) 111 else: 112 for syncer_backend in syncer_backends: 113 logg.info('resuming sync session {}'.format(syncer_backend)) 114 115 chain_interface = EthChainInterface() 116 for syncer_backend in syncer_backends: 117 syncers.append(HistorySyncer(syncer_backend, chain_interface)) 118 119 syncer_backend = SQLBackend.live(chain_spec, block_offset+1) 120 syncers.append(HeadSyncer(syncer_backend, chain_interface)) 121 122 state_filter = StateFilter(chain_spec) 123 filters = [ 124 state_filter, 125 ] 126 127 i = 0 128 for syncer in syncers: 129 logg.debug('running syncer index {}'.format(i)) 130 for f in filters: 131 syncer.add_filter(f) 132 r = syncer.loop(int(config.get('SYNCER_LOOP_INTERVAL')), rpc) 133 sys.stderr.write("sync {} done at block {}\n".format(syncer, r)) 134 135 i += 1 136 137 sys.exit(0) 138 139 140 if __name__ == '__main__': 141 main()