commit 891f90ae5f1a2cd38998fdbd6a7e7b5e3791fa35
parent f521162e7bef8745587b0f06d48bb33bc17f5f81
Author: lash <dev@holbrook.no>
Date: Wed, 20 Apr 2022 11:59:12 +0000
WIP refactor to allow other backends
Diffstat:
6 files changed, 232 insertions(+), 215 deletions(-)
diff --git a/chainsyncer/state/base.py b/chainsyncer/state/base.py
@@ -7,7 +7,7 @@ import os
logg = logging.getLogger(__name__)
-re_processedname = r'^_?[A-Z,_]*$'
+re_processedname = r'^_?[A-Z,\.]*$'
# TODO: properly clarify interface shared with syncfsstore, move to filter module?
class SyncState:
@@ -74,7 +74,6 @@ class SyncState:
self.__syncs[v] = True
if self.scan_path != None:
for v in os.listdir(self.scan_path):
- logg.debug('sync {} try {}'.format(self.scan_path, v))
if re.match(re_processedname, v):
k = None
try:
diff --git a/chainsyncer/store/__init__.py b/chainsyncer/store/__init__.py
@@ -0,0 +1 @@
+from .base import *
diff --git a/chainsyncer/store/base.py b/chainsyncer/store/base.py
@@ -0,0 +1,217 @@
+# standard imports
+import logging
+
+# local imports
+from shep.error import StateInvalid
+from chainsyncer.error import (
+ LockError,
+ FilterDone,
+ InterruptError,
+ IncompleteFilterError,
+ SyncDone,
+ )
+
+logg = logging.getLogger(__name__)
+
+
+# Pack a sync position into a 12-byte record:
+#   bytes 0-3   block_height, unsigned big-endian
+#   bytes 4-7   tx_index, unsigned big-endian
+#   bytes 8-11  block_target, signed big-endian
+# block_target is encoded signed so that negative targets can be stored
+# (SyncStore.start() defaults its target to -1).
+# int.to_bytes raises OverflowError for a value that does not fit in
+# 4 bytes, or for a negative block_height / tx_index.
+def sync_state_serialize(block_height, tx_index, block_target):
+ b = block_height.to_bytes(4, 'big')
+ b += tx_index.to_bytes(4, 'big')
+ b += block_target.to_bytes(4, 'big', signed=True)
+ return b
+
+
+# Unpack a record produced by sync_state_serialize.
+# Returns the tuple (block_height, tx_index, block_target).
+# The first two fields are read as unsigned big-endian integers from
+# bytes 0-3 and 4-7; block_target is read as a signed big-endian integer
+# from everything after byte 8.
+# NOTE(review): the input length is not validated; the target field is
+# taken from b[8:] regardless of how many bytes remain.
+def sync_state_deserialize(b):
+ block_height = int.from_bytes(b[:4], 'big')
+ tx_index = int.from_bytes(b[4:8], 'big')
+ block_target = int.from_bytes(b[8:], 'big', signed=True)
+ return (block_height, tx_index, block_target,)
+
+
+# NOT thread safe
+# Represents one sync range, stored under the key str(offset) in two
+# state stores:
+#   sync_state    holds the serialized (block, tx index, target) cursor
+#   filter_state  holds filter progress plus the LOCK / INTERRUPT / DONE
+#                 flags, which are tested with bitwise AND
+# The interfaces of both stores are defined outside this file.
+class SyncItem:
+
+ # offset: start block of the range; str(offset) is the key in both stores.
+ # target: initial target; overwritten below by the target stored in sync_state.
+ # sync_state, filter_state: the state stores described on the class.
+ # started: when False and filters exist, the filter state is moved to RESET.
+ # ignore_invalid: when True, a set LOCK flag does not raise.
+ # Raises LockError if the LOCK flag is set and ignore_invalid is False.
+ def __init__(self, offset, target, sync_state, filter_state, started=False, ignore_invalid=False):
+ self.offset = offset
+ self.target = target
+ self.sync_state = sync_state
+ self.filter_state = filter_state
+ self.state_key = str(offset)
+
+ logg.debug('get key {}'.format(self.state_key))
+ v = self.sync_state.get(self.state_key)
+
+ # the stored record is authoritative: this replaces the target argument
+ (self.cursor, self.tx_cursor, self.target) = sync_state_deserialize(v)
+
+ if self.filter_state.state(self.state_key) & self.filter_state.from_name('LOCK') and not ignore_invalid:
+ raise LockError(self.state_key)
+
+ # presumably the 4 subtracted entries are states that are not
+ # registered filters -- TODO confirm against the filter state class
+ self.count = len(self.filter_state.all(pure=True)) - 4
+ self.skip_filter = False
+ if self.count == 0:
+ self.skip_filter = True
+ elif not started:
+ self.filter_state.move(self.state_key, self.filter_state.from_name('RESET'))
+
+
+ # Raises InterruptError if the INTERRUPT flag is set, otherwise
+ # FilterDone if the DONE flag is set. INTERRUPT is checked first.
+ def __check_done(self):
+ if self.filter_state.state(self.state_key) & self.filter_state.from_name('INTERRUPT') > 0:
+ raise InterruptError(self.state_key)
+ if self.filter_state.state(self.state_key) & self.filter_state.from_name('DONE') > 0:
+ raise FilterDone(self.state_key)
+
+
+ # Move the filter state back to RESET.
+ # Raises LockError if the LOCK flag is set, and IncompleteFilterError
+ # if the DONE flag is not set.
+ def reset(self):
+ if self.filter_state.state(self.state_key) & self.filter_state.from_name('LOCK') > 0:
+ raise LockError('reset attempt on {} when state locked'.format(self.state_key))
+ if self.filter_state.state(self.state_key) & self.filter_state.from_name('DONE') == 0:
+ raise IncompleteFilterError('reset attempt on {} when incomplete'.format(self.state_key))
+ self.filter_state.move(self.state_key, self.filter_state.from_name('RESET'))
+
+
+ # Advance the stored cursor and persist it.
+ # advance_block=True: go to the next block and set the tx index to 0.
+ # Otherwise the tx index is incremented.
+ # Raises SyncDone if the sync state is already DONE, or if advancing
+ # the block passes a non-negative target (the state is moved to DONE
+ # first in that case).
+ # NOTE(review): indentation is not preserved in this rendering; the
+ # `else` below presumably pairs with `if advance_block` -- confirm
+ # against the source file.
+ def next(self, advance_block=False):
+ v = self.sync_state.state(self.state_key)
+ if v == self.sync_state.DONE:
+ raise SyncDone(self.target)
+ elif v == self.sync_state.NEW:
+ self.sync_state.next(self.state_key)
+
+ v = self.sync_state.get(self.state_key)
+ (block_number, tx_index, target) = sync_state_deserialize(v)
+ if advance_block:
+ block_number += 1
+ tx_index = 0
+ # a negative target skips the upper-bound check
+ if self.target >= 0 and block_number > self.target:
+ self.sync_state.move(self.state_key, self.sync_state.DONE)
+ raise SyncDone(self.target)
+ else:
+ tx_index += 1
+
+ self.cursor = block_number
+ self.tx_cursor = tx_index
+
+ # the target written back is the one just read from the store
+ b = sync_state_serialize(block_number, tx_index, target)
+ self.sync_state.replace(self.state_key, b)
+
+
+ # NOTE(review): reads the filter state into a local and discards it;
+ # no caller is visible in this class. Looks like an unfinished stub.
+ def __find_advance(self):
+ v = self.filter_state.state(self.state_key)
+
+
+ # Step the filter state forward and set the LOCK flag.
+ # Raises FilterDone if there are no filters (skip_filter), if the DONE
+ # flag is set, or if filter_state.next() raises StateInvalid.
+ # Raises InterruptError if the INTERRUPT flag is set.
+ # Raises LockError if the LOCK flag is already set.
+ def advance(self):
+ if self.skip_filter:
+ raise FilterDone()
+ self.__check_done()
+
+ if self.filter_state.state(self.state_key) & self.filter_state.from_name('LOCK') > 0:
+ raise LockError('advance attempt on {} when state locked'.format(self.state_key))
+ done = False
+ try:
+ self.filter_state.next(self.state_key)
+ except StateInvalid:
+ done = True
+ if done:
+ raise FilterDone()
+ self.filter_state.set(self.state_key, self.filter_state.from_name('LOCK'))
+
+
+ # Clear the LOCK flag set by advance().
+ # Returns False when there are no filters, when interrupt is True, or
+ # when filter_state.peek() raises StateInvalid (DONE is set in that
+ # case); returns True otherwise.
+ # With interrupt=True, LOCK is cleared and INTERRUPT and DONE are set,
+ # without checking that LOCK was held.
+ # Raises LockError if interrupt is False and the LOCK flag is not set.
+ def release(self, interrupt=False):
+ if self.skip_filter:
+ return False
+ if interrupt:
+ self.filter_state.unset(self.state_key, self.filter_state.from_name('LOCK'))
+ self.filter_state.set(self.state_key, self.filter_state.from_name('INTERRUPT'))
+ self.filter_state.set(self.state_key, self.filter_state.from_name('DONE'))
+ return False
+
+ state = self.filter_state.state(self.state_key)
+ if state & self.filter_state.from_name('LOCK') == 0:
+ raise LockError('release attempt on {} when state unlocked'.format(self.state_key))
+ self.filter_state.unset(self.state_key, self.filter_state.from_name('LOCK'))
+ try:
+ # peek() presumably reports the state that next() would move to -- TODO confirm
+ c = self.filter_state.peek(self.state_key)
+ logg.debug('peeked {}'.format(c))
+ except StateInvalid:
+ self.filter_state.set(self.state_key, self.filter_state.from_name('DONE'))
+ return False
+ return True
+
+
+ # Human-readable summary of offset, target and block cursor.
+ def __str__(self):
+ return 'syncitem offset {} target {} cursor {}'.format(self.offset, self.target, self.cursor)
+
+
+# Backend-independent part of the sync store: keeps the SyncItem objects
+# of a session and the order in which they are handed out.
+# The methods below use self.state, self.filter_state, self.filters and
+# self.load(), none of which are defined in this class; SyncFsStore
+# (chainsyncer/store/fs.py) assigns self.filters and defines load().
+class SyncStore:
+
+ # NOTE(review): the session_id argument is not used here; the
+ # attribute is always initialised to None.
+ def __init__(self, session_id=None):
+ self.session_id = None
+ self.session_path = None
+ self.is_default = False
+ self.first = False
+ self.target = None
+ # SyncItem objects keyed by offset
+ self.items = {}
+ # offsets still to be handed out by next_item()
+ self.item_keys = []
+ self.started = False
+
+
+ # Add a filter to self.filters and register it with the filter state.
+ def register(self, fltr):
+ self.filters.append(fltr)
+ self.filter_state.register(fltr)
+
+
+ # Load existing items through self.load(target). When self.first is
+ # set, also create and store the initial item for offset/target.
+ # Does nothing if the store was already started.
+ # offset is ignored, with a warning when it is > 0, if self.first is
+ # not set.
+ def start(self, offset=0, target=-1):
+ if self.started:
+ return
+
+ self.load(target)
+
+ if self.first:
+ state_bytes = sync_state_serialize(offset, 0, target)
+ block_number_str = str(offset)
+ self.state.put(block_number_str, state_bytes)
+ self.filter_state.put(block_number_str)
+ o = SyncItem(offset, target, self.state, self.filter_state)
+ self.items[offset] = o
+ self.item_keys.append(offset)
+ elif offset > 0:
+ logg.warning('block number argument {} for start ignored for already initiated sync {}'.format(offset, self.session_id))
+ self.started = True
+
+ self.item_keys.sort()
+
+
+ # Close the given item at its current cursor and store a new record,
+ # keyed by that cursor, with target -1.
+ # NOTE(review): indentation is not preserved in this rendering, so
+ # which statements belong to the `item.target == -1` branch cannot be
+ # read from here -- confirm against the source file.
+ def stop(self, item):
+ if item.target == -1:
+ # rewrite the item's record so its target is its current cursor
+ state_bytes = sync_state_serialize(item.cursor, 0, item.cursor)
+ self.state.replace(str(item.offset), state_bytes)
+ self.filter_state.put(str(item.cursor))
+
+ # NOTE(review): the constructed SyncItem is discarded; only the
+ # side effects of its constructor apply
+ SyncItem(item.offset, -1, self.state, self.filter_state)
+ logg.info('New sync state start at block number {} for next head sync backfill'.format(item.cursor))
+
+ self.state.move(item.state_key, self.state.DONE)
+
+ state_bytes = sync_state_serialize(item.cursor, 0, -1)
+ self.state.put(str(item.cursor), state_bytes)
+
+ logg.debug('item {}'.format(self.state.state(item.state_key)))
+
+
+ # Return the item stored under key k; raises KeyError if absent.
+ def get(self, k):
+ return self.items[k]
+
+
+ # Remove the first key from item_keys and return its item, or None
+ # when no keys are left.
+ def next_item(self):
+ try:
+ k = self.item_keys.pop(0)
+ except IndexError:
+ return None
+ return self.items[k]
+
+
+ # Delegates to filter_state.connect().
+ def connect(self):
+ self.filter_state.connect()
+
+
+ # Delegates to filter_state.disconnect().
+ def disconnect(self):
+ self.filter_state.disconnect()
diff --git a/chainsyncer/store/fs.py b/chainsyncer/store/fs.py
@@ -6,159 +6,21 @@ import logging
# external imports
from shep.store.file import SimpleFileStoreFactory
from shep.persist import PersistedState
-from shep.error import StateInvalid
# local imports
from chainsyncer.state import SyncState
-from chainsyncer.error import (
- LockError,
- FilterDone,
- InterruptError,
- IncompleteFilterError,
- SyncDone,
+from chainsyncer.store import (
+ SyncItem,
+ SyncStore,
)
-logg = logging.getLogger(__name__)
-
-
-def sync_state_serialize(block_height, tx_index, block_target):
- b = block_height.to_bytes(4, 'big')
- b += tx_index.to_bytes(4, 'big')
- b += block_target.to_bytes(4, 'big', signed=True)
- return b
-
-
-def sync_state_deserialize(b):
- block_height = int.from_bytes(b[:4], 'big')
- tx_index = int.from_bytes(b[4:8], 'big')
- block_target = int.from_bytes(b[8:], 'big', signed=True)
- return (block_height, tx_index, block_target,)
-
-
-# NOT thread safe
-class SyncFsItem:
-
- def __init__(self, offset, target, sync_state, filter_state, started=False, ignore_invalid=False):
- self.offset = offset
- self.target = target
- self.sync_state = sync_state
- self.filter_state = filter_state
- self.state_key = str(offset)
-
- logg.debug('get key {}'.format(self.state_key))
- v = self.sync_state.get(self.state_key)
-
- (self.cursor, self.tx_cursor, self.target) = sync_state_deserialize(v)
-
- if self.filter_state.state(self.state_key) & self.filter_state.from_name('LOCK') and not ignore_invalid:
- raise LockError(self.state_key)
-
- self.count = len(self.filter_state.all(pure=True)) - 4
- self.skip_filter = False
- if self.count == 0:
- self.skip_filter = True
- elif not started:
- self.filter_state.move(self.state_key, self.filter_state.from_name('RESET'))
-
-
- def __check_done(self):
- if self.filter_state.state(self.state_key) & self.filter_state.from_name('INTERRUPT') > 0:
- raise InterruptError(self.state_key)
- if self.filter_state.state(self.state_key) & self.filter_state.from_name('DONE') > 0:
- raise FilterDone(self.state_key)
-
-
- def reset(self):
- if self.filter_state.state(self.state_key) & self.filter_state.from_name('LOCK') > 0:
- raise LockError('reset attempt on {} when state locked'.format(self.state_key))
- if self.filter_state.state(self.state_key) & self.filter_state.from_name('DONE') == 0:
- raise IncompleteFilterError('reset attempt on {} when incomplete'.format(self.state_key))
- self.filter_state.move(self.state_key, self.filter_state.from_name('RESET'))
-
-
- def next(self, advance_block=False):
- v = self.sync_state.state(self.state_key)
- if v == self.sync_state.DONE:
- raise SyncDone(self.target)
- elif v == self.sync_state.NEW:
- self.sync_state.next(self.state_key)
-
- v = self.sync_state.get(self.state_key)
- (block_number, tx_index, target) = sync_state_deserialize(v)
- if advance_block:
- block_number += 1
- tx_index = 0
- if self.target >= 0 and block_number > self.target:
- self.sync_state.move(self.state_key, self.sync_state.DONE)
- raise SyncDone(self.target)
- else:
- tx_index += 1
-
- self.cursor = block_number
- self.tx_cursor = tx_index
-
- b = sync_state_serialize(block_number, tx_index, target)
- self.sync_state.replace(self.state_key, b)
-
-
- def __find_advance(self):
- v = self.filter_state.state(self.state_key)
-
-
- def advance(self):
- if self.skip_filter:
- raise FilterDone()
- self.__check_done()
-
- if self.filter_state.state(self.state_key) & self.filter_state.from_name('LOCK') > 0:
- raise LockError('advance attempt on {} when state locked'.format(self.state_key))
- done = False
- try:
- self.filter_state.next(self.state_key)
- except StateInvalid:
- done = True
- if done:
- raise FilterDone()
- self.filter_state.set(self.state_key, self.filter_state.from_name('LOCK'))
-
-
- def release(self, interrupt=False):
- if self.skip_filter:
- return False
- if interrupt:
- self.filter_state.unset(self.state_key, self.filter_state.from_name('LOCK'))
- self.filter_state.set(self.state_key, self.filter_state.from_name('INTERRUPT'))
- self.filter_state.set(self.state_key, self.filter_state.from_name('DONE'))
- return False
-
- state = self.filter_state.state(self.state_key)
- if state & self.filter_state.from_name('LOCK') == 0:
- raise LockError('release attempt on {} when state unlocked'.format(self.state_key))
- self.filter_state.unset(self.state_key, self.filter_state.from_name('LOCK'))
- try:
- c = self.filter_state.peek(self.state_key)
- logg.debug('peeked {}'.format(c))
- except StateInvalid:
- self.filter_state.set(self.state_key, self.filter_state.from_name('DONE'))
- return False
- return True
-
-
- def __str__(self):
- return 'syncitem offset {} target {} cursor {}'.format(self.offset, self.target, self.cursor)
+logg = logging.getLogger(__name__)
-class SyncFsStore:
+class SyncFsStore(SyncStore):
def __init__(self, base_path, session_id=None, state_event_callback=None, filter_state_event_callback=None):
- self.session_id = None
- self.session_path = None
- self.is_default = False
- self.first = False
- self.target = None
- self.items = {}
- self.item_keys = []
- self.started = False
+ super(SyncFsStore, self).__init__(session_id=session_id)
default_path = os.path.join(base_path, 'default')
@@ -195,11 +57,6 @@ class SyncFsStore:
self.filters = [] # used by SyncSession
- def register(self, fltr):
- self.filters.append(fltr)
- self.filter_state.register(fltr)
-
-
def __create_path(self, base_path, default_path, session_id=None):
logg.debug('fs store path {} does not exist, creating'.format(self.session_path))
if session_id == None:
@@ -214,7 +71,7 @@ class SyncFsStore:
pass
- def __load(self, target):
+ def load(self, target):
self.state.sync(self.state.NEW)
self.state.sync(self.state.SYNC)
@@ -233,11 +90,13 @@ class SyncFsStore:
thresholds_new.sort()
thresholds = thresholds_sync + thresholds_new
lim = len(thresholds) - 1
+
+ logg.debug('thresholds {}'.format(thresholds))
for i in range(len(thresholds)):
item_target = target
if i < lim:
item_target = thresholds[i+1]
- o = SyncFsItem(block_number, item_target, self.state, self.filter_state, started=True)
+ o = SyncItem(block_number, item_target, self.state, self.filter_state, started=True)
self.items[block_number] = o
self.item_keys.append(block_number)
logg.info('added existing {}'.format(o))
@@ -263,62 +122,3 @@ class SyncFsStore:
f.write(str(target))
f.close()
self.target = target
-
-
- def start(self, offset=0, target=-1):
- if self.started:
- return
-
- self.__load(target)
-
- if self.first:
- state_bytes = sync_state_serialize(offset, 0, target)
- block_number_str = str(offset)
- self.state.put(block_number_str, state_bytes)
- self.filter_state.put(block_number_str)
- o = SyncFsItem(offset, target, self.state, self.filter_state)
- self.items[offset] = o
- self.item_keys.append(offset)
- elif offset > 0:
- logg.warning('block number argument {} for start ignored for already initiated sync {}'.format(offset, self.session_id))
- self.started = True
-
- self.item_keys.sort()
-
-
- def stop(self, item):
- if item.target == -1:
- state_bytes = sync_state_serialize(item.cursor, 0, item.cursor)
- self.state.replace(str(item.offset), state_bytes)
- self.filter_state.put(str(item.cursor))
-
- SyncFsItem(item.offset, -1, self.state, self.filter_state)
- logg.info('New sync state start at block number {} for next head sync backfill'.format(item.cursor))
-
- self.state.move(item.state_key, self.state.DONE)
-
- state_bytes = sync_state_serialize(item.cursor, 0, -1)
- self.state.put(str(item.cursor), state_bytes)
-
- logg.debug('item {}'.format(self.state.state(item.state_key)))
-
-
- def get(self, k):
- return self.items[k]
-
-
- def next_item(self):
- try:
- k = self.item_keys.pop(0)
- except IndexError:
- return None
- return self.items[k]
-
-
- def connect(self):
- self.filter_state.connect()
-
-
- def disconnect(self):
- self.filter_state.disconnect()
-
diff --git a/requirements.txt b/requirements.txt
@@ -2,4 +2,4 @@ confini~=0.6.0
semver==2.13.0
hexathon~=0.1.5
chainlib>=0.1.0b1,<=0.1.0
-shep~=0.1.1
+shep>=0.2.0rc1,<0.3.0
diff --git a/tests/test_session.py b/tests/test_session.py
@@ -40,8 +40,8 @@ class TestFilter(unittest.TestCase):
self.conn = MockConn()
- def tearDown(self):
- shutil.rmtree(self.path)
+# def tearDown(self):
+# shutil.rmtree(self.path)
def test_filter_basic(self):