chainsyncer

Blockchain syncer driver
Log | Files | Refs | LICENSE

commit 2d14515d34184863e5cbe06c1b9d6115666e12a0
parent 58e983efccba9ba1ac91ae6ee4166b487bcd4fa7
Author: lash <dev@holbrook.no>
Date:   Thu, 17 Mar 2022 10:16:55 +0000

Repair after merge

Diffstat:
Dtests/chainsyncer_base.py | 69---------------------------------------------------------------------
Mtests/test_basic.py | 11+++++------
Dtests/test_database.py | 199-------------------------------------------------------------------------------
Dtests/test_file.py | 121-------------------------------------------------------------------------------
Dtests/test_helo.py | 15---------------
Dtests/test_interrupt.py | 147-------------------------------------------------------------------------------
Dtests/test_mem.py | 32--------------------------------
Dtests/test_thread.py | 12------------
Dtests/test_thread_range.py | 114-------------------------------------------------------------------------------
9 files changed, 5 insertions(+), 715 deletions(-)

diff --git a/tests/chainsyncer_base.py b/tests/chainsyncer_base.py @@ -1,69 +0,0 @@ -# standard imports -import logging -import unittest -import tempfile -import os -#import pysqlite - -# external imports -from chainlib.chain import ChainSpec -from chainlib.interface import ChainInterface -from chainlib.eth.tx import ( - receipt, - Tx, - ) -from chainlib.eth.block import ( - block_by_number, - Block, - ) -from potaahto.symbols import snake_and_camel - -# local imports -from chainsyncer.db import dsn_from_config -from chainsyncer.db.models.base import SessionBase - -# test imports -from chainsyncer.unittest.db import ChainSyncerDb - -script_dir = os.path.realpath(os.path.dirname(__file__)) - -logging.basicConfig(level=logging.DEBUG) - - -class EthChainInterface(ChainInterface): - - def __init__(self): - self._tx_receipt = receipt - self._block_by_number = block_by_number - self._block_from_src = Block.from_src - self._tx_from_src = Tx.from_src - self._src_normalize = snake_and_camel - - -class TestBase(unittest.TestCase): - - interface = EthChainInterface() - - def setUp(self): - self.db = ChainSyncerDb() - - #f = open(os.path.join(script_dir, '..', 'sql', 'sqlite', '1.sql'), 'r') - #sql = f.read() - #f.close() - - #conn = SessionBase.engine.connect() - #conn.execute(sql) - - #f = open(os.path.join(script_dir, '..', 'sql', 'sqlite', '2.sql'), 'r') - #sql = f.read() - #f.close() - - #conn = SessionBase.engine.connect() - #conn.execute(sql) - self.session = self.db.bind_session() - self.chain_spec = ChainSpec('evm', 'foo', 42, 'bar') - - def tearDown(self): - self.session.commit() - self.db.release_session(self.session) - #os.unlink(self.db_path) diff --git a/tests/test_basic.py b/tests/test_basic.py @@ -1,17 +1,16 @@ # standard imports import unittest +import hashlib # external imports -from chainlib.chain import ChainSpec +from shep.state import State # local imports -from chainsyncer.backend.memory import MemBackend +from chainsyncer.session import SyncSession +from chainsyncer.state import SyncState -# testutil imports -from tests.chainsyncer_base import TestBase - -class TestBasic(TestBase): +class MockStore(State): def __init__(self, bits=0): super(MockStore, self).__init__(bits, check_alias=False) diff --git a/tests/test_database.py b/tests/test_database.py @@ -1,199 +0,0 @@ -# standard imports -import unittest -import logging - -# external imports -from chainlib.chain import ChainSpec - -# local imports -from chainsyncer.db.models.base import SessionBase -from chainsyncer.db.models.filter import BlockchainSyncFilter -from chainsyncer.backend.sql import SQLBackend -from chainsyncer.error import LockError - -# testutil imports -from tests.chainsyncer_base import TestBase - -logg = logging.getLogger() - - -class TestDatabase(TestBase): - - - def test_backend_live(self): - s = SQLBackend.live(self.chain_spec, 42) - self.assertEqual(s.object_id, 1) - backend = SQLBackend.first(self.chain_spec) - #SQLBackend(self.chain_spec, sync_id) - self.assertEqual(backend.object_id, 1) - - bogus_chain_spec = ChainSpec('bogus', 'foo', 13, 'baz') - sync_id = SQLBackend.first(bogus_chain_spec) - self.assertIsNone(sync_id) - - - def test_backend_filter_lock(self): - s = SQLBackend.live(self.chain_spec, 42) - - s.connect() - filter_id = s.db_object_filter.id - s.disconnect() - - session = SessionBase.create_session() - o = session.query(BlockchainSyncFilter).get(filter_id) - self.assertEqual(len(o.flags), 0) - session.close() - - s.register_filter(str(0)) - s.register_filter(str(1)) - - s.connect() - filter_id = s.db_object_filter.id - s.disconnect() - - session = SessionBase.create_session() - o = session.query(BlockchainSyncFilter).get(filter_id) - - o.set(1) - with self.assertRaises(LockError): - o.set(2) - o.release() - o.set(2) - - - def test_backend_filter(self): - s = SQLBackend.live(self.chain_spec, 42) - - s.connect() - filter_id = s.db_object_filter.id - s.disconnect() - - session = SessionBase.create_session() - o = session.query(BlockchainSyncFilter).get(filter_id) - self.assertEqual(len(o.flags), 0) - session.close() - - for i in range(9): - s.register_filter(str(i)) - - s.connect() - filter_id = s.db_object_filter.id - s.disconnect() - - session = SessionBase.create_session() - o = session.query(BlockchainSyncFilter).get(filter_id) - self.assertEqual(len(o.flags), 2) - - (t, c, d) = o.target() - self.assertEqual(t, (1 << 9) - 1) - - for i in range(9): - o.set(i) - o.release() - - (f, c, d) = o.cursor() - self.assertEqual(f, t) - self.assertEqual(c, 9) - self.assertEqual(d, o.digest) - - session.close() - - - def test_backend_retrieve(self): - s = SQLBackend.live(self.chain_spec, 42) - s.register_filter('foo') - s.register_filter('bar') - s.register_filter('baz') - - s.set(42, 13) - - s = SQLBackend.first(self.chain_spec) - self.assertEqual(s.get(), ((42,13), 0)) - - - def test_backend_initial(self): - with self.assertRaises(ValueError): - s = SQLBackend.initial(self.chain_spec, 42, 42) - - with self.assertRaises(ValueError): - s = SQLBackend.initial(self.chain_spec, 42, 43) - - s = SQLBackend.initial(self.chain_spec, 42, 13) - - s.set(43, 13) - - s = SQLBackend.first(self.chain_spec) - self.assertEqual(s.get(), ((43,13), 0)) - self.assertEqual(s.start(), ((13,0), 0)) - - - def test_backend_resume(self): - s = SQLBackend.resume(self.chain_spec, 666) - self.assertEqual(len(s), 0) - - s = SQLBackend.live(self.chain_spec, 42) - original_id = s.object_id - s = SQLBackend.resume(self.chain_spec, 666) - self.assertEqual(len(s), 1) - resumed_id = s[0].object_id - self.assertEqual(resumed_id, original_id + 1) - self.assertEqual(s[0].get(), ((42, 0), 0)) - - - def test_backend_resume_when_completed(self): - s = SQLBackend.live(self.chain_spec, 42) - - s = SQLBackend.resume(self.chain_spec, 666) - s[0].set(666, 0) - - s = SQLBackend.resume(self.chain_spec, 666) - self.assertEqual(len(s), 0) - - - def test_backend_resume_several(self): - s = SQLBackend.live(self.chain_spec, 42) - s.set(43, 13) - - s = SQLBackend.resume(self.chain_spec, 666) - SQLBackend.live(self.chain_spec, 666) - s[0].set(123, 2) - - s = SQLBackend.resume(self.chain_spec, 1024) - SQLBackend.live(self.chain_spec, 1024) - - self.assertEqual(len(s), 2) - self.assertEqual(s[0].target(), (666, 0)) - self.assertEqual(s[0].get(), ((123, 2), 0)) - self.assertEqual(s[1].target(), (1024, 0)) - self.assertEqual(s[1].get(), ((666, 0), 0)) - - - def test_backend_resume_filter(self): - s = SQLBackend.live(self.chain_spec, 42) - s.register_filter('foo') - s.register_filter('bar') - s.register_filter('baz') - - s.set(43, 13) - s.begin_filter(0) - s.begin_filter(2) - - s = SQLBackend.resume(self.chain_spec, 666) - (pair, flags) = s[0].get() - - self.assertEqual(flags, 5) - - - def test_backend_sql_custom(self): - chain_spec = ChainSpec('evm', 'bloxberg', 8996, 'foo') - flags = 5 - flags_target = 1023 - flag_count = 10 - backend = SQLBackend.custom(chain_spec, 666, 42, 2, flags, flag_count) - self.assertEqual(((42, 2), flags), backend.start()) - self.assertEqual(((42, 2), flags), backend.get()) - self.assertEqual((666, flags_target), backend.target()) - - -if __name__ == '__main__': - unittest.main() diff --git a/tests/test_file.py b/tests/test_file.py @@ -1,121 +0,0 @@ -# standard imports -import logging -import uuid -import os -import unittest -import shutil - -# external imports -from chainlib.chain import ChainSpec - -# local imports -from chainsyncer.backend.file import FileBackend - -logging.basicConfig(level=logging.DEBUG) -logg = logging.getLogger().getChild(__name__) - -script_dir = os.path.dirname(__file__) -tmp_test_dir = os.path.join(script_dir, 'testdata', 'tmp') -chainsyncer_test_dir = os.path.join(tmp_test_dir, 'chainsyncer') -os.makedirs(tmp_test_dir, exist_ok=True) - - -class TestFile(unittest.TestCase): - - def setUp(self): - self.chain_spec = ChainSpec('foo', 'bar', 42, 'baz') - self.uu = FileBackend.create_object(self.chain_spec, None, base_dir=tmp_test_dir) - logg.debug('made uu {} for {}'.format(self.uu, self.chain_spec)) - - self.o = FileBackend(self.chain_spec, self.uu, base_dir=tmp_test_dir) - - - def tearDown(self): - self.o.purge() - shutil.rmtree(chainsyncer_test_dir) - - - @unittest.skip('foo') - def test_set(self): - self.o.set(42, 13) - - o = FileBackend(self.chain_spec, self.o.object_id, base_dir=tmp_test_dir) - - state = o.get() - - self.assertEqual(state[0], 42) - self.assertEqual(state[1], 13) - - - @unittest.skip('foo') - def test_initial(self): - local_uu = FileBackend.initial(self.chain_spec, 1337, start_block_height=666, base_dir=tmp_test_dir) - - o = FileBackend(self.chain_spec, local_uu, base_dir=tmp_test_dir) - - (pair, filter_stats) = o.target() - self.assertEqual(pair[0], 1337) - self.assertEqual(pair[1], 0) - - (pair, filter_stats) = o.start() - self.assertEqual(pair[0], 666) - self.assertEqual(pair[1], 0) - - - @unittest.skip('foo') - def test_resume(self): - for i in range(1, 10): - local_uu = FileBackend.initial(self.chain_spec, 666, start_block_height=i, base_dir=tmp_test_dir) - - entries = FileBackend.resume(self.chain_spec, base_dir=tmp_test_dir) - - self.assertEqual(len(entries), 10) - - last = -1 - for o in entries: - self.assertLess(last, o.block_height_offset) - last = o.block_height_offset - - - @unittest.skip('foo') - def test_first(self): - for i in range(1, 10): - local_uu = FileBackend.initial(self.chain_spec, 666, start_block_height=i, base_dir=tmp_test_dir) - - first_entry = FileBackend.first(self.chain_spec, base_dir=tmp_test_dir) - - self.assertEqual(first_entry.block_height_offset, 9) - - - def test_filter(self): - - self.assertEqual(len(self.o.filter), 1) - - self.o.register_filter('foo') - self.o.register_filter('bar') - - o = FileBackend(self.chain_spec, self.uu, base_dir=tmp_test_dir) - - self.assertEqual(o.filter_count, 2) - self.assertEqual(o.filter_names, ['foo', 'bar']) - self.assertEqual(len(o.filter), 1) - - self.o.complete_filter(1) - self.assertEqual(self.o.filter, b'\x40') - - self.o.complete_filter(0) - self.assertEqual(self.o.filter, b'\xc0') - - o = FileBackend(self.chain_spec, self.uu, base_dir=tmp_test_dir) - self.assertEqual(o.filter, b'\xc0') - - - with self.assertRaises(IndexError): - self.o.complete_filter(2) - - self.o.register_filter('baz') - self.o.complete_filter(2) - - -if __name__ == '__main__': - unittest.main() diff --git a/tests/test_helo.py b/tests/test_helo.py @@ -1,15 +0,0 @@ -# standard imports -import unittest - -# local imports -from tests.chainsyncer_base import TestBase - - -class TestHelo(TestBase): - - def test_helo(self): - pass - - -if __name__ == '__main__': - unittest.main() diff --git a/tests/test_interrupt.py b/tests/test_interrupt.py @@ -1,147 +0,0 @@ -# standard imports -import logging -import unittest -import os -import tempfile - -# external imports -from chainlib.chain import ChainSpec - -# local imports -from chainsyncer.backend.memory import MemBackend -from chainsyncer.backend.sql import SQLBackend -from chainsyncer.backend.file import ( - FileBackend, - data_dir_for, - ) -from chainsyncer.error import LockError - -# test imports -from tests.chainsyncer_base import TestBase -from chainsyncer.unittest.base import ( - MockBlock, - MockConn, - TestSyncer, - ) - -logging.basicConfig(level=logging.DEBUG) -logg = logging.getLogger() - - -class NaughtyCountExceptionFilter: - - def __init__(self, name, croak_on): - self.c = 0 - self.croak = croak_on - self.name = name - - - def filter(self, conn, block, tx, db_session=None): - self.c += 1 - if self.c == self.croak: - self.croak = -1 - raise RuntimeError('foo') - - - def __str__(self): - return '{} {}'.format(self.__class__.__name__, self.name) - - -class CountFilter: - - def __init__(self, name): - self.c = 0 - self.name = name - - - def filter(self, conn, block, tx, db_session=None): - self.c += 1 - - - def __str__(self): - return '{} {}'.format(self.__class__.__name__, self.name) - - - -class TestInterrupt(TestBase): - - def setUp(self): - super(TestInterrupt, self).setUp() - - self.backend = None - self.conn = MockConn() - self.vectors = [ - [4, 3, 2], - [6, 4, 2], - [6, 5, 2], - [6, 4, 3], - ] - self.track_complete = True - - - def assert_filter_interrupt(self, vector, chain_interface): - - logg.debug('running vector {} {}'.format(str(self.backend), vector)) - - z = 0 - for v in vector: - z += v - - syncer = TestSyncer(self.backend, chain_interface, vector) - - filters = [ - CountFilter('foo'), - CountFilter('bar'), - NaughtyCountExceptionFilter('xyzzy', croak_on=3), - CountFilter('baz'), - ] - - for fltr in filters: - syncer.add_filter(fltr) - - try: - syncer.loop(0.1, self.conn) - except RuntimeError: - self.croaked = 2 - logg.info('caught croak') - pass - (pair, fltr) = self.backend.get() - self.assertGreater(fltr, 0) - - try: - syncer.loop(0.1, self.conn) - except LockError: - self.backend.complete_filter(2) - syncer.loop(0.1, self.conn) - - for fltr in filters: - logg.debug('{} {}'.format(str(fltr), fltr.c)) - self.assertEqual(fltr.c, z) - - - def test_filter_interrupt_memory(self): - self.track_complete = True - for vector in self.vectors: - self.backend = MemBackend.custom(self.chain_spec, target_block=len(vector)) - self.assert_filter_interrupt(vector, self.interface) - - #TODO: implement flag lock in file backend - @unittest.expectedFailure - def test_filter_interrupt_file(self): - #for vector in self.vectors: - vector = self.vectors.pop() - d = tempfile.mkdtemp() - #os.makedirs(data_dir_for(self.chain_spec, 'foo', d)) - self.backend = FileBackend.initial(self.chain_spec, len(vector), base_dir=d) #'foo', base_dir=d) - self.assert_filter_interrupt(vector, self.interface) - - - def test_filter_interrupt_sql(self): - self.track_complete = True - for vector in self.vectors: - self.backend = SQLBackend.initial(self.chain_spec, len(vector)) - self.assert_filter_interrupt(vector, self.interface) - - -if __name__ == '__main__': - unittest.main() diff --git a/tests/test_mem.py b/tests/test_mem.py @@ -1,32 +0,0 @@ -# standard imports -import unittest -import logging - -# external imports -from chainlib.chain import ChainSpec - -# local imports -from chainsyncer.backend.memory import MemBackend - -# testutil imports -from tests.chainsyncer_base import TestBase - -logging.basicConfig(level=logging.DEBUG) - - -class TestMem(TestBase): - - def test_backend_mem_custom(self): - chain_spec = ChainSpec('evm', 'bloxberg', 8996, 'foo') - flags = int(5).to_bytes(2, 'big') - flag_count = 10 - flags_target = (2 ** 10) - 1 - backend = MemBackend.custom(chain_spec, 666, 42, 2, flags, flag_count, object_id='xyzzy') - self.assertEqual(((42, 2), flags), backend.start()) - self.assertEqual(((42, 2), flags), backend.get()) - self.assertEqual((666, flags_target), backend.target()) - self.assertEqual(backend.object_id, 'xyzzy') - - -if __name__ == '__main__': - unittest.main() diff --git a/tests/test_thread.py b/tests/test_thread.py @@ -1,12 +0,0 @@ -# standard imports -import logging -import unittest - -# test imports -from tests.chainsyncer_base import TestBase - - -class TestThreadRange(TestBase): - - def test_hello(self): - ThreadPoolRangeHistorySyncer(None, 3) diff --git a/tests/test_thread_range.py b/tests/test_thread_range.py @@ -1,114 +0,0 @@ -# standard imports -import unittest -import logging - -# external imports -from chainlib.chain import ChainSpec -from chainlib.eth.unittest.ethtester import EthTesterCase -from chainlib.eth.nonce import RPCNonceOracle -from chainlib.eth.gas import ( - RPCGasOracle, - Gas, - ) -from chainlib.eth.unittest.base import TestRPCConnection - -# local imports -from chainsyncer.backend.memory import MemBackend -from chainsyncer.driver.threadrange import ( - sync_split, - ThreadPoolRangeHistorySyncer, - ) -from chainsyncer.unittest.base import MockConn -from chainsyncer.unittest.db import ChainSyncerDb - -# testutil imports -from tests.chainsyncer_base import ( - EthChainInterface, - ) - -logging.basicConfig(level=logging.DEBUG) -logg = logging.getLogger() - - -class SyncerCounter: - - def __init__(self): - self.hits = [] - - - def filter(self, conn, block, tx, db_session=None): - logg.debug('fltr {} {}'.format(block, tx)) - self.hits.append((block, tx)) - - -class TestBaseEth(EthTesterCase): - - interface = EthChainInterface() - - def setUp(self): - super(TestBaseEth, self).setUp() - self.db = ChainSyncerDb() - self.session = self.db.bind_session() - - def tearDown(self): - self.session.commit() - self.db.release_session(self.session) - #os.unlink(self.db_path) - - -class TestThreadRange(TestBaseEth): - - interface = EthChainInterface() - - def test_range_split_even(self): - ranges = sync_split(5, 20, 3) - self.assertEqual(len(ranges), 3) - self.assertEqual(ranges[0], (5, 9)) - self.assertEqual(ranges[1], (10, 14)) - self.assertEqual(ranges[2], (15, 19)) - - - def test_range_split_underflow(self): - ranges = sync_split(5, 8, 4) - self.assertEqual(len(ranges), 3) - self.assertEqual(ranges[0], (5, 5)) - self.assertEqual(ranges[1], (6, 6)) - self.assertEqual(ranges[2], (7, 7)) - - - def test_range_syncer_hello(self): - #chain_spec = ChainSpec('evm', 'bloxberg', 8996, 'foo') - chain_spec = ChainSpec('evm', 'foochain', 42) - backend = MemBackend.custom(chain_spec, 20, 5, 3, 5, 10) - #syncer = ThreadPoolRangeHistorySyncer(MockConn, 3, backend, self.interface) - syncer = ThreadPoolRangeHistorySyncer(3, backend, self.interface) - syncer.loop(0.1, None) - - - def test_range_syncer_content(self): - nonce_oracle = RPCNonceOracle(self.accounts[0], self.rpc) - gas_oracle = RPCGasOracle(self.rpc) - - self.backend.mine_blocks(10) - - c = Gas(signer=self.signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_spec=self.chain_spec) - (tx_hash, o) = c.create(self.accounts[0], self.accounts[1], 1024) - r = self.rpc.do(o) - - self.backend.mine_blocks(3) - - c = Gas(signer=self.signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_spec=self.chain_spec) - (tx_hash, o) = c.create(self.accounts[0], self.accounts[1], 2048) - r = self.rpc.do(o) - - self.backend.mine_blocks(10) - - backend = MemBackend.custom(self.chain_spec, 20, 5, 3, 5, 10) - syncer = ThreadPoolRangeHistorySyncer(3, backend, self.interface) - fltr = SyncerCounter() - syncer.add_filter(fltr) - syncer.loop(0.1, None) - - -if __name__ == '__main__': - unittest.main()