chainsyncer

Blockchain syncer driver
Log | Files | Refs | LICENSE

commit 66c0fd0b51089cf437ebfcbcf835100a0c50c3dd
parent e8370de015cb906c03e3c831fdeea3590b322303
Author: nolash <dev@holbrook.no>
Date:   Wed,  3 Feb 2021 20:55:39 +0100

Add evm connector

Diffstat:
A.gitignore | 3+++
Acic_syncer/backend.py | 196+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Dcic_syncer/client.py | 45---------------------------------------------
Acic_syncer/client/__init__.py | 0
Acic_syncer/client/evm/__pycache__/response.cpython-38.pyc | 0
Acic_syncer/client/evm/__pycache__/websocket.cpython-38.pyc | 0
Acic_syncer/client/evm/response.py | 21+++++++++++++++++++++
Acic_syncer/client/evm/websocket.py | 37+++++++++++++++++++++++++++++++++++++
Acic_syncer/client/translate.py | 21+++++++++++++++++++++
Acic_syncer/db/__init__.py | 53+++++++++++++++++++++++++++++++++++++++++++++++++++++
Acic_syncer/db/models/base.py | 73+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Acic_syncer/db/models/sync.py | 168+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Acic_syncer/driver.py | 65+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Acic_syncer/error.py | 8++++++++
Mcic_syncer/runnable/tracker.py | 52+++++++++++++++++++++++++---------------------------
Aconfig/cic.ini | 2++
Aconfig/database.ini | 9+++++++++
Aconfig/eth.ini | 2++
Aconfig/tasks.ini | 2++
Arequirements.txt | 8++++++++
20 files changed, 693 insertions(+), 72 deletions(-)

diff --git a/.gitignore b/.gitignore @@ -0,0 +1,3 @@ +__pycache__ +*.pyc +*.o diff --git a/cic_syncer/backend.py b/cic_syncer/backend.py @@ -0,0 +1,196 @@ +# standard imports +import logging + +# local imports +from cic_eth.db.models.sync import BlockchainSync +from cic_eth.db.models.base import SessionBase + +logg = logging.getLogger() + + +class SyncerBackend: + """Interface to block and transaction sync state. + + :param chain_spec: Chain spec for the chain that syncer is running for. + :type chain_spec: cic_registry.chain.ChainSpec + :param object_id: Unique id for the syncer session. + :type object_id: number + """ + def __init__(self, chain_spec, object_id): + self.db_session = None + self.db_object = None + self.chain_spec = chain_spec + self.object_id = object_id + self.connect() + self.disconnect() + + + def connect(self): + """Loads the state of the syncer session with the given id. + """ + self.db_session = SessionBase.create_session() + q = self.db_session.query(BlockchainSync) + q = q.filter(BlockchainSync.id==self.object_id) + self.db_object = q.first() + if self.db_object == None: + raise ValueError('sync entry with id {} not found'.format(self.object_id)) + + + def disconnect(self): + """Commits state of sync to backend. + """ + self.db_session.add(self.db_object) + self.db_session.commit() + self.db_session.close() + + + def chain(self): + """Returns chain spec for syncer + + :returns: Chain spec + :rtype chain_spec: cic_registry.chain.ChainSpec + """ + return self.chain_spec + + + def get(self): + """Get the current state of the syncer cursor. + + :returns: Block and block transaction height, respectively + :rtype: tuple + """ + self.connect() + pair = self.db_object.cursor() + self.disconnect() + return pair + + + def set(self, block_height, tx_height): + """Update the state of the syncer cursor + :param block_height: Block height of cursor + :type block_height: number + :param tx_height: Block transaction height of cursor + :type tx_height: number + :returns: Block and block transaction height, respectively + :rtype: tuple + """ + self.connect() + pair = self.db_object.set(block_height, tx_height) + self.disconnect() + return pair + + + def start(self): + """Get the initial state of the syncer cursor. + + :returns: Initial block and block transaction height, respectively + :rtype: tuple + """ + self.connect() + pair = self.db_object.start() + self.disconnect() + return pair + + + def target(self): + """Get the target state (upper bound of sync) of the syncer cursor. + + :returns: Target block height + :rtype: number + """ + self.connect() + target = self.db_object.target() + self.disconnect() + return target + + + @staticmethod + def first(chain): + """Returns the model object of the most recent syncer in backend. + + :param chain: Chain spec of chain that syncer is running for. + :type chain: cic_registry.chain.ChainSpec + :returns: Last syncer object + :rtype: cic_eth.db.models.BlockchainSync + """ + return BlockchainSync.first(chain) + + + @staticmethod + def initial(chain, block_height): + """Creates a new syncer session and commit its initial state to backend. + + :param chain: Chain spec of chain that syncer is running for. + :type chain: cic_registry.chain.ChainSpec + :param block_height: Target block height + :type block_height: number + :returns: New syncer object + :rtype: cic_eth.db.models.BlockchainSync + """ + object_id = None + session = SessionBase.create_session() + o = BlockchainSync(chain, 0, 0, block_height) + session.add(o) + session.commit() + object_id = o.id + session.close() + + return SyncerBackend(chain, object_id) + + + @staticmethod + def resume(chain, block_height): + """Retrieves and returns all previously unfinished syncer sessions. + + + :param chain: Chain spec of chain that syncer is running for. + :type chain: cic_registry.chain.ChainSpec + :param block_height: Target block height + :type block_height: number + :returns: Syncer objects of unfinished syncs + :rtype: list of cic_eth.db.models.BlockchainSync + """ + syncers = [] + + session = SessionBase.create_session() + + object_id = None + + for object_id in BlockchainSync.get_unsynced(session=session): + logg.debug('block syncer resume added previously unsynced sync entry id {}'.format(object_id)) + syncers.append(SyncerBackend(chain, object_id)) + + (block_resume, tx_resume) = BlockchainSync.get_last_live_height(block_height, session=session) + if block_height != block_resume: + o = BlockchainSync(chain, block_resume, tx_resume, block_height) + session.add(o) + session.commit() + object_id = o.id + syncers.append(SyncerBackend(chain, object_id)) + logg.debug('block syncer resume added new sync entry from previous run id {}, start{}:{} target {}'.format(object_id, block_resume, tx_resume, block_height)) + + session.close() + + return syncers + + + @staticmethod + def live(chain, block_height): + """Creates a new open-ended syncer session starting at the given block height. + + :param chain: Chain spec of chain that syncer is running for. + :type chain: cic_registry.chain.ChainSpec + :param block_height: Target block height + :type block_height: number + :returns: "Live" syncer object + :rtype: cic_eth.db.models.BlockchainSync + """ + object_id = None + session = SessionBase.create_session() + o = BlockchainSync(chain, block_height, 0, None) + session.add(o) + session.commit() + object_id = o.id + session.close() + + return SyncerBackend(chain, object_id) diff --git a/cic_syncer/client.py b/cic_syncer/client.py @@ -1,45 +0,0 @@ -# standard imports -import uuid -import logging - -# third-party imports -import websockets - -logg = logging.getLogger() - - -class Syncer: - - def __init__(self, backend): - super(HeadSyncer, self).__init__(backend) - - -class MinedSyncer(Syncer): - - def __init__(self, backend): - super(HeadSyncer, self).__init__(backend) - - - -class HeadSyncer(MinedSyncer): - - def __init__(self, backend): - super(HeadSyncer, self).__init__(backend) - - - def get(self, getter): - (block_number, tx_number) = self.backend - block_hash = [] - try: - uu = uuid.uuid4() - req = { - 'jsonrpc': '2.0', - 'method': 'eth_getBlock', - 'id': str(uu), - 'param': [block_number], - logg.debug(req) - except Exception as e: - logg.error(e) - - return block_hash - diff --git a/cic_syncer/client/__init__.py b/cic_syncer/client/__init__.py diff --git a/cic_syncer/client/evm/__pycache__/response.cpython-38.pyc b/cic_syncer/client/evm/__pycache__/response.cpython-38.pyc Binary files differ. diff --git a/cic_syncer/client/evm/__pycache__/websocket.cpython-38.pyc b/cic_syncer/client/evm/__pycache__/websocket.cpython-38.pyc Binary files differ. diff --git a/cic_syncer/client/evm/response.py b/cic_syncer/client/evm/response.py @@ -0,0 +1,21 @@ +from cic_syncer.client import translate + +translations = { + 'block_number': 'hex_to_int', + } + + +class EVMResponse: + + def __init__(self, item, response_object): + self.response_object = response_object + self.item = item + self.fn = getattr(translate, translations[self.item]) + + + def get_error(self): + return self.response_object.get('error') + + + def get_result(self): + return self.fn(self.response_object.get('result')) diff --git a/cic_syncer/client/evm/websocket.py b/cic_syncer/client/evm/websocket.py @@ -0,0 +1,37 @@ +import uuid +import json + +import websocket + +from .response import EVMResponse +from cic_syncer.error import RequestError + + + +class EVMWebsocketClient: + + def __init__(self, url): + self.url = url + self.conn = websocket.create_connection(url) + + + def __del__(self): + self.conn.close() + + + def block_number(self): + req_id = str(uuid.uuid4()) + req = { + 'jsonrpc': '2.0', + 'method': 'eth_blockNumber', + 'id': str(req_id), + 'params': [], + } + self.conn.send(json.dumps(req)) + r = self.conn.recv() + res = EVMResponse('block_number', json.loads(r)) + err = res.get_error() + if err != None: + raise RequestError(err) + + return res.get_result() diff --git a/cic_syncer/client/translate.py b/cic_syncer/client/translate.py @@ -0,0 +1,21 @@ +import re + +re_hex = r'^[0-9a-fA-Z]+$' +def is_hex(hx): + m = re.match(re_hex, hx) + if m == None: + raise ValueError('not valid hex {}'.format(hx)) + + return hx + + +def strip_0x(hx): + if len(hx) >= 2 and hx[:2] == '0x': + hx = hx[2:] + return is_hex(hx) + + +def hex_to_int(hx, endianness='big'): + hx = strip_0x(hx) + b = bytes.fromhex(hx) + return int.from_bytes(b, endianness) diff --git a/cic_syncer/db/__init__.py b/cic_syncer/db/__init__.py @@ -0,0 +1,53 @@ +# standard imports +import os +import logging + +# local imports +from cic_syncer.db.models.base import SessionBase + +logg = logging.getLogger() + + +def dsn_from_config(config): + """Generate a dsn string from the provided config dict. + + The config dict must include all well-known database connection parameters, and must implement the method "get(key)" to retrieve them. Any missing parameters will be be rendered as the literal string "None" + + :param config: Configuration object + :type config: Varies + :returns: dsn string + :rtype: str + """ + scheme = config.get('DATABASE_ENGINE') + if config.get('DATABASE_DRIVER') != None: + scheme += '+{}'.format(config.get('DATABASE_DRIVER')) + + dsn = '' + dsn_out = '' + if config.get('DATABASE_ENGINE') == 'sqlite': + dsn = '{}:///{}'.format( + scheme, + config.get('DATABASE_NAME'), + ) + dsn_out = dsn + + else: + dsn = '{}://{}:{}@{}:{}/{}'.format( + scheme, + config.get('DATABASE_USER'), + config.get('DATABASE_PASSWORD'), + config.get('DATABASE_HOST'), + config.get('DATABASE_PORT'), + config.get('DATABASE_NAME'), + ) + dsn_out = '{}://{}:{}@{}:{}/{}'.format( + scheme, + config.get('DATABASE_USER'), + '***', + config.get('DATABASE_HOST'), + config.get('DATABASE_PORT'), + config.get('DATABASE_NAME'), + ) + logg.debug('parsed dsn from config: {}'.format(dsn_out)) + return dsn + diff --git a/cic_syncer/db/models/base.py b/cic_syncer/db/models/base.py @@ -0,0 +1,73 @@ +# third-party imports +from sqlalchemy import Column, Integer +from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy import create_engine +from sqlalchemy.orm import sessionmaker + +Model = declarative_base(name='Model') + + +class SessionBase(Model): + """The base object for all SQLAlchemy enabled models. All other models must extend this. + """ + __abstract__ = True + + id = Column(Integer, primary_key=True) + + engine = None + """Database connection engine of the running aplication""" + sessionmaker = None + """Factory object responsible for creating sessions from the connection pool""" + transactional = True + """Whether the database backend supports query transactions. Should be explicitly set by initialization code""" + poolable = True + """Whether the database backend supports query transactions. Should be explicitly set by initialization code""" + + + @staticmethod + def create_session(): + """Creates a new database session. + """ + return SessionBase.sessionmaker() + + + @staticmethod + def _set_engine(engine): + """Sets the database engine static property + """ + SessionBase.engine = engine + SessionBase.sessionmaker = sessionmaker(bind=SessionBase.engine) + + + @staticmethod + def connect(dsn, debug=False): + """Create new database connection engine and connect to database backend. + + :param dsn: DSN string defining connection. + :type dsn: str + """ + e = None + if SessionBase.poolable: + e = create_engine( + dsn, + max_overflow=50, + pool_pre_ping=True, + pool_size=20, + pool_recycle=10, + echo=debug, + ) + else: + e = create_engine( + dsn, + echo=debug, + ) + + SessionBase._set_engine(e) + + + @staticmethod + def disconnect(): + """Disconnect from database and free resources. + """ + SessionBase.engine.dispose() + SessionBase.engine = None diff --git a/cic_syncer/db/models/sync.py b/cic_syncer/db/models/sync.py @@ -0,0 +1,168 @@ +# standard imports +import datetime + +# third-party imports +from sqlalchemy import Column, String, Integer, DateTime, Text, Boolean +from sqlalchemy.ext.hybrid import hybrid_property, hybrid_method + +# local imports +from .base import SessionBase + + +class BlockchainSync(SessionBase): + """Syncer control backend. + + :param chain: Chain spec string representation + :type chain: str + :param block_start: Block number to start sync from + :type block_start: number + :param tx_start: Block transaction number to start sync from + :type tx_start: number + :param block_target: Block number to sync until, inclusive + :type block_target: number + """ + __tablename__ = 'blockchain_sync' + + blockchain = Column(String) + block_start = Column(Integer) + tx_start = Column(Integer) + block_cursor = Column(Integer) + tx_cursor = Column(Integer) + block_target = Column(Integer) + date_created = Column(DateTime, default=datetime.datetime.utcnow) + date_updated = Column(DateTime) + + + @staticmethod + def first(chain, session=None): + """Check if a sync session for the specified chain already exists. + + :param chain: Chain spec string representation + :type chain: str + :param session: Session to use. If not specified, a separate session will be created for this method only. + :type session: SqlAlchemy Session + :returns: True if sync record found + :rtype: bool + """ + local_session = False + if session == None: + session = SessionBase.create_session() + local_session = True + q = session.query(BlockchainSync.id) + q = q.filter(BlockchainSync.blockchain==chain) + o = q.first() + if local_session: + session.close() + return o == None + + + @staticmethod + def get_last_live_height(current, session=None): + """Get the most recent open-ended ("live") syncer record. + + :param current: Current block number + :type current: number + :param session: Session to use. If not specified, a separate session will be created for this method only. + :type session: SqlAlchemy Session + :returns: Block and transaction number, respectively + :rtype: tuple + """ + local_session = False + if session == None: + session = SessionBase.create_session() + local_session = True + q = session.query(BlockchainSync) + q = q.filter(BlockchainSync.block_target==None) + q = q.order_by(BlockchainSync.date_created.desc()) + o = q.first() + if local_session: + session.close() + + if o == None: + return (0, 0) + + return (o.block_cursor, o.tx_cursor) + + + @staticmethod + def get_unsynced(session=None): + """Get previous bounded sync sessions that did not complete. + + :param session: Session to use. If not specified, a separate session will be created for this method only. + :type session: SqlAlchemy Session + :returns: Syncer database ids + :rtype: tuple, where first element is id + """ + unsynced = [] + local_session = False + if session == None: + session = SessionBase.create_session() + local_session = True + q = session.query(BlockchainSync.id) + q = q.filter(BlockchainSync.block_target!=None) + q = q.filter(BlockchainSync.block_cursor<BlockchainSync.block_target) + q = q.order_by(BlockchainSync.date_created.asc()) + for u in q.all(): + unsynced.append(u[0]) + if local_session: + session.close() + + return unsynced + + + def set(self, block_height, tx_height): + """Set the height of the syncer instance. + + Only manipulates object, does not transaction or commit to backend. + + :param block_height: Block number + :type block_height: number + :param tx_height: Block transaction number + :type tx_height: number + """ + self.block_cursor = block_height + self.tx_cursor = tx_height + + + def cursor(self): + """Get current state of cursor from cached instance. + + :returns: Block and transaction height, respectively + :rtype: tuple + """ + return (self.block_cursor, self.tx_cursor) + + + def start(self): + """Get sync block start position from cached instance. + + :returns: Block and transaction height, respectively + :rtype: tuple + """ + return (self.block_start, self.tx_start) + + + def target(self): + """Get sync block upper bound from cached instance. + + :returns: Block number + :rtype: number, or None if sync is open-ended + """ + return self.block_target + + + def chain(self): + """Get chain the cached instance represents. + """ + return self.blockchain + + + def __init__(self, chain, block_start, tx_start, block_target=None): + self.blockchain = chain + self.block_start = block_start + self.tx_start = tx_start + self.block_cursor = block_start + self.tx_cursor = tx_start + self.block_target = block_target + self.date_created = datetime.datetime.utcnow() + self.date_modified = datetime.datetime.utcnow() diff --git a/cic_syncer/driver.py b/cic_syncer/driver.py @@ -0,0 +1,65 @@ +# standard imports +import uuid +import logging + +# third-party imports +import websockets + +logg = logging.getLogger() + + +class Syncer: + + def __init__(self, backend): + self.cursor = None + self.running = True + self.backend = backend + self.filter = [] + + + def chain(self): + """Returns the string representation of the chain spec for the chain the syncer is running on. + + :returns: Chain spec string + :rtype: str + """ + return self.bc_cache.chain() + + + +class MinedSyncer(Syncer): + + def __init__(self, backend): + super(HeadSyncer, self).__init__(backend) + + + def loop(self, interval): + while self.running and Syncer.running_global: + getter = self.backend.connect() + logg.debug('loop execute') + e = self.get(getter) + + +class HeadSyncer(MinedSyncer): + + def __init__(self, backend): + super(HeadSyncer, self).__init__(backend) + + + def get(self, getter): + (block_number, tx_number) = self.backend.get() + block_hash = [] + try: + uu = uuid.uuid4() + req = { + 'jsonrpc': '2.0', + 'method': 'eth_getBlock', + 'id': str(uu), + 'param': [block_number], + } + logg.debug(req) + except Exception as e: + logg.error(e) + + return block_hash + diff --git a/cic_syncer/error.py b/cic_syncer/error.py @@ -0,0 +1,8 @@ +class LoopDone(Exception): + """Exception raised when a syncing is complete. + """ + pass + + +class RequestError(Exception): + pass diff --git a/cic_syncer/runnable/tracker.py b/cic_syncer/runnable/tracker.py @@ -9,23 +9,18 @@ import re # third-party imports import confini -import rlp - - -# local imports - +from cic_syncer.driver import HeadSyncer +from cic_syncer.db import dsn_from_config +from cic_syncer.db.models.base import SessionBase +from cic_syncer.client.evm.websocket import EVMWebsocketClient logging.basicConfig(level=logging.WARNING) logg = logging.getLogger() -logging.getLogger('websockets.protocol').setLevel(logging.CRITICAL) -logging.getLogger('web3.RequestManager').setLevel(logging.CRITICAL) -logging.getLogger('web3.providers.WebsocketProvider').setLevel(logging.CRITICAL) -logging.getLogger('web3.providers.HTTPProvider').setLevel(logging.CRITICAL) - -config_dir = os.path.join('/usr/local/etc/cic-eth') +config_dir = '/usr/local/etc/cic-syncer' argparser = argparse.ArgumentParser(description='daemon that monitors transactions in new blocks') +argparser.add_argument('-p', '--provider', dest='p', type=str, help='chain rpc provider address') argparser.add_argument('-c', type=str, default=config_dir, help='config root to use') argparser.add_argument('-i', '--chain-spec', type=str, dest='i', help='chain spec') argparser.add_argument('--abi-dir', dest='abi_dir', type=str, help='Directory containing bytecode and abi') @@ -33,7 +28,6 @@ argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFI argparser.add_argument('-q', type=str, default='cic-eth', help='celery queue to submit transaction tasks to') argparser.add_argument('-v', help='be verbose', action='store_true') argparser.add_argument('-vv', help='be more verbose', action='store_true') -argparser.add_argument('mode', type=str, help='sync mode: (head|history)', default='head') args = argparser.parse_args(sys.argv[1:]) if args.v == True: @@ -47,15 +41,15 @@ config = confini.Config(config_dir, args.env_prefix) config.process() # override args args_override = { - 'ETH_ABI_DIR': getattr(args, 'abi_dir'), 'CIC_CHAIN_SPEC': getattr(args, 'i'), + 'ETH_PROVIDER': getattr(args, 'p'), } config.dict_override(args_override, 'cli flag') config.censor('PASSWORD', 'DATABASE') config.censor('PASSWORD', 'SSL') logg.debug('config loaded from {}:\n{}'.format(config_dir, config)) -app = celery.Celery(backend=config.get('CELERY_RESULT_URL'), broker=config.get('CELERY_BROKER_URL')) +#app = celery.Celery(backend=config.get('CELERY_RESULT_URL'), broker=config.get('CELERY_BROKER_URL')) queue = args.q @@ -94,24 +88,28 @@ def tx_filter(w3, tx, rcpt, chain_spec): return t - re_websocket = re.compile('^wss?://') re_http = re.compile('^https?://') -blockchain_provider = config.get('ETH_PROVIDER') -if re.match(re_websocket, blockchain_provider) != None: - blockchain_provider = WebsocketProvider(blockchain_provider) -elif re.match(re_http, blockchain_provider) != None: - blockchain_provider = HTTPProvider(blockchain_provider) -else: - raise ValueError('unknown provider url {}'.format(blockchain_provider)) - +c = EVMWebsocketClient(config.get('ETH_PROVIDER')) +chain = args.i +#blockchain_provider = config.get('ETH_PROVIDER') +#if re.match(re_websocket, blockchain_provider) != None: +# blockchain_provider = WebsocketProvider(blockchain_provider) +#elif re.match(re_http, blockchain_provider) != None: +# blockchain_provider = HTTPProvider(blockchain_provider) +#else: +# raise ValueError('unknown provider url {}'.format(blockchain_provider)) +# def main(): - chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC')) - c = RpcClient(chain_spec) + #chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC')) + #c = RpcClient(chain_spec) + + block_offset = c.block_number() + logg.debug('block offset {}'.format(block_offset)) - block_offset = c.w3.eth.blockNumber - chain = str(chain_spec) + return + syncer = SyncerBackend.live(chain, block_offset+1) for cb in config.get('TASKS_SYNCER_CALLBACKS', '').split(','): task_split = cb.split(':') diff --git a/config/cic.ini b/config/cic.ini @@ -0,0 +1,2 @@ +[cic] +chain_spec = diff --git a/config/database.ini b/config/database.ini @@ -0,0 +1,9 @@ +[database] +name = cic_syncer +user = postgres +password = +host = localhost +port = 5432 +engine = postgresql +driver = psycopg2 +debug = false diff --git a/config/eth.ini b/config/eth.ini @@ -0,0 +1,2 @@ +[eth] +provider = diff --git a/config/tasks.ini b/config/tasks.ini @@ -0,0 +1,2 @@ +[tasks] +syncer_callbacks = bar:baz diff --git a/requirements.txt b/requirements.txt @@ -0,0 +1,8 @@ +websocket-client==0.57.0 +psycopg2==2.8.6 +SQLAlchemy==1.3.20 +py-evm==0.3.0a20 +eth-tester==0.5.0b3 +web3==5.12.2 +confini==0.3.6b2 +semver==2.13.0