commit d27bcaa9f5142eea9ef1685f8e3a25dca878fced
parent 197560ae16bf26454a35dd29b624ef9615365d99
Author: lash <dev@holbrook.no>
Date: Wed, 20 Apr 2022 15:15:43 +0000
Factor out common store tests, implement for fs and rocksdb
Diffstat:
5 files changed, 300 insertions(+), 311 deletions(-)
diff --git a/chainsyncer/store/rocksdb.py b/chainsyncer/store/rocksdb.py
@@ -36,17 +36,17 @@ class SyncRocksDbStore(SyncStore):
def __init__(self, base_path, session_id=None, state_event_callback=None, filter_state_event_callback=None):
super(SyncRocksDbStore, self).__init__(base_path, session_id=session_id)
- factory = RocksDbStoreFactory(self.session_path, binary=True)
- prefix_factory = RocksDbStoreAdder(factory, 'sync')
+ self.factory = RocksDbStoreFactory(self.session_path, binary=True)
+ prefix_factory = RocksDbStoreAdder(self.factory, 'sync')
self.setup_sync_state(prefix_factory, state_event_callback)
- prefix_factory = RocksDbStoreAdder(factory, 'filter')
+ prefix_factory = RocksDbStoreAdder(self.factory, 'filter')
self.setup_filter_state(prefix_factory, filter_state_event_callback)
self.session_id = os.path.basename(self.session_path)
logg.info('session id {} resolved {} path {}'.format(session_id, self.session_id, self.session_path))
- self.target_db = RocksDbStoreAdder(factory, '.stat').add('target')
+ self.target_db = RocksDbStoreAdder(self.factory, '.stat').add('target')
def get_target(self):
@@ -55,3 +55,8 @@ class SyncRocksDbStore(SyncStore):
def set_target(self, v):
self.target_db.put('target')
+
+
+ def disconnect(self):
+ super(SyncRocksDbStore, self).disconnect()
+ self.factory.close()
diff --git a/chainsyncer/unittest/db.py b/chainsyncer/unittest/db.py
@@ -1,64 +0,0 @@
-# standard imports
-import logging
-import os
-
-# external imports
-import alembic
-import alembic.config
-
-# local imports
-from chainsyncer.db.models.base import SessionBase
-from chainsyncer.db import dsn_from_config
-from chainsyncer.db.models.base import SessionBase
-
-logg = logging.getLogger(__name__)
-
-
-class ChainSyncerDb:
- """SQLITE database setup for unit tests
-
- :param debug: Activate sql level debug (outputs sql statements)
- :type debug: bool
- """
-
- base = SessionBase
-
- def __init__(self, debug=False):
- config = {
- 'DATABASE_ENGINE': 'sqlite',
- 'DATABASE_DRIVER': 'pysqlite',
- 'DATABASE_NAME': 'chainsyncer.sqlite',
- }
- logg.debug('config {}'.format(config))
-
- self.dsn = dsn_from_config(config)
-
- self.base.poolable = False
- self.base.transactional = False
- self.base.procedural = False
- self.base.connect(self.dsn, debug=debug) # TODO: evaluates to "true" even if string is 0
-
- rootdir = os.path.join(os.path.dirname(os.path.dirname(__file__)), '..')
- dbdir = os.path.join(rootdir, 'chainsyncer', 'db')
- #migrationsdir = os.path.join(dbdir, 'migrations', config.get('DATABASE_ENGINE'))
- migrationsdir = os.path.join(dbdir, 'migrations', 'default')
- logg.info('using migrations directory {}'.format(migrationsdir))
-
- ac = alembic.config.Config(os.path.join(migrationsdir, 'alembic.ini'))
- ac.set_main_option('sqlalchemy.url', self.dsn)
- ac.set_main_option('script_location', migrationsdir)
-
- alembic.command.downgrade(ac, 'base')
- alembic.command.upgrade(ac, 'head')
-
-
- def bind_session(self, session=None):
- """Create session using underlying session base
- """
- return self.base.bind_session(session)
-
-
- def release_session(self, session=None):
- """Release session using underlying session base
- """
- return self.base.release_session(session)
diff --git a/chainsyncer/unittest/store.py b/chainsyncer/unittest/store.py
@@ -0,0 +1,267 @@
+# standard imports
+import os
+import stat
+import unittest
+import shutil
+import tempfile
+import logging
+
+# local imports
+from chainsyncer.session import SyncSession
+from chainsyncer.error import (
+ LockError,
+ FilterDone,
+ IncompleteFilterError,
+ SyncDone,
+ )
+from chainsyncer.unittest import MockFilter
+
+logg = logging.getLogger(__name__)
+
+
+class TestStoreBase(unittest.TestCase):
+
+ def setUp(self):
+ self.path = tempfile.mkdtemp()
+ self.store_factory = None
+
+
+ @classmethod
+ def link(cls, target):
+ for v in [
+ "default",
+ "store_start",
+ "store_resume",
+ "sync_process_nofilter",
+ "sync_process_onefilter",
+ "sync_process_outoforder",
+ "sync_process_interrupt",
+ "sync_process_reset",
+ "sync_process_done",
+ "sync_head_future",
+ "sync_history_interrupted",
+ "sync_history_complete",
+ ]:
+ setattr(target, 'test_' + v, getattr(cls, 't_' + v))
+
+
+ def tearDown(self):
+ shutil.rmtree(self.path)
+
+
+ def t_default(self):
+ store = self.store_factory()
+
+ fp = os.path.join(self.path, store.session_id)
+ session_id = store.session_id
+ st = os.stat(fp)
+ self.assertTrue(stat.S_ISDIR(st.st_mode))
+ self.assertTrue(store.is_default)
+
+ fpd = os.path.join(self.path, 'default')
+ st = os.stat(fpd)
+ self.assertTrue(stat.S_ISDIR(st.st_mode))
+ self.assertTrue(store.is_default)
+
+ fpd = os.path.realpath(fpd)
+ self.assertEqual(fpd, fp)
+
+ store.disconnect()
+ store = self.store_factory()
+ fpr = os.path.join(self.path, session_id)
+ self.assertEqual(fp, fpr)
+ self.assertTrue(store.is_default)
+
+ store.disconnect()
+ store = self.store_factory('default')
+ fpr = os.path.join(self.path, session_id)
+ self.assertEqual(fp, fpr)
+ self.assertTrue(store.is_default)
+
+ store.disconnect()
+ store = self.store_factory('foo')
+ fpf = os.path.join(self.path, 'foo')
+ st = os.stat(fpf)
+ self.assertTrue(stat.S_ISDIR(st.st_mode))
+ self.assertFalse(store.is_default)
+
+
+ def t_store_start(self):
+ store = self.store_factory()
+ store.start(42)
+ self.assertTrue(store.first)
+
+ store.disconnect()
+ store = self.store_factory()
+ store.start()
+ self.assertFalse(store.first)
+
+
+ def t_store_resume(self):
+ store = self.store_factory()
+ store.start(13)
+ self.assertTrue(store.first)
+ # todo not done
+
+
+ def t_sync_process_nofilter(self):
+ store = self.store_factory()
+ session = SyncSession(store)
+ session.start()
+ o = session.get(0)
+ with self.assertRaises(FilterDone):
+ o.advance()
+
+
+ def t_sync_process_onefilter(self):
+ store = self.store_factory()
+ session = SyncSession(store)
+
+ fltr_one = MockFilter('foo')
+ store.register(fltr_one)
+
+ session.start()
+ o = session.get(0)
+ o.advance()
+ o.release()
+
+
+ def t_sync_process_outoforder(self):
+ store = self.store_factory()
+ session = SyncSession(store)
+
+ fltr_one = MockFilter('foo')
+ store.register(fltr_one)
+ fltr_two = MockFilter('two')
+ store.register(fltr_two)
+
+ session.start()
+ o = session.get(0)
+ o.advance()
+ with self.assertRaises(LockError):
+ o.advance()
+
+ o.release()
+ with self.assertRaises(LockError):
+ o.release()
+
+ o.advance()
+ o.release()
+
+
+ def t_sync_process_interrupt(self):
+ store = self.store_factory()
+ session = SyncSession(store)
+
+ fltr_one = MockFilter('foo')
+ store.register(fltr_one)
+ fltr_two = MockFilter('bar')
+ store.register(fltr_two)
+
+ session.start()
+ o = session.get(0)
+ o.advance()
+ o.release(interrupt=True)
+ with self.assertRaises(FilterDone):
+ o.advance()
+
+
+ def t_sync_process_reset(self):
+ store = self.store_factory()
+ session = SyncSession(store)
+
+ fltr_one = MockFilter('foo')
+ store.register(fltr_one)
+ fltr_two = MockFilter('bar')
+ store.register(fltr_two)
+
+ session.start()
+ o = session.get(0)
+ o.advance()
+ with self.assertRaises(LockError):
+ o.reset()
+ o.release()
+ with self.assertRaises(IncompleteFilterError):
+ o.reset()
+
+ o.advance()
+ o.release()
+
+ with self.assertRaises(FilterDone):
+ o.advance()
+
+ o.reset()
+
+
+ def t_sync_process_done(self):
+ store = self.store_factory()
+ session = SyncSession(store)
+
+ fltr_one = MockFilter('foo')
+ store.register(fltr_one)
+
+ session.start(target=0)
+ o = session.get(0)
+ o.advance()
+ o.release()
+ with self.assertRaises(FilterDone):
+ o.advance()
+ o.reset()
+ with self.assertRaises(SyncDone):
+ o.next(advance_block=True)
+
+
+ def t_sync_head_future(self):
+ store = self.store_factory('foo')
+ session = SyncSession(store)
+
+ session.start()
+ o = session.get(0)
+ o.next(advance_block=True)
+ o.next(advance_block=True)
+ session.stop(o)
+
+ store.disconnect()
+ store = self.store_factory('foo')
+ store.start()
+ o = store.get(2)
+
+
+ def t_sync_history_interrupted(self):
+ store = self.store_factory('foo')
+ session = SyncSession(store)
+
+ session.start(target=13)
+ o = session.get(0)
+ o.next(advance_block=True)
+ o.next(advance_block=True)
+ session.stop(o)
+
+ store.disconnect()
+ store = self.store_factory('foo')
+ store.start()
+ o = store.get(0)
+ self.assertEqual(o.cursor, 2)
+ self.assertEqual(o.target, 13)
+ o.next(advance_block=True)
+ o.next(advance_block=True)
+
+ session.stop(o)
+ store.disconnect()
+ store = self.store_factory('foo')
+ store.start()
+ self.assertEqual(o.cursor, 4)
+ self.assertEqual(o.target, 13)
+
+
+ def t_sync_history_complete(self):
+ store = self.store_factory('foo')
+ session = SyncSession(store)
+
+ session.start(target=3)
+ o = session.get(0)
+ o.next(advance_block=True)
+ o.next(advance_block=True)
+ o.next(advance_block=True)
+ with self.assertRaises(SyncDone):
+ o.next(advance_block=True)
diff --git a/tests/test_fs.py b/tests/test_fs.py
@@ -1,245 +1,32 @@
# standard imports
import unittest
-import tempfile
-import shutil
import logging
-import stat
-import os
# local imports
from chainsyncer.store.fs import SyncFsStore
-from chainsyncer.session import SyncSession
-from chainsyncer.error import (
- LockError,
- FilterDone,
- IncompleteFilterError,
- SyncDone,
- )
-from chainsyncer.unittest import MockFilter
+from chainsyncer.unittest.store import TestStoreBase
logging.basicConfig(level=logging.DEBUG)
logg = logging.getLogger()
-class TestFs(unittest.TestCase):
+class StoreFactory:
- def setUp(self):
- self.path = tempfile.mkdtemp()
-
-
- def tearDown(self):
- shutil.rmtree(self.path)
-
-
- def test_default(self):
- store = SyncFsStore(self.path)
- fp = os.path.join(self.path, store.session_id)
- session_id = store.session_id
- st = os.stat(fp)
- self.assertTrue(stat.S_ISDIR(st.st_mode))
- self.assertTrue(store.is_default)
-
- fpd = os.path.join(self.path, 'default')
- st = os.stat(fpd)
- self.assertTrue(stat.S_ISDIR(st.st_mode))
- self.assertTrue(store.is_default)
-
- fpd = os.path.realpath(fpd)
- self.assertEqual(fpd, fp)
-
- store = SyncFsStore(self.path)
- fpr = os.path.join(self.path, session_id)
- self.assertEqual(fp, fpr)
- self.assertTrue(store.is_default)
-
- store = SyncFsStore(self.path, 'default')
- fpr = os.path.join(self.path, session_id)
- self.assertEqual(fp, fpr)
- self.assertTrue(store.is_default)
-
- store = SyncFsStore(self.path, 'foo')
- fpf = os.path.join(self.path, 'foo')
- st = os.stat(fpf)
- self.assertTrue(stat.S_ISDIR(st.st_mode))
- self.assertFalse(store.is_default)
-
-
- def test_store_start(self):
- store = SyncFsStore(self.path)
- store.start(42)
- self.assertTrue(store.first)
-
- store = SyncFsStore(self.path)
- store.start()
- self.assertFalse(store.first)
-
-
- def test_store_resume(self):
- store = SyncFsStore(self.path)
- store.start(13)
- self.assertTrue(store.first)
- # todo not done
-
-
- def test_sync_process_nofilter(self):
- store = SyncFsStore(self.path)
- session = SyncSession(store)
- session.start()
- o = session.get(0)
- with self.assertRaises(FilterDone):
- o.advance()
-
-
- def test_sync_process_onefilter(self):
- store = SyncFsStore(self.path)
- session = SyncSession(store)
-
- fltr_one = MockFilter('foo')
- store.register(fltr_one)
-
- session.start()
- o = session.get(0)
- o.advance()
- o.release()
-
-
- def test_sync_process_outoforder(self):
- store = SyncFsStore(self.path)
- session = SyncSession(store)
-
- fltr_one = MockFilter('foo')
- store.register(fltr_one)
- fltr_two = MockFilter('two')
- store.register(fltr_two)
-
- session.start()
- o = session.get(0)
- o.advance()
- with self.assertRaises(LockError):
- o.advance()
+ def __init__(self, path):
+ self.path = path
- o.release()
- with self.assertRaises(LockError):
- o.release()
- o.advance()
- o.release()
+ def create(self, session_id=None):
+ return SyncFsStore(self.path, session_id=session_id)
- def test_sync_process_interrupt(self):
- store = SyncFsStore(self.path)
- session = SyncSession(store)
+class TestFs(TestStoreBase):
- fltr_one = MockFilter('foo')
- store.register(fltr_one)
- fltr_two = MockFilter('bar')
- store.register(fltr_two)
-
- session.start()
- o = session.get(0)
- o.advance()
- o.release(interrupt=True)
- with self.assertRaises(FilterDone):
- o.advance()
-
-
- def test_sync_process_reset(self):
- store = SyncFsStore(self.path)
- session = SyncSession(store)
-
- fltr_one = MockFilter('foo')
- store.register(fltr_one)
- fltr_two = MockFilter('bar')
- store.register(fltr_two)
-
- session.start()
- o = session.get(0)
- o.advance()
- with self.assertRaises(LockError):
- o.reset()
- o.release()
- with self.assertRaises(IncompleteFilterError):
- o.reset()
-
- o.advance()
- o.release()
-
- with self.assertRaises(FilterDone):
- o.advance()
-
- o.reset()
-
-
- def test_sync_process_done(self):
- store = SyncFsStore(self.path)
- session = SyncSession(store)
-
- fltr_one = MockFilter('foo')
- store.register(fltr_one)
-
- session.start(target=0)
- o = session.get(0)
- o.advance()
- o.release()
- with self.assertRaises(FilterDone):
- o.advance()
- o.reset()
- with self.assertRaises(SyncDone):
- o.next(advance_block=True)
-
-
- def test_sync_head_future(self):
- store = SyncFsStore(self.path, session_id='foo')
- session = SyncSession(store)
-
- session.start()
- o = session.get(0)
- o.next(advance_block=True)
- o.next(advance_block=True)
- session.stop(o)
-
- store = SyncFsStore(self.path, session_id='foo')
- store.start()
- o = store.get(2)
-
-
- def test_sync_history_interrupted(self):
- store = SyncFsStore(self.path, session_id='foo')
- session = SyncSession(store)
-
- session.start(target=13)
- o = session.get(0)
- o.next(advance_block=True)
- o.next(advance_block=True)
- session.stop(o)
-
- store = SyncFsStore(self.path, session_id='foo')
- store.start()
- o = store.get(0)
- self.assertEqual(o.cursor, 2)
- self.assertEqual(o.target, 13)
- o.next(advance_block=True)
- o.next(advance_block=True)
-
- session.stop(o)
- store = SyncFsStore(self.path, session_id='foo')
- store.start()
- self.assertEqual(o.cursor, 4)
- self.assertEqual(o.target, 13)
-
-
- def test_sync_history_complete(self):
- store = SyncFsStore(self.path, session_id='foo')
- session = SyncSession(store)
-
- session.start(target=3)
- o = session.get(0)
- o.next(advance_block=True)
- o.next(advance_block=True)
- o.next(advance_block=True)
- with self.assertRaises(SyncDone):
- o.next(advance_block=True)
+ def setUp(self):
+ super(TestFs, self).setUp()
+ self.store_factory = StoreFactory(self.path).create
if __name__ == '__main__':
+ TestStoreBase.link(TestFs)
unittest.main()
diff --git a/tests/test_rocksdb.py b/tests/test_rocksdb.py
@@ -1,37 +1,31 @@
# standard imports
import unittest
-import tempfile
-import shutil
import logging
-import stat
-import os
# local imports
from chainsyncer.store.rocksdb import SyncRocksDbStore
-from chainsyncer.session import SyncSession
-from chainsyncer.error import (
- LockError,
- FilterDone,
- IncompleteFilterError,
- SyncDone,
- )
-from chainsyncer.unittest import MockFilter
+from chainsyncer.unittest.store import TestStoreBase
logging.basicConfig(level=logging.DEBUG)
logg = logging.getLogger()
+class StoreFactory:
-class TestFs(unittest.TestCase):
+ def __init__(self, path):
+ self.path = path
- def setUp(self):
- self.path = tempfile.mkdtemp()
+
+ def create(self, session_id=None):
+ return SyncRocksDbStore(self.path, session_id=session_id)
- def test_default(self):
- store = SyncRocksDbStore(self.path)
- store.start(42)
- self.assertTrue(store.first)
+class TestRocksDb(TestStoreBase):
+
+ def setUp(self):
+ super(TestRocksDb, self).setUp()
+ self.store_factory = StoreFactory(self.path).create
if __name__ == '__main__':
+ TestStoreBase.link(TestRocksDb)
unittest.main()