chainsyncer

Blockchain syncer driver
Log | Files | Refs | LICENSE

commit 8527901e6c5b12a1bd06f1d9f231196c9c736f72
parent e8decb9cb7c4f82b8749225b19f7e7fd7e1ea879
Author: lash <dev@holbrook.no>
Date:   Wed, 30 Mar 2022 06:55:21 +0000

Add chain interface driver

Diffstat:
M.gitignore | 1+
Dchainsyncer/driver.py | 139-------------------------------------------------------------------------------
Achainsyncer/driver/__init__.py | 1+
Achainsyncer/driver/base.py | 139+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Achainsyncer/driver/chain_interface.py | 57+++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mchainsyncer/unittest/base.py | 59+++++++++++++++++++++++++++++++++++++++++++++++++++++++----
Msetup.cfg | 2+-
Atests/test_driver.py | 61+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mtests/test_session.py | 1-
9 files changed, 315 insertions(+), 145 deletions(-)

diff --git a/.gitignore b/.gitignore @@ -6,3 +6,4 @@ gmon.out build/ dist/ *.sqlite +old/ diff --git a/chainsyncer/driver.py b/chainsyncer/driver.py @@ -1,139 +0,0 @@ -# standard imports -import logging -import time -import signal - -# local imports -from chainsyncer.error import ( - SyncDone, - NoBlockForYou, - ) -from chainsyncer.session import SyncSession - - -logg = logging.getLogger(__name__) - -NS_DIV = 1000000000 - -class SyncDriver: - - running_global = True - """If set to false syncer will terminate polling loop.""" - yield_delay=0.005 - """Delay between each processed block.""" - signal_request = [signal.SIGINT, signal.SIGTERM] - """Signals to catch to request shutdown.""" - signal_set = False - """Whether shutdown signal has been received.""" - name = 'base' - """Syncer name, to be overriden for each extended implementation.""" - - - def __init__(self, store, offset=0, target=-1, pre_callback=None, post_callback=None, block_callback=None, idle_callback=None): - self.store = store - self.running = True - self.pre_callback = pre_callback - self.post_callback = post_callback - self.block_callback = block_callback - self.idle_callback = idle_callback - self.last_start = 0 - self.clock_id = time.CLOCK_MONOTONIC_RAW - self.store.connect() - self.store.start(offset=offset, target=target) - - - def __sig_terminate(self, sig, frame): - logg.warning('got signal {}'.format(sig)) - self.terminate() - - - def terminate(self): - """Set syncer to terminate as soon as possible. - """ - logg.info('termination requested!') - SyncDriver.running_global = False - self.running = False - - - def run(self, conn, interval=1): - while self.running_global: - self.session = SyncSession(self.store) - item = self.session.start() - if item == None: - self.running = False - self.running_global = False - break - self.loop(conn, item, interval=interval) - - - def idle(self, interval): - interval *= NS_DIV - idle_start = time.clock_gettime_ns(self.clock_id) - delta = idle_start - self.last_start - if delta > interval: - interval /= NS_DIV - time.sleep(interval) - return - - if self.idle_callback != None: - r = True - while r: - before = time.clock_gettime_ns(self.clock_id) - r = self.idle_callback(interval) - after = time.clock_gettime_ns(self.clock_id) - delta = after - before - if delta < 0: - return - interval -= delta - if interval < 0: - return - - interval /= NS_DIV - time.sleep(interval) - - - def loop(self, conn, item, interval=1): - logg.debug('started loop') - tx_start = item.tx_cursor - while self.running and SyncDriver.running_global: - self.last_start = time.clock_gettime_ns(self.clock_id) - if self.pre_callback != None: - self.pre_callback() - while True and self.running: - try: - block = self.get(conn, item) - except SyncDone as e: - logg.info('all blocks sumitted for processing: {}'.format(e)) - return - except NoBlockForYou as e: - break - if self.block_callback != None: - self.block_callback(block, None) - - try: - self.process(conn, item, block, tx_start) - except IndexError: - item.next(advance_block=True) - tx_start = 0 - time.sleep(self.yield_delay) - if self.post_callback != None: - self.post_callback() - - logg.debug('fooo') - if self.store.target > -1 and block.number >= self.store.target: - self.running = False - - self.idle(interval) - - - def process_single(self, conn, block, tx): - logg.debug('single') - self.session.filter(conn, block, tx) - - - def process(self, conn, block, tx_start): - raise NotImplementedError() - - - def get(self, conn): - raise NotImplementedError() diff --git a/chainsyncer/driver/__init__.py b/chainsyncer/driver/__init__.py @@ -0,0 +1 @@ +from .base import SyncDriver diff --git a/chainsyncer/driver/base.py b/chainsyncer/driver/base.py @@ -0,0 +1,139 @@ +# standard imports +import logging +import time +import signal + +# local imports +from chainsyncer.error import ( + SyncDone, + NoBlockForYou, + ) +from chainsyncer.session import SyncSession + + +logg = logging.getLogger(__name__) + +NS_DIV = 1000000000 + +class SyncDriver: + + running_global = True + """If set to false syncer will terminate polling loop.""" + yield_delay=0.005 + """Delay between each processed block.""" + signal_request = [signal.SIGINT, signal.SIGTERM] + """Signals to catch to request shutdown.""" + signal_set = False + """Whether shutdown signal has been received.""" + name = 'base' + """Syncer name, to be overriden for each extended implementation.""" + + + def __init__(self, store, offset=0, target=-1, pre_callback=None, post_callback=None, block_callback=None, idle_callback=None): + self.store = store + self.running = True + self.pre_callback = pre_callback + self.post_callback = post_callback + self.block_callback = block_callback + self.idle_callback = idle_callback + self.last_start = 0 + self.clock_id = time.CLOCK_MONOTONIC_RAW + self.store.connect() + self.store.start(offset=offset, target=target) + + + def __sig_terminate(self, sig, frame): + logg.warning('got signal {}'.format(sig)) + self.terminate() + + + def terminate(self): + """Set syncer to terminate as soon as possible. + """ + logg.info('termination requested!') + SyncDriver.running_global = False + self.running = False + + + def run(self, conn, interval=1): + while self.running_global: + self.session = SyncSession(self.store) + item = self.session.start() + if item == None: + self.running = False + self.running_global = False + break + self.loop(conn, item, interval=interval) + + + def idle(self, interval): + interval *= NS_DIV + idle_start = time.clock_gettime_ns(self.clock_id) + delta = idle_start - self.last_start + if delta > interval: + interval /= NS_DIV + time.sleep(interval) + return + + if self.idle_callback != None: + r = True + while r: + before = time.clock_gettime_ns(self.clock_id) + r = self.idle_callback(interval) + after = time.clock_gettime_ns(self.clock_id) + delta = after - before + if delta < 0: + return + interval -= delta + if interval < 0: + return + + interval /= NS_DIV + time.sleep(interval) + + + def loop(self, conn, item, interval=1): + logg.debug('started loop') + while self.running and SyncDriver.running_global: + self.last_start = time.clock_gettime_ns(self.clock_id) + + if self.pre_callback != None: + self.pre_callback() + + while True and self.running: + try: + block = self.get(conn, item) + except SyncDone as e: + logg.info('all blocks sumitted for processing: {}'.format(e)) + return + except NoBlockForYou as e: + break + if self.block_callback != None: + self.block_callback(block, None) + + try: + self.process(conn, item, block) + except IndexError: + item.next(advance_block=True) + time.sleep(self.yield_delay) + + if self.store.target > -1 and block.number >= self.store.target: + self.running = False + + if self.post_callback != None: + self.post_callback() + + + self.idle(interval) + + + def process_single(self, conn, block, tx): + self.session.filter(conn, block, tx) + + + def process(self, conn, item, block): + raise NotImplementedError() + + + def get(self, conn): + raise NotImplementedError() diff --git a/chainsyncer/driver/chain_interface.py b/chainsyncer/driver/chain_interface.py @@ -0,0 +1,57 @@ +# external imports +from chainlib.error import RPCException + +# local imports +from chainsyncer.error import NoBlockForYou +from chainsyncer.driver import SyncDriver + + +class ChainInterfaceDriver(SyncDriver): + + def __init__(self, store, chain_interface, offset=0, target=-1, pre_callback=None, post_callback=None, block_callback=None, idle_callback=None): + super(ChainInterfaceDriver, self).__init__(store, offset=offset, target=target, pre_callback=pre_callback, post_callback=post_callback, block_callback=block_callback, idle_callback=idle_callback) + self.chain_interface = chain_interface + + + def get(self, conn, item): + """Retrieve the block currently defined by the syncer cursor from the RPC provider. + + :param conn: RPC connection + :type conn: chainlib.connectin.RPCConnection + :raises NoBlockForYou: Block at the given height does not exist + :rtype: chainlib.block.Block + :returns: Block object + """ + o = self.chain_interface.block_by_number(item.cursor) + try: + r = conn.do(o) + except RPCException: + r = None + if r == None: + raise NoBlockForYou() + b = self.chain_interface.block_from_src(r) + b.txs = b.txs[item.tx_cursor:] + + return b + + + def process(self, conn, item, block): + tx_src = None + i = item.tx_cursor + while True: + # handle block objects regardless of whether the tx data is embedded or not + try: + tx = block.tx(i) + except AttributeError: + tx_hash = block.txs[i] + o = self.chain_interface.tx_by_hash(tx_hash, block=block) + r = conn.do(o) + #tx = self.chain_interface.tx_from_src(tx_src, block=block) + + rcpt = conn.do(self.chain_interface.tx_receipt(tx.hash)) + if rcpt != None: + tx.apply_receipt(self.chain_interface.src_normalize(rcpt)) + + self.process_single(conn, block, tx) + + i += 1 diff --git a/chainsyncer/unittest/base.py b/chainsyncer/unittest/base.py @@ -121,7 +121,7 @@ class MockBlock: :param i: Transaction index :type i: int """ - return MockTx(i, self.txs[i]) + return MockTx(i, self.txs[i].hash) class MockStore(State): @@ -170,7 +170,7 @@ class MockFilter: r = True self.brk -= 1 self.contents.append((block.number, tx.index, tx.hash,)) - logg.debug('filter {} result {} block {}'.format(self.common_name(), r, block.number)) + logg.debug('filter {} result {} block {} tx {} {}'.format(self.common_name(), r, block.number, tx.index, tx.hash)) return r @@ -202,8 +202,8 @@ class MockDriver(SyncDriver): raise NoBlockForYou() - def process(self, conn, item, block, tx_start): - i = tx_start + def process(self, conn, item, block): + i = item.tx_cursor while self.running: if self.interrupt != None: if self.interrupt[0] == block.number and self.interrupt[1] == i: @@ -216,3 +216,54 @@ class MockDriver(SyncDriver): self.process_single(conn, block, tx) item.next() i += 1 + + +class MockChainInterface: + + def block_by_number(self, number): + return ('block_by_number', number,) + + + def tx_by_hash(self, hsh): + return ('tx_by_hash', hsh,) + + + def block_from_src(self, src): + return src + + + def src_normalize(self, src): + return src + + + def tx_receipt(self, hsh): + return ('receipt', hsh,) + + +class MockChainInterfaceConn(MockConn): + + def __init__(self, interface): + self.ifc = interface + self.blocks = {} + self.txs = {} + + + def add_block(self, block): + logg.debug('add block {} {} with {} txs'.format(block.number, block.hash, len(block.txs))) + self.blocks[block.number] = block + for tx in block.txs: + self.txs[tx.hash] = tx + + + def do(self, o): + m = getattr(self, 'handle_' + o[0]) + return m(o[1]) + + + def handle_block_by_number(self, number): + return self.blocks[number] + + + + def handle_receipt(self, hsh): + return {} diff --git a/setup.cfg b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = chainsyncer -version = 0.2.0 +version = 0.3.0 description = Generic blockchain syncer driver author = Louis Holbrook author_email = dev@holbrook.no diff --git a/tests/test_driver.py b/tests/test_driver.py @@ -0,0 +1,61 @@ +# standard imports +import unittest +import tempfile +import shutil +import logging +import stat +import os + +# local imports +from chainsyncer.store.fs import SyncFsStore +from chainsyncer.session import SyncSession +from chainsyncer.error import ( + LockError, + FilterDone, + IncompleteFilterError, + SyncDone, + ) +from chainsyncer.unittest import ( + MockBlockGenerator, + MockFilter, + MockChainInterfaceConn, + MockTx, + MockBlock, + MockChainInterface, + MockFilterError, + ) +from chainsyncer.driver.chain_interface import ChainInterfaceDriver + +logging.basicConfig(level=logging.DEBUG) +logg = logging.getLogger() + + +class TestFilter(unittest.TestCase): + + def setUp(self): + self.path = tempfile.mkdtemp() + self.store = SyncFsStore(self.path) + self.ifc = MockChainInterface() + self.conn = MockChainInterfaceConn(self.ifc) + + + def tearDown(self): + shutil.rmtree(self.path) + + + def test_driver(self): + generator = MockBlockGenerator() + generator.generate([1, 2], driver=self.conn) + + drv = ChainInterfaceDriver(self.store, self.ifc, target=1) + + fltr_one = MockFilter('foo') + self.store.register(fltr_one) + with self.assertRaises(SyncDone): + drv.run(self.conn) + + self.assertEqual(len(fltr_one.contents), 3) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_session.py b/tests/test_session.py @@ -117,7 +117,6 @@ class TestFilter(unittest.TestCase): fltr_three = MockFilter('baz') self.store.register(fltr_three) - store = SyncFsStore(self.path, state_event_callback=state_event_handler, filter_state_event_callback=filter_state_event_handler) with self.assertRaises(SyncDone):