chaind-eth

Queue server for Ethereum
Info | Log | Files | Refs | README | LICENSE

syncer.py (5257B)


      1 # standard imports
      2 import sys
      3 import time
      4 import socket
      5 import signal
      6 import os
      7 import logging
      8 import stat
      9 import argparse
     10 import uuid
     11 
     12 # external imports
     13 import chainlib.eth.cli
     14 from chaind import Environment
     15 import confini
     16 from hexathon import strip_0x
     17 from chainlib.chain import ChainSpec
     18 from chainlib.eth.connection import EthHTTPConnection
     19 from chainlib.eth.block import block_latest
     20 from chainsyncer.driver.head import HeadSyncer
     21 from chainsyncer.driver.history import HistorySyncer
     22 from chainsyncer.db import dsn_from_config
     23 from chainsyncer.db.models.base import SessionBase
     24 from chainsyncer.backend.sql import SQLBackend
     25 from chainsyncer.error import SyncDone
     26 
     27 # local imports
     28 from chaind_eth.filter import StateFilter
     29 from chaind_eth.chain import EthChainInterface
     30 
# Configure the root logger; WARNING is the level set here.
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()

# Base configuration directory, resolved relative to the real (symlink-free)
# location of this script: <script dir>/../data/config
script_dir = os.path.dirname(os.path.realpath(__file__))
config_dir = os.path.join(script_dir, '..', 'data', 'config')

# chaind environment for the 'eth' domain, built from the process environment.
env = Environment(domain='eth', env=os.environ)

# Build the argument parser from chainlib's `argflag_std_read` flag set and
# extend it with the arguments specific to this script.
arg_flags = chainlib.eth.cli.argflag_std_read
argparser = chainlib.eth.cli.ArgumentParser(arg_flags)
argparser.add_argument('--data-dir', type=str, help='data directory')
argparser.add_argument('--runtime-dir', type=str, help='runtime directory')
argparser.add_argument('--session-id', dest='session_id', type=str, help='session identifier')
argparser.add_argument('--offset', default=0, type=int, help='block height to sync history from')
args = argparser.parse_args()
# Pairs each script-specific argument destination with a configuration key.
# Presumably Config.from_args uses this to let the command line value
# override the configuration value -- confirm against chainlib.
extra_args = {
    'runtime_dir': 'SESSION_RUNTIME_DIR',
    'data_dir': 'SESSION_DATA_DIR',
    'session_id': 'SESSION_ID',
    'offset': 'SYNCER_HISTORY_START',
        }
# NOTE(review): disabled alternative form of the call below; remove if it is no longer needed.
#config = chainlib.eth.cli.Config.from_args(args, arg_flags, default_config_dir=config_dir, extend_base_config_dir=config_dir)
# Configuration is read from two base directories: config_dir and its 'syncer' subdirectory.
config = chainlib.eth.cli.Config.from_args(args, arg_flags, extra_args=extra_args, base_config_dir=[config_dir, os.path.join(config_dir, 'syncer')])
     54 
     55 logg.debug('session id {} {}'.format(type(config.get('SESSION_ID')), config.get('SESSION_ID')))
     56 if config.get('SESSION_ID') == None:
     57     config.add(env.session, 'SESSION_ID', exists_ok=True)
     58 if config.get('SESSION_RUNTIME_DIR') == None:
     59     config.add(env.runtime_dir, 'SESSION_RUNTIME_DIR', exists_ok=True)
     60 if config.get('SESSION_DATA_DIR') == None:
     61     config.add(env.data_dir, 'SESSION_DATA_DIR', exists_ok=True)
     62 if not config.get('SESSION_SOCKET_PATH'):
     63     socket_path = os.path.join(config.get('SESSION_RUNTIME_DIR'), config.get('SESSION_ID'), 'chaind.sock')
     64     config.add(socket_path, 'SESSION_SOCKET_PATH', True)
     65 
     66 if config.get('DATABASE_ENGINE') == 'sqlite':
     67     config.add(os.path.join(config.get('SESSION_DATA_DIR'), config.get('DATABASE_NAME') + '.sqlite'), 'DATABASE_NAME', exists_ok=True)
     68     
     69 config.censor('PASSWORD', 'DATABASE')
     70 logg.debug('config loaded:\n{}'.format(config))
     71 
     72 
     73 chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC'))
     74 
     75 dsn = dsn_from_config(config)
     76 logg.debug('dns {}'.format(dsn))
     77 SQLBackend.setup(dsn, debug=config.true('DATABASE_DEBUG'))
     78 rpc = EthHTTPConnection(url=config.get('RPC_PROVIDER'), chain_spec=chain_spec)
     79 
     80 def register_filter_tags(filters, session):
     81     for f in filters:
     82         tag = f.tag()
     83         try:
     84             add_tag(session, tag[0], domain=tag[1])
     85             session.commit()
     86             logg.info('added tag name "{}" domain "{}"'.format(tag[0], tag[1]))
     87         except sqlalchemy.exc.IntegrityError:
     88             session.rollback()
     89             logg.debug('already have tag name "{}" domain "{}"'.format(tag[0], tag[1]))
     90 
     91 
     92 def main():
     93     o = block_latest()
     94     r = rpc.do(o)
     95     block_offset = int(strip_0x(r), 16) + 1
     96 
     97     syncers = []
     98 
     99     syncer_backends = SQLBackend.resume(chain_spec, block_offset)
    100 
    101     if len(syncer_backends) == 0:
    102         initial_block_start = config.get('SYNCER_HISTORY_START', 0)
    103         if isinstance(initial_block_start, str):
    104             initial_block_start = int(initial_block_start)
    105         initial_block_offset = block_offset
    106         if config.true('SYNCER_SKIP_HISTORY'):
    107             initial_block_start = block_offset
    108             initial_block_offset += 1
    109         syncer_backends.append(SQLBackend.initial(chain_spec, initial_block_offset, start_block_height=initial_block_start))
    110         logg.info('found no backends to resume, adding initial sync from history start {} end {}'.format(initial_block_start, initial_block_offset))
    111     else:
    112         for syncer_backend in syncer_backends:
    113             logg.info('resuming sync session {}'.format(syncer_backend))
    114 
    115     chain_interface = EthChainInterface()
    116     for syncer_backend in syncer_backends:
    117         syncers.append(HistorySyncer(syncer_backend, chain_interface))
    118 
    119     syncer_backend = SQLBackend.live(chain_spec, block_offset+1)
    120     syncers.append(HeadSyncer(syncer_backend, chain_interface))
    121 
    122     state_filter = StateFilter(chain_spec)
    123     filters = [
    124         state_filter,
    125             ]
    126 
    127     i = 0
    128     for syncer in syncers:
    129         logg.debug('running syncer index {}'.format(i))
    130         for f in filters:
    131             syncer.add_filter(f)
    132         r = syncer.loop(int(config.get('SYNCER_LOOP_INTERVAL')), rpc)
    133         sys.stderr.write("sync {} done at block {}\n".format(syncer, r))
    134 
    135         i += 1
    136 
    137     sys.exit(0)
    138 
    139 
    140 if __name__ == '__main__':
    141     main()