chainsyncer

Blockchain syncer driver
Log | Files | Refs | LICENSE

commit 7f0936b3e4c40ec8b64ca073176ef8dd2fbeb559
parent 3b40f0e6f696c41d4b69d4f089c6a4dc0c861710
Author: nolash <dev@holbrook.no>
Date:   Sat, 10 Apr 2021 00:30:08 +0200

Split up backend modules

Diffstat:
Dchainsyncer/backend.py | 343-------------------------------------------------------------------------------
Rchainsyncer/backend_file.py -> chainsyncer/backend/file.py | 0
Achainsyncer/backend/memory.py | 51+++++++++++++++++++++++++++++++++++++++++++++++++++
Achainsyncer/backend/sql.py | 298+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Msetup.cfg | 2+-
Mtests/test_basic.py | 4++--
Mtests/test_database.py | 3++-
Mtests/test_file.py | 2+-
8 files changed, 355 insertions(+), 348 deletions(-)

diff --git a/chainsyncer/backend.py b/chainsyncer/backend.py @@ -1,343 +0,0 @@ -# standard imports -import logging -import uuid - -# third-party imports -from chainlib.chain import ChainSpec - -# local imports -from chainsyncer.db.models.sync import BlockchainSync -from chainsyncer.db.models.filter import BlockchainSyncFilter -from chainsyncer.db.models.base import SessionBase - -logg = logging.getLogger(__name__) - - -class SyncerBackend: - """Interface to block and transaction sync state. - - :param chain_spec: Chain spec for the chain that syncer is running for. - :type chain_spec: cic_registry.chain.ChainSpec - :param object_id: Unique id for the syncer session. - :type object_id: number - """ - def __init__(self, chain_spec, object_id): - self.db_session = None - self.db_object = None - self.db_object_filter = None - self.chain_spec = chain_spec - self.object_id = object_id - self.connect() - self.disconnect() - - - def connect(self): - """Loads the state of the syncer session with the given id. - """ - if self.db_session == None: - self.db_session = SessionBase.create_session() - - q = self.db_session.query(BlockchainSync) - q = q.filter(BlockchainSync.id==self.object_id) - self.db_object = q.first() - - if self.db_object != None: - qtwo = self.db_session.query(BlockchainSyncFilter) - qtwo = qtwo.join(BlockchainSync) - qtwo = qtwo.filter(BlockchainSync.id==self.db_object.id) - self.db_object_filter = qtwo.first() - - if self.db_object == None: - raise ValueError('sync entry with id {} not found'.format(self.object_id)) - - return self.db_session - - - def disconnect(self): - """Commits state of sync to backend. - """ - if self.db_object_filter != None: - self.db_session.add(self.db_object_filter) - self.db_session.add(self.db_object) - self.db_session.commit() - self.db_session.close() - self.db_session = None - - - def chain(self): - """Returns chain spec for syncer - - :returns: Chain spec - :rtype chain_spec: cic_registry.chain.ChainSpec - """ - return self.chain_spec - - - def get(self): - """Get the current state of the syncer cursor. - - :returns: Block and block transaction height, respectively - :rtype: tuple - """ - self.connect() - pair = self.db_object.cursor() - (filter_state, count, digest) = self.db_object_filter.cursor() - self.disconnect() - return (pair, filter_state,) - - - def set(self, block_height, tx_height): - """Update the state of the syncer cursor - :param block_height: Block height of cursor - :type block_height: number - :param tx_height: Block transaction height of cursor - :type tx_height: number - :returns: Block and block transaction height, respectively - :rtype: tuple - """ - self.connect() - pair = self.db_object.set(block_height, tx_height) - self.db_object_filter.clear() - (filter_state, count, digest)= self.db_object_filter.cursor() - self.disconnect() - return (pair, filter_state,) - - - def start(self): - """Get the initial state of the syncer cursor. - - :returns: Initial block and block transaction height, respectively - :rtype: tuple - """ - self.connect() - pair = self.db_object.start() - (filter_state, count, digest) = self.db_object_filter.start() - self.disconnect() - return (pair, filter_state,) - - - def target(self): - """Get the target state (upper bound of sync) of the syncer cursor. - - :returns: Target block height - :rtype: number - """ - self.connect() - target = self.db_object.target() - (filter_target, count, digest) = self.db_object_filter.target() - self.disconnect() - return (target, filter_target,) - - - @staticmethod - def first(chain_spec): - """Returns the model object of the most recent syncer in backend. - - :param chain: Chain spec of chain that syncer is running for. - :type chain: cic_registry.chain.ChainSpec - :returns: Last syncer object - :rtype: cic_eth.db.models.BlockchainSync - """ - #return BlockchainSync.first(str(chain_spec)) - object_id = BlockchainSync.first(str(chain_spec)) - if object_id == None: - return None - return SyncerBackend(chain_spec, object_id) - - - - @staticmethod - def initial(chain_spec, target_block_height, start_block_height=0): - """Creates a new syncer session and commit its initial state to backend. - - :param chain: Chain spec of chain that syncer is running for. - :type chain: cic_registry.chain.ChainSpec - :param block_height: Target block height - :type block_height: number - :returns: New syncer object - :rtype: cic_eth.db.models.BlockchainSync - """ - if start_block_height >= target_block_height: - raise ValueError('start block height must be lower than target block height') - object_id = None - session = SessionBase.create_session() - o = BlockchainSync(str(chain_spec), start_block_height, 0, target_block_height) - session.add(o) - session.commit() - object_id = o.id - - of = BlockchainSyncFilter(o) - session.add(of) - session.commit() - - session.close() - - return SyncerBackend(chain_spec, object_id) - - - @staticmethod - def resume(chain_spec, block_height): - """Retrieves and returns all previously unfinished syncer sessions. - - - :param chain_spec: Chain spec of chain that syncer is running for. - :type chain_spec: cic_registry.chain.ChainSpec - :param block_height: Target block height - :type block_height: number - :returns: Syncer objects of unfinished syncs - :rtype: list of cic_eth.db.models.BlockchainSync - """ - syncers = [] - - session = SessionBase.create_session() - - object_id = None - - highest_unsynced_block = 0 - highest_unsynced_tx = 0 - object_id = BlockchainSync.get_last(session=session, live=False) - if object_id != None: - q = session.query(BlockchainSync) - o = q.get(object_id) - (highest_unsynced_block, highest_unsynced_index) = o.cursor() - - object_ids = BlockchainSync.get_unsynced(session=session) - session.close() - - for object_id in object_ids: - s = SyncerBackend(chain_spec, object_id) - logg.debug('resume unfinished {}'.format(s)) - syncers.append(s) - - session = SessionBase.create_session() - - last_live_id = BlockchainSync.get_last(session=session) - if last_live_id != None: - - q = session.query(BlockchainSync) - o = q.get(last_live_id) - - (block_resume, tx_resume) = o.cursor() - session.flush() - - #if block_height != block_resume: - if highest_unsynced_block < block_resume: - - q = session.query(BlockchainSyncFilter) - q = q.filter(BlockchainSyncFilter.chain_sync_id==last_live_id) - of = q.first() - (flags, count, digest) = of.cursor() - - session.flush() - - o = BlockchainSync(str(chain_spec), block_resume, tx_resume, block_height) - session.add(o) - session.flush() - object_id = o.id - - of = BlockchainSyncFilter(o, count, flags, digest) - session.add(of) - session.commit() - - backend = SyncerBackend(chain_spec, object_id) - syncers.append(backend) - - logg.debug('last live session resume {}'.format(backend)) - - session.close() - - return syncers - - - @staticmethod - def live(chain_spec, block_height): - """Creates a new open-ended syncer session starting at the given block height. - - :param chain: Chain spec of chain that syncer is running for. - :type chain: cic_registry.chain.ChainSpec - :param block_height: Target block height - :type block_height: number - :returns: "Live" syncer object - :rtype: cic_eth.db.models.BlockchainSync - """ - object_id = None - session = SessionBase.create_session() - - o = BlockchainSync(str(chain_spec), block_height, 0, None) - session.add(o) - session.flush() - object_id = o.id - - of = BlockchainSyncFilter(o) - session.add(of) - session.commit() - - session.close() - - return SyncerBackend(chain_spec, object_id) - - - def register_filter(self, name): - self.connect() - if self.db_object_filter == None: - self.db_object_filter = BlockchainSyncFilter(self.db_object) - self.db_object_filter.add(name) - self.db_session.add(self.db_object_filter) - self.disconnect() - - - def complete_filter(self, n): - self.db_object_filter.set(n) - - - - def __str__(self): - return "syncerbackend chain {} start {} target {}".format(self.chain(), self.start(), self.target()) - - - -class MemBackend: - - def __init__(self, chain_spec, object_id, target_block=None): - self.object_id = object_id - self.chain_spec = chain_spec - self.block_height = 0 - self.tx_height = 0 - self.flags = 0 - self.target_block = target_block - self.db_session = None - - - def connect(self): - pass - - - def disconnect(self): - pass - - - def set(self, block_height, tx_height): - logg.debug('stateless backend received {} {}'.format(block_height, tx_height)) - self.block_height = block_height - self.tx_height = tx_height - - - def get(self): - return ((self.block_height, self.tx_height), self.flags) - - - def target(self): - return (self.target_block, self.flags) - - - def register_filter(self, name): - pass - - - def complete_filter(self, n): - pass - - - def __str__(self): - return "syncer membackend chain {} cursor".format(self.get()) - diff --git a/chainsyncer/backend_file.py b/chainsyncer/backend/file.py diff --git a/chainsyncer/backend/memory.py b/chainsyncer/backend/memory.py @@ -0,0 +1,51 @@ +# standard imports +import logging + +logg = logging.getLogger().getChild(__name__) + + +class MemBackend: + + def __init__(self, chain_spec, object_id, target_block=None): + self.object_id = object_id + self.chain_spec = chain_spec + self.block_height = 0 + self.tx_height = 0 + self.flags = 0 + self.target_block = target_block + self.db_session = None + + + def connect(self): + pass + + + def disconnect(self): + pass + + + def set(self, block_height, tx_height): + logg.debug('stateless backend received {} {}'.format(block_height, tx_height)) + self.block_height = block_height + self.tx_height = tx_height + + + def get(self): + return ((self.block_height, self.tx_height), self.flags) + + + def target(self): + return (self.target_block, self.flags) + + + def register_filter(self, name): + pass + + + def complete_filter(self, n): + pass + + + def __str__(self): + return "syncer membackend chain {} cursor".format(self.get()) + diff --git a/chainsyncer/backend/sql.py b/chainsyncer/backend/sql.py @@ -0,0 +1,298 @@ +# standard imports +import logging +import uuid + +# third-party imports +from chainlib.chain import ChainSpec + +# local imports +from chainsyncer.db.models.sync import BlockchainSync +from chainsyncer.db.models.filter import BlockchainSyncFilter +from chainsyncer.db.models.base import SessionBase + +logg = logging.getLogger().getChild(__name__) + + +class SyncerBackend: + """Interface to block and transaction sync state. + + :param chain_spec: Chain spec for the chain that syncer is running for. + :type chain_spec: cic_registry.chain.ChainSpec + :param object_id: Unique id for the syncer session. + :type object_id: number + """ + def __init__(self, chain_spec, object_id): + self.db_session = None + self.db_object = None + self.db_object_filter = None + self.chain_spec = chain_spec + self.object_id = object_id + self.connect() + self.disconnect() + + + def connect(self): + """Loads the state of the syncer session with the given id. + """ + if self.db_session == None: + self.db_session = SessionBase.create_session() + + q = self.db_session.query(BlockchainSync) + q = q.filter(BlockchainSync.id==self.object_id) + self.db_object = q.first() + + if self.db_object != None: + qtwo = self.db_session.query(BlockchainSyncFilter) + qtwo = qtwo.join(BlockchainSync) + qtwo = qtwo.filter(BlockchainSync.id==self.db_object.id) + self.db_object_filter = qtwo.first() + + if self.db_object == None: + raise ValueError('sync entry with id {} not found'.format(self.object_id)) + + return self.db_session + + + def disconnect(self): + """Commits state of sync to backend. + """ + if self.db_object_filter != None: + self.db_session.add(self.db_object_filter) + self.db_session.add(self.db_object) + self.db_session.commit() + self.db_session.close() + self.db_session = None + + + def chain(self): + """Returns chain spec for syncer + + :returns: Chain spec + :rtype chain_spec: cic_registry.chain.ChainSpec + """ + return self.chain_spec + + + def get(self): + """Get the current state of the syncer cursor. + + :returns: Block and block transaction height, respectively + :rtype: tuple + """ + self.connect() + pair = self.db_object.cursor() + (filter_state, count, digest) = self.db_object_filter.cursor() + self.disconnect() + return (pair, filter_state,) + + + def set(self, block_height, tx_height): + """Update the state of the syncer cursor + :param block_height: Block height of cursor + :type block_height: number + :param tx_height: Block transaction height of cursor + :type tx_height: number + :returns: Block and block transaction height, respectively + :rtype: tuple + """ + self.connect() + pair = self.db_object.set(block_height, tx_height) + self.db_object_filter.clear() + (filter_state, count, digest)= self.db_object_filter.cursor() + self.disconnect() + return (pair, filter_state,) + + + def start(self): + """Get the initial state of the syncer cursor. + + :returns: Initial block and block transaction height, respectively + :rtype: tuple + """ + self.connect() + pair = self.db_object.start() + (filter_state, count, digest) = self.db_object_filter.start() + self.disconnect() + return (pair, filter_state,) + + + def target(self): + """Get the target state (upper bound of sync) of the syncer cursor. + + :returns: Target block height + :rtype: number + """ + self.connect() + target = self.db_object.target() + (filter_target, count, digest) = self.db_object_filter.target() + self.disconnect() + return (target, filter_target,) + + + @staticmethod + def first(chain_spec): + """Returns the model object of the most recent syncer in backend. + + :param chain: Chain spec of chain that syncer is running for. + :type chain: cic_registry.chain.ChainSpec + :returns: Last syncer object + :rtype: cic_eth.db.models.BlockchainSync + """ + #return BlockchainSync.first(str(chain_spec)) + object_id = BlockchainSync.first(str(chain_spec)) + if object_id == None: + return None + return SyncerBackend(chain_spec, object_id) + + + + @staticmethod + def initial(chain_spec, target_block_height, start_block_height=0): + """Creates a new syncer session and commit its initial state to backend. + + :param chain: Chain spec of chain that syncer is running for. + :type chain: cic_registry.chain.ChainSpec + :param block_height: Target block height + :type block_height: number + :returns: New syncer object + :rtype: cic_eth.db.models.BlockchainSync + """ + if start_block_height >= target_block_height: + raise ValueError('start block height must be lower than target block height') + object_id = None + session = SessionBase.create_session() + o = BlockchainSync(str(chain_spec), start_block_height, 0, target_block_height) + session.add(o) + session.commit() + object_id = o.id + + of = BlockchainSyncFilter(o) + session.add(of) + session.commit() + + session.close() + + return SyncerBackend(chain_spec, object_id) + + + @staticmethod + def resume(chain_spec, block_height): + """Retrieves and returns all previously unfinished syncer sessions. + + + :param chain_spec: Chain spec of chain that syncer is running for. + :type chain_spec: cic_registry.chain.ChainSpec + :param block_height: Target block height + :type block_height: number + :returns: Syncer objects of unfinished syncs + :rtype: list of cic_eth.db.models.BlockchainSync + """ + syncers = [] + + session = SessionBase.create_session() + + object_id = None + + highest_unsynced_block = 0 + highest_unsynced_tx = 0 + object_id = BlockchainSync.get_last(session=session, live=False) + if object_id != None: + q = session.query(BlockchainSync) + o = q.get(object_id) + (highest_unsynced_block, highest_unsynced_index) = o.cursor() + + object_ids = BlockchainSync.get_unsynced(session=session) + session.close() + + for object_id in object_ids: + s = SyncerBackend(chain_spec, object_id) + logg.debug('resume unfinished {}'.format(s)) + syncers.append(s) + + session = SessionBase.create_session() + + last_live_id = BlockchainSync.get_last(session=session) + if last_live_id != None: + + q = session.query(BlockchainSync) + o = q.get(last_live_id) + + (block_resume, tx_resume) = o.cursor() + session.flush() + + #if block_height != block_resume: + if highest_unsynced_block < block_resume: + + q = session.query(BlockchainSyncFilter) + q = q.filter(BlockchainSyncFilter.chain_sync_id==last_live_id) + of = q.first() + (flags, count, digest) = of.cursor() + + session.flush() + + o = BlockchainSync(str(chain_spec), block_resume, tx_resume, block_height) + session.add(o) + session.flush() + object_id = o.id + + of = BlockchainSyncFilter(o, count, flags, digest) + session.add(of) + session.commit() + + backend = SyncerBackend(chain_spec, object_id) + syncers.append(backend) + + logg.debug('last live session resume {}'.format(backend)) + + session.close() + + return syncers + + + @staticmethod + def live(chain_spec, block_height): + """Creates a new open-ended syncer session starting at the given block height. + + :param chain: Chain spec of chain that syncer is running for. + :type chain: cic_registry.chain.ChainSpec + :param block_height: Target block height + :type block_height: number + :returns: "Live" syncer object + :rtype: cic_eth.db.models.BlockchainSync + """ + object_id = None + session = SessionBase.create_session() + + o = BlockchainSync(str(chain_spec), block_height, 0, None) + session.add(o) + session.flush() + object_id = o.id + + of = BlockchainSyncFilter(o) + session.add(of) + session.commit() + + session.close() + + return SyncerBackend(chain_spec, object_id) + + + def register_filter(self, name): + self.connect() + if self.db_object_filter == None: + self.db_object_filter = BlockchainSyncFilter(self.db_object) + self.db_object_filter.add(name) + self.db_session.add(self.db_object_filter) + self.disconnect() + + + def complete_filter(self, n): + self.connect() + self.db_object_filter.set(n) + self.db_session.add(self.db_object_filter) + self.db_session.commit() + self.disconnect() + + + def __str__(self): + return "syncerbackend chain {} start {} target {}".format(self.chain(), self.start(), self.target()) diff --git a/setup.cfg b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = chainsyncer -version = 0.0.1a22 +version = 0.0.2a1 description = Generic blockchain syncer driver author = Louis Holbrook author_email = dev@holbrook.no diff --git a/tests/test_basic.py b/tests/test_basic.py @@ -5,7 +5,7 @@ import unittest from chainlib.chain import ChainSpec # local imports -from chainsyncer.backend import SyncerBackend +from chainsyncer.backend.memory import MemBackend # testutil imports from tests.base import TestBase @@ -15,7 +15,7 @@ class TestBasic(TestBase): def test_hello(self): chain_spec = ChainSpec('evm', 'bloxberg', 8996, 'foo') - backend = SyncerBackend(chain_spec, 'foo') + backend = MemBackend(chain_spec, 'foo') if __name__ == '__main__': diff --git a/tests/test_database.py b/tests/test_database.py @@ -8,7 +8,7 @@ from chainlib.chain import ChainSpec # local imports from chainsyncer.db.models.base import SessionBase from chainsyncer.db.models.filter import BlockchainSyncFilter -from chainsyncer.backend import SyncerBackend +from chainsyncer.backend.sql import SyncerBackend # testutil imports from tests.base import TestBase @@ -67,6 +67,7 @@ class TestDatabase(TestBase): session.close() + def test_backend_retrieve(self): s = SyncerBackend.live(self.chain_spec, 42) s.register_filter('foo') diff --git a/tests/test_file.py b/tests/test_file.py @@ -9,7 +9,7 @@ import shutil from chainlib.chain import ChainSpec # local imports -from chainsyncer.backend_file import SyncerFileBackend +from chainsyncer.backend.file import SyncerFileBackend logging.basicConfig(level=logging.DEBUG) logg = logging.getLogger().getChild(__name__)