commit e02b9b5d7e202bb10fbd195dce736784f21d50d8
parent 1634307cc181cd25df78295e3e78b964c97f371f
Author: lash <dev@holbrook.no>
Date: Mon, 14 Aug 2023 09:43:53 +0100
Fix filter state deletion on partial filter execution interrupt
Diffstat:
7 files changed, 129 insertions(+), 8 deletions(-)
diff --git a/CHANGELOG b/CHANGELOG
@@ -1,3 +1,8 @@
+* 0.8.5
+ - Upgrade shep to eliminate state deletion after interrupted partial filter list execution
+ - Add block/tx break points for filter breaks in unittest filter mocker
+* 0.8.4
+ - Add block and tx processing log entries at info level
* 0.8.3
- Apply dialect on receipts and block tx constructors
- Avoid filter state crash when no filters defined
diff --git a/chainsyncer/store/base.py b/chainsyncer/store/base.py
@@ -193,6 +193,7 @@ class SyncStore:
def register(self, fltr):
+ logg.info('register filter {}'.format(fltr))
self.filters.append(fltr)
self.filter_state.register(fltr)
@@ -269,7 +270,7 @@ class SyncStore:
v = self.get_target()
if v != None:
target = v
-
+
if len(thresholds) == 0:
if self.target != None:
logg.warning('sync "{}" is already done, nothing to do'.format(self.session_id))
diff --git a/chainsyncer/unittest/base.py b/chainsyncer/unittest/base.py
@@ -139,7 +139,7 @@ class MockStore(State):
class MockFilter(SyncFilter):
- def __init__(self, name, brk=None, brk_hard=None, z=None):
+ def __init__(self, name, brk=None, brk_hard=None, brk_points=[], z=None):
self.name = name
if z == None:
h = hashlib.sha256()
@@ -148,6 +148,7 @@ class MockFilter(SyncFilter):
self.z = z
self.brk = brk
self.brk_hard = brk_hard
+ self.brk_points = brk_points
self.contents = []
@@ -159,8 +160,14 @@ class MockFilter(SyncFilter):
return self.name
- def filter(self, conn, block, tx, ctx=None):
+ def brkchk(self, block, tx):
r = False
+ if len(self.brk_points) > 0:
+ if block.number == self.brk_points[0][0] and tx.index == self.brk_points[0][1]:
+ logg.debug('break point hit: {}'.format(self.brk_points[0]))
+ self.brk_points.pop(0)
+ else:
+ return r
if self.brk_hard != None:
r = True
if self.brk_hard > 0:
@@ -172,6 +179,11 @@ class MockFilter(SyncFilter):
if self.brk > 0:
r = True
self.brk -= 1
+ return r
+
+
+ def filter(self, conn, block, tx, ctx=None):
+ r = self.brkchk(block, tx)
self.contents.append((block.number, tx.index, tx.hash,))
logg.debug('filter {} result {} block {} tx {} {}'.format(self.common_name(), r, block.number, tx.index, tx.hash))
return r
@@ -212,8 +224,9 @@ class MockDriver(SyncDriver):
if self.interrupt[0] == block.number and self.interrupt[1] == i:
logg.info('interrupt triggered at {}'.format(self.interrupt))
if self.interrupt_global:
- SyncDriver.running_global = False
- self.running = False
+ self.terminate()
+ else:
+ self.running = False
break
tx = block.tx(i)
self.process_single(conn, block, tx)
diff --git a/requirements.txt b/requirements.txt
@@ -2,4 +2,4 @@ confini~=0.6.1
semver==2.13.0
hexathon~=0.1.7
chainlib~=0.5.2
-shep~=0.3.0
+shep~=0.3.4
diff --git a/setup.cfg b/setup.cfg
@@ -1,6 +1,6 @@
[metadata]
name = chainsyncer
-version = 0.8.3
+version = 0.8.5
description = Generic blockchain syncer driver
author = Louis Holbrook
author_email = dev@holbrook.no
diff --git a/tests/test_filter.py b/tests/test_filter.py
@@ -351,6 +351,60 @@ class TestFilter(unittest.TestCase):
session.filter(self.conn, block, tx)
+ # harness testing
+ def test_filter_breakpoint(self):
+ fltr_one = MockFilter('foo', brk=3, brk_points=[(2, 3,), (3, 0,)])
+ self.store.register(fltr_one)
+ fltr_two = MockFilter('bar')
+ self.store.register(fltr_two)
+
+ self.session.start()
+
+ tx_hash = os.urandom(32).hex()
+ tx = MockTx(2, tx_hash)
+ block = MockBlock(1, [tx_hash])
+ self.session.filter(self.conn, block, tx)
+
+ self.assertEqual(len(fltr_one.contents), 1)
+ self.assertEqual(len(fltr_two.contents), 1)
+
+ tx_hash = os.urandom(32).hex()
+ tx = MockTx(2, tx_hash)
+ block = MockBlock(2, [tx_hash])
+ self.session.filter(self.conn, block, tx)
+
+ self.assertEqual(len(fltr_one.contents), 2)
+ self.assertEqual(len(fltr_two.contents), 2)
+
+ tx_hash = os.urandom(32).hex()
+ tx = MockTx(3, tx_hash)
+ block = MockBlock(2, [tx_hash])
+ self.session.filter(self.conn, block, tx)
+ self.assertEqual(len(fltr_one.contents), 3)
+ self.assertEqual(len(fltr_two.contents), 2)
+
+ tx_hash = os.urandom(32).hex()
+ tx = MockTx(0, tx_hash)
+ block = MockBlock(3, [tx_hash])
+ self.session.filter(self.conn, block, tx)
+ self.assertEqual(len(fltr_one.contents), 4)
+ self.assertEqual(len(fltr_two.contents), 2)
+
+ tx_hash = os.urandom(32).hex()
+ tx = MockTx(2, tx_hash)
+ block = MockBlock(4, [tx_hash])
+ self.session.filter(self.conn, block, tx)
+ self.assertEqual(len(fltr_one.contents), 5)
+ self.assertEqual(len(fltr_two.contents), 2)
+
+ tx_hash = os.urandom(32).hex()
+ tx = MockTx(3, tx_hash)
+ block = MockBlock(4, [tx_hash])
+ self.session.filter(self.conn, block, tx)
+ self.assertEqual(len(fltr_one.contents), 6)
+ self.assertEqual(len(fltr_two.contents), 3)
+
+
if __name__ == '__main__':
unittest.main()
diff --git a/tests/test_session.py b/tests/test_session.py
@@ -151,7 +151,32 @@ class TestFilter(unittest.TestCase):
self.assertEqual(len(fltr_one.contents), 6)
- def test_resume_nofilter(self):
+ def test_driver_open_interrupt_sync_multifilter(self):
+ drv = MockDriver(self.store, interrupt_block=2, target=-1)
+ generator = MockBlockGenerator()
+ generator.generate([3, 1, 2], driver=drv)
+
+ fltr_one = MockFilter('foo_bar')
+ self.store.register(fltr_one)
+
+ fltr_two = MockFilter('bar_baz')
+ self.store.register(fltr_two)
+
+ drv.run(self.conn, interval=0.1)
+
+ logg.info('resume')
+ store = SyncFsStore(self.path, state_event_callback=state_event_handler, filter_state_event_callback=filter_state_event_handler)
+ store.register(fltr_one)
+ store.register(fltr_two)
+
+ drv = MockDriver(store, interrupt_block=4, target=-1)
+ generator = MockBlockGenerator(offset=2)
+ generator.generate([3, 1, 2, 5, 3], driver=drv)
+
+ drv.run(self.conn, interval=0.1)
+
+
+ def test_driver_resume_nofilter(self):
#drv = MockDriver(self.store, interrupt_block=7, target=10)
drv = MockDriver(self.store, target=2)
generator = MockBlockGenerator()
@@ -165,5 +190,28 @@ class TestFilter(unittest.TestCase):
drv.run(self.conn, interval=0.1)
+ def test_driver_coldresume_interrupt(self):
+ drv = MockDriver(self.store, interrupt_block=2, interrupt_global=True, target=-1)
+ generator = MockBlockGenerator()
+ generator.generate([3, 1, 2, 4], driver=drv)
+
+ fltr_one = MockFilter('foo', brk=10) # will break on all "foo" filter invocations
+ self.store.register(fltr_one)
+ fltr_two = MockFilter('bar')
+ self.store.register(fltr_two)
+
+ drv.run(self.conn, interval=0.1)
+
+ logg.info('resume')
+ SyncDriver.running_global = True
+ store = SyncFsStore(self.path, state_event_callback=state_event_handler, filter_state_event_callback=filter_state_event_handler)
+ store.register(fltr_one)
+ store.register(fltr_two)
+ drv = MockDriver(store, interrupt_block=4, target=-1)
+ generator = MockBlockGenerator(offset=2)
+ generator.generate([4, 1, 2], driver=drv)
+ drv.run(self.conn, interval=0.1)
+
+
if __name__ == '__main__':
unittest.main()