commit a42622d8330bbae19deba8721a062dfaf63cf574
parent 82e26745556103a6cf5014d02d955d1396bf3d69
Author: Louis Holbrook <accounts-gitlab@holbrook.no>
Date: Sun, 4 Apr 2021 13:03:59 +0000
Merge branch 'lash/add-filter' into 'master'
Lash/add filter
See merge request nolash/chainsyncer!1
Diffstat:
17 files changed, 641 insertions(+), 130 deletions(-)
diff --git a/MANIFEST.in b/MANIFEST.in
@@ -1 +1 @@
-include requirements.txt LICENSE.txt
+include requirements.txt LICENSE.txt sql/**/*
diff --git a/chainsyncer/backend.py b/chainsyncer/backend.py
@@ -7,9 +7,10 @@ from chainlib.chain import ChainSpec
# local imports
from chainsyncer.db.models.sync import BlockchainSync
+from chainsyncer.db.models.filter import BlockchainSyncFilter
from chainsyncer.db.models.base import SessionBase
-logg = logging.getLogger()
+logg = logging.getLogger(__name__)
class SyncerBackend:
@@ -23,6 +24,7 @@ class SyncerBackend:
def __init__(self, chain_spec, object_id):
self.db_session = None
self.db_object = None
+ self.db_object_filter = None
self.chain_spec = chain_spec
self.object_id = object_id
self.connect()
@@ -34,16 +36,28 @@ class SyncerBackend:
"""
if self.db_session == None:
self.db_session = SessionBase.create_session()
+
q = self.db_session.query(BlockchainSync)
q = q.filter(BlockchainSync.id==self.object_id)
self.db_object = q.first()
+
+ if self.db_object != None:
+ qtwo = self.db_session.query(BlockchainSyncFilter)
+ qtwo = qtwo.join(BlockchainSync)
+ qtwo = qtwo.filter(BlockchainSync.id==self.db_object.id)
+ self.db_object_filter = qtwo.first()
+
if self.db_object == None:
raise ValueError('sync entry with id {} not found'.format(self.object_id))
+ return self.db_session
+
def disconnect(self):
"""Commits state of sync to backend.
"""
+ if self.db_object_filter != None:
+ self.db_session.add(self.db_object_filter)
self.db_session.add(self.db_object)
self.db_session.commit()
self.db_session.close()
@@ -67,8 +81,9 @@ class SyncerBackend:
"""
self.connect()
pair = self.db_object.cursor()
+ (filter_state, count, digest) = self.db_object_filter.cursor()
self.disconnect()
- return pair
+ return (pair, filter_state,)
def set(self, block_height, tx_height):
@@ -82,8 +97,10 @@ class SyncerBackend:
"""
self.connect()
pair = self.db_object.set(block_height, tx_height)
+ self.db_object_filter.clear()
+ (filter_state, count, digest)= self.db_object_filter.cursor()
self.disconnect()
- return pair
+ return (pair, filter_state,)
def start(self):
@@ -94,8 +111,9 @@ class SyncerBackend:
"""
self.connect()
pair = self.db_object.start()
+ (filter_state, count, digest) = self.db_object_filter.start()
self.disconnect()
- return pair
+ return (pair, filter_state,)
def target(self):
@@ -106,12 +124,13 @@ class SyncerBackend:
"""
self.connect()
target = self.db_object.target()
+ (filter_target, count, digest) = self.db_object_filter.target()
self.disconnect()
- return target
+ return (target, filter_target,)
@staticmethod
- def first(chain):
+ def first(chain_spec):
"""Returns the model object of the most recent syncer in backend.
:param chain: Chain spec of chain that syncer is running for.
@@ -119,11 +138,16 @@ class SyncerBackend:
:returns: Last syncer object
:rtype: cic_eth.db.models.BlockchainSync
"""
- return BlockchainSync.first(chain)
+ #return BlockchainSync.first(str(chain_spec))
+ object_id = BlockchainSync.first(str(chain_spec))
+ if object_id == None:
+ return None
+ return SyncerBackend(chain_spec, object_id)
+
@staticmethod
- def initial(chain, block_height):
+ def initial(chain_spec, target_block_height, start_block_height=0):
"""Creates a new syncer session and commit its initial state to backend.
:param chain: Chain spec of chain that syncer is running for.
@@ -133,24 +157,31 @@ class SyncerBackend:
:returns: New syncer object
:rtype: cic_eth.db.models.BlockchainSync
"""
+ if start_block_height >= target_block_height:
+ raise ValueError('start block height must be lower than target block height')
object_id = None
session = SessionBase.create_session()
- o = BlockchainSync(chain, 0, 0, block_height)
+ o = BlockchainSync(str(chain_spec), start_block_height, 0, target_block_height)
session.add(o)
session.commit()
object_id = o.id
+
+ of = BlockchainSyncFilter(o)
+ session.add(of)
+ session.commit()
+
session.close()
- return SyncerBackend(chain, object_id)
+ return SyncerBackend(chain_spec, object_id)
@staticmethod
- def resume(chain, block_height):
+ def resume(chain_spec, block_height):
"""Retrieves and returns all previously unfinished syncer sessions.
- :param chain: Chain spec of chain that syncer is running for.
- :type chain: cic_registry.chain.ChainSpec
+ :param chain_spec: Chain spec of chain that syncer is running for.
+ :type chain_spec: cic_registry.chain.ChainSpec
:param block_height: Target block height
:type block_height: number
:returns: Syncer objects of unfinished syncs
@@ -162,18 +193,56 @@ class SyncerBackend:
object_id = None
- for object_id in BlockchainSync.get_unsynced(session=session):
- logg.debug('block syncer resume added previously unsynced sync entry id {}'.format(object_id))
- syncers.append(SyncerBackend(chain, object_id))
+ highest_unsynced_block = 0
+ highest_unsynced_tx = 0
+ object_id = BlockchainSync.get_last(session=session, live=False)
+ if object_id != None:
+ q = session.query(BlockchainSync)
+ o = q.get(object_id)
+ (highest_unsynced_block, highest_unsynced_index) = o.cursor()
+
+ object_ids = BlockchainSync.get_unsynced(session=session)
+ session.close()
+
+ for object_id in object_ids:
+ s = SyncerBackend(chain_spec, object_id)
+ logg.debug('resume unfinished {}'.format(s))
+ syncers.append(s)
+
+ session = SessionBase.create_session()
+
+ last_live_id = BlockchainSync.get_last(session=session)
+ if last_live_id != None:
+
+ q = session.query(BlockchainSync)
+ o = q.get(last_live_id)
+
+ (block_resume, tx_resume) = o.cursor()
+ session.flush()
+
+ #if block_height != block_resume:
+ if highest_unsynced_block < block_resume:
+
+ q = session.query(BlockchainSyncFilter)
+ q = q.filter(BlockchainSyncFilter.chain_sync_id==last_live_id)
+ of = q.first()
+ (flags, count, digest) = of.cursor()
+
+ session.flush()
+
+ o = BlockchainSync(str(chain_spec), block_resume, tx_resume, block_height)
+ session.add(o)
+ session.flush()
+ object_id = o.id
+
+ of = BlockchainSyncFilter(o, count, flags, digest)
+ session.add(of)
+ session.commit()
- (block_resume, tx_resume) = BlockchainSync.get_last_live_height(block_height, session=session)
- if block_height != block_resume:
- o = BlockchainSync(chain, block_resume, tx_resume, block_height)
- session.add(o)
- session.commit()
- object_id = o.id
- syncers.append(SyncerBackend(chain, object_id))
- logg.debug('block syncer resume added new sync entry from previous run id {}, start{}:{} target {}'.format(object_id, block_resume, tx_resume, block_height))
+ backend = SyncerBackend(chain_spec, object_id)
+ syncers.append(backend)
+
+ logg.debug('last live session resume {}'.format(backend))
session.close()
@@ -193,15 +262,40 @@ class SyncerBackend:
"""
object_id = None
session = SessionBase.create_session()
+
o = BlockchainSync(str(chain_spec), block_height, 0, None)
session.add(o)
- session.commit()
+ session.flush()
object_id = o.id
+
+ of = BlockchainSyncFilter(o)
+ session.add(of)
+ session.commit()
+
session.close()
return SyncerBackend(chain_spec, object_id)
+ def register_filter(self, name):
+ self.connect()
+ if self.db_object_filter == None:
+ self.db_object_filter = BlockchainSyncFilter(self.db_object)
+ self.db_object_filter.add(name)
+ self.db_session.add(self.db_object_filter)
+ self.disconnect()
+
+
+ def complete_filter(self, n):
+ self.db_object_filter.set(n)
+
+
+
+ def __str__(self):
+ return "syncerbackend chain {} start {} target {}".format(self.chain(), self.start(), self.target())
+
+
+
class MemBackend:
def __init__(self, chain_spec, object_id):
@@ -209,6 +303,7 @@ class MemBackend:
self.chain_spec = chain_spec
self.block_height = 0
self.tx_height = 0
+ self.flags = 0
self.db_session = None
@@ -227,4 +322,17 @@ class MemBackend:
def get(self):
- return (self.block_height, self.tx_height)
+ return ((self.block_height, self.tx_height), self.flags)
+
+
+ def register_filter(self, name):
+ pass
+
+
+ def complete_filter(self, n):
+ pass
+
+
+ def __str__(self):
+ return "syncer membackend chain {} cursor".format(self.get())
+
diff --git a/chainsyncer/db/migrations/sqlalchemy.py b/chainsyncer/db/migrations/sqlalchemy.py
@@ -0,0 +1,36 @@
+from alembic import op
+import sqlalchemy as sa
+
+def chainsyncer_upgrade(major, minor, patch):
+ r0_0_1_u()
+
+def chainsyncer_downgrade(major, minor, patch):
+ r0_0_1_d()
+
+def r0_0_1_u():
+ op.create_table(
+ 'chain_sync',
+ sa.Column('id', sa.Integer, primary_key=True),
+ sa.Column('blockchain', sa.String, nullable=False),
+ sa.Column('block_start', sa.Integer, nullable=False, default=0),
+ sa.Column('tx_start', sa.Integer, nullable=False, default=0),
+ sa.Column('block_cursor', sa.Integer, nullable=False, default=0),
+ sa.Column('tx_cursor', sa.Integer, nullable=False, default=0),
+ sa.Column('block_target', sa.Integer, nullable=True),
+ sa.Column('date_created', sa.DateTime, nullable=False),
+ sa.Column('date_updated', sa.DateTime),
+ )
+
+ op.create_table(
+ 'chain_sync_filter',
+ sa.Column('id', sa.Integer, primary_key=True),
+ sa.Column('chain_sync_id', sa.Integer, sa.ForeignKey('chain_sync.id'), nullable=True),
+ sa.Column('flags', sa.LargeBinary, nullable=True),
+ sa.Column('flags_start', sa.LargeBinary, nullable=True),
+ sa.Column('count', sa.Integer, nullable=False, default=0),
+ sa.Column('digest', sa.String(64), nullable=False),
+ )
+
+def r0_0_1_d():
+ op.drop_table('chain_sync_filter')
+ op.drop_table('chain_sync')
diff --git a/chainsyncer/db/models/base.py b/chainsyncer/db/models/base.py
@@ -1,8 +1,18 @@
+# stanard imports
+import logging
+
# third-party imports
from sqlalchemy import Column, Integer
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
+from sqlalchemy.pool import (
+ StaticPool,
+ QueuePool,
+ AssertionPool,
+ )
+
+logg = logging.getLogger()
Model = declarative_base(name='Model')
@@ -21,7 +31,11 @@ class SessionBase(Model):
transactional = True
"""Whether the database backend supports query transactions. Should be explicitly set by initialization code"""
poolable = True
- """Whether the database backend supports query transactions. Should be explicitly set by initialization code"""
+ """Whether the database backend supports connection pools. Should be explicitly set by initialization code"""
+ procedural = True
+ """Whether the database backend supports stored procedures"""
+ localsessions = {}
+ """Contains dictionary of sessions initiated by db model components"""
@staticmethod
@@ -40,7 +54,7 @@ class SessionBase(Model):
@staticmethod
- def connect(dsn, debug=False):
+ def connect(dsn, pool_size=8, debug=False):
"""Create new database connection engine and connect to database backend.
:param dsn: DSN string defining connection.
@@ -48,14 +62,28 @@ class SessionBase(Model):
"""
e = None
if SessionBase.poolable:
- e = create_engine(
- dsn,
- max_overflow=50,
- pool_pre_ping=True,
- pool_size=20,
- pool_recycle=10,
- echo=debug,
- )
+ poolclass = QueuePool
+ if pool_size > 1:
+ e = create_engine(
+ dsn,
+ max_overflow=pool_size*3,
+ pool_pre_ping=True,
+ pool_size=pool_size,
+ pool_recycle=60,
+ poolclass=poolclass,
+ echo=debug,
+ )
+ else:
+ if debug:
+ poolclass = AssertionPool
+ else:
+ poolclass = StaticPool
+
+ e = create_engine(
+ dsn,
+ poolclass=poolclass,
+ echo=debug,
+ )
else:
e = create_engine(
dsn,
@@ -71,3 +99,24 @@ class SessionBase(Model):
"""
SessionBase.engine.dispose()
SessionBase.engine = None
+
+
+ @staticmethod
+ def bind_session(session=None):
+ localsession = session
+ if localsession == None:
+ localsession = SessionBase.create_session()
+ localsession_key = str(id(localsession))
+ logg.debug('creating new session {}'.format(localsession_key))
+ SessionBase.localsessions[localsession_key] = localsession
+ return localsession
+
+
+ @staticmethod
+ def release_session(session=None):
+ session.flush()
+ session_key = str(id(session))
+ if SessionBase.localsessions.get(session_key) != None:
+ logg.debug('destroying session {}'.format(session_key))
+ session.commit()
+ session.close()
diff --git a/chainsyncer/db/models/filter.py b/chainsyncer/db/models/filter.py
@@ -1,40 +1,88 @@
# standard imports
+import logging
import hashlib
-# third-party imports
-from sqlalchemy import Column, String, Integer, BLOB
+# external imports
+from sqlalchemy import Column, String, Integer, LargeBinary, ForeignKey
from sqlalchemy.ext.hybrid import hybrid_property, hybrid_method
# local imports
from .base import SessionBase
+from .sync import BlockchainSync
-
-zero_digest = '{:<064s'.format('0')
+zero_digest = bytes(32).hex()
+logg = logging.getLogger(__name__)
class BlockchainSyncFilter(SessionBase):
__tablename__ = 'chain_sync_filter'
- chain_sync_id = Column(Integer, ForeignKey='chain_sync.id')
- flags = Column(BLOB)
- digest = Column(String)
+ chain_sync_id = Column(Integer, ForeignKey('chain_sync.id'))
+ flags_start = Column(LargeBinary)
+ flags = Column(LargeBinary)
+ digest = Column(String(64))
count = Column(Integer)
- @staticmethod
- def set(self, names):
-
- def __init__(self, names, chain_sync, digest=None):
- if len(names) == 0:
- digest = zero_digest
- elif digest == None:
- h = hashlib.new('sha256')
- for n in names:
- h.update(n.encode('utf-8') + b'\x00')
- z = h.digest()
- digest = z.hex()
+ def __init__(self, chain_sync, count=0, flags=None, digest=zero_digest):
self.digest = digest
- self.count = len(names)
- self.flags = bytearray((len(names) -1 ) / 8 + 1)
+ self.count = count
+
+ if flags == None:
+ flags = bytearray(0)
+ else: # TODO: handle bytes too
+ bytecount = int((count - 1) / 8 + 1)
+ flags = flags.to_bytes(bytecount, 'big')
+ self.flags_start = flags
+ self.flags = flags
+
self.chain_sync_id = chain_sync.id
+
+
+ def add(self, name):
+ h = hashlib.new('sha256')
+ h.update(bytes.fromhex(self.digest))
+ h.update(name.encode('utf-8'))
+ z = h.digest()
+
+ old_byte_count = int((self.count - 1) / 8 + 1)
+ new_byte_count = int((self.count) / 8 + 1)
+
+ if old_byte_count != new_byte_count:
+ self.flags = bytearray(1) + self.flags
+ self.count += 1
+ self.digest = z.hex()
+
+
+ def start(self):
+ return (int.from_bytes(self.flags_start, 'big'), self.count, self.digest)
+
+
+ def cursor(self):
+ return (int.from_bytes(self.flags, 'big'), self.count, self.digest)
+
+
+ def target(self):
+ n = 0
+ for i in range(self.count):
+ n |= (1 << self.count) - 1
+ return (n, self.count, self.digest)
+
+
+ def clear(self):
+ self.flags = bytearray(len(self.flags))
+
+
+ def set(self, n):
+ if n > self.count:
+ raise IndexError('bit flag out of range')
+
+ b = 1 << (n % 8)
+ i = int(n / 8)
+ byte_idx = len(self.flags)-1-i
+ if (self.flags[byte_idx] & b) > 0:
+ raise AttributeError('Filter bit already set')
+ flags = bytearray(self.flags)
+ flags[byte_idx] |= b
+ self.flags = flags
diff --git a/chainsyncer/db/models/sync.py b/chainsyncer/db/models/sync.py
@@ -41,47 +41,51 @@ class BlockchainSync(SessionBase):
:type chain: str
:param session: Session to use. If not specified, a separate session will be created for this method only.
:type session: SqlAlchemy Session
- :returns: True if sync record found
- :rtype: bool
+ :returns: Database primary key id of sync record
+ :rtype: number|None
"""
- local_session = False
- if session == None:
- session = SessionBase.create_session()
- local_session = True
+ session = SessionBase.bind_session(session)
+
q = session.query(BlockchainSync.id)
q = q.filter(BlockchainSync.blockchain==chain)
o = q.first()
- if local_session:
- session.close()
- return o == None
+
+ if o == None:
+ SessionBase.release_session(session)
+ return None
+
+ sync_id = o.id
+
+ SessionBase.release_session(session)
+
+ return sync_id
@staticmethod
- def get_last_live_height(current, session=None):
+ def get_last(session=None, live=True):
"""Get the most recent open-ended ("live") syncer record.
- :param current: Current block number
- :type current: number
:param session: Session to use. If not specified, a separate session will be created for this method only.
:type session: SqlAlchemy Session
:returns: Block and transaction number, respectively
:rtype: tuple
"""
- local_session = False
- if session == None:
- session = SessionBase.create_session()
- local_session = True
- q = session.query(BlockchainSync)
- q = q.filter(BlockchainSync.block_target==None)
+ session = SessionBase.bind_session(session)
+
+ q = session.query(BlockchainSync.id)
+ if live:
+ q = q.filter(BlockchainSync.block_target==None)
+ else:
+ q = q.filter(BlockchainSync.block_target!=None)
q = q.order_by(BlockchainSync.date_created.desc())
- o = q.first()
- if local_session:
- session.close()
+ object_id = q.first()
- if o == None:
- return (0, 0)
+ SessionBase.release_session(session)
+
+ if object_id == None:
+ return None
- return (o.block_cursor, o.tx_cursor)
+ return object_id[0]
@staticmethod
@@ -122,6 +126,8 @@ class BlockchainSync(SessionBase):
"""
self.block_cursor = block_height
self.tx_cursor = tx_height
+ self.date_updated = datetime.datetime.utcnow()
+ return (self.block_cursor, self.tx_cursor,)
def cursor(self):
@@ -165,4 +171,21 @@ class BlockchainSync(SessionBase):
self.tx_cursor = tx_start
self.block_target = block_target
self.date_created = datetime.datetime.utcnow()
- self.date_modified = datetime.datetime.utcnow()
+ self.date_updated = datetime.datetime.utcnow()
+
+
+ def __str__(self):
+ return """object_id: {}
+start: {}:{}
+cursor: {}:{}
+target: {}
+""".format(
+ self.id,
+ self.block_start,
+ self.tx_start,
+ self.block_cursor,
+ self.tx_cursor,
+ self.block_target,
+ )
+
+
diff --git a/chainsyncer/driver.py b/chainsyncer/driver.py
@@ -2,27 +2,35 @@
import uuid
import logging
import time
+import signal
# external imports
+import sqlalchemy
from chainlib.eth.block import (
block_by_number,
Block,
)
+from chainlib.eth.tx import receipt
# local imports
from chainsyncer.filter import SyncFilter
+from chainsyncer.error import (
+ SyncDone,
+ NoBlockForYou,
+ )
-logg = logging.getLogger()
+logg = logging.getLogger(__name__)
def noop_callback(block_number, tx_index, s=None):
- logg.debug('({},{}) {}'.format(block_number, tx_index, s))
+ logg.debug('noop callback ({},{}) {}'.format(block_number, tx_index, s))
class Syncer:
running_global = True
yield_delay=0.005
+ signal_set = False
def __init__(self, backend, loop_callback=noop_callback, progress_callback=noop_callback):
self.cursor = None
@@ -31,6 +39,22 @@ class Syncer:
self.filter = SyncFilter(backend)
self.progress_callback = progress_callback
self.loop_callback = loop_callback
+ if not Syncer.signal_set:
+ signal.signal(signal.SIGINT, Syncer.__sig_terminate)
+ signal.signal(signal.SIGTERM, Syncer.__sig_terminate)
+ Syncer.signal_set = True
+
+
+ @staticmethod
+ def __sig_terminate(sig, frame):
+ logg.warning('got signal {}'.format(sig))
+ Syncer.terminate()
+
+
+ @staticmethod
+ def terminate():
+ logg.info('termination requested!')
+ Syncer.running_global = False
def chain(self):
@@ -44,6 +68,7 @@ class Syncer:
def add_filter(self, f):
self.filter.add(f)
+ self.backend.register_filter(str(f))
class BlockPollSyncer(Syncer):
@@ -53,18 +78,25 @@ class BlockPollSyncer(Syncer):
def loop(self, interval, conn):
- g = self.backend.get()
+ (g, flags) = self.backend.get()
last_tx = g[1]
last_block = g[0]
self.progress_callback(last_block, last_tx, 'loop started')
while self.running and Syncer.running_global:
if self.loop_callback != None:
self.loop_callback(last_block, last_tx)
- while True:
+ while True and Syncer.running_global:
try:
block = self.get(conn)
- except Exception:
+ except SyncDone as e:
+ logg.info('sync done: {}'.format(e))
+ return self.backend.get()
+ except NoBlockForYou as e:
break
+# TODO: To properly handle this, ensure that previous request is rolled back
+# except sqlalchemy.exc.OperationalError as e:
+# logg.error('database error: {}'.format(e))
+# break
last_block = block.number
self.process(conn, block)
start_tx = 0
@@ -76,10 +108,6 @@ class BlockPollSyncer(Syncer):
class HeadSyncer(BlockPollSyncer):
- def __init__(self, backend, loop_callback=noop_callback, progress_callback=noop_callback):
- super(HeadSyncer, self).__init__(backend, loop_callback, progress_callback)
-
-
def process(self, conn, block):
logg.debug('process block {}'.format(block))
i = 0
@@ -87,21 +115,62 @@ class HeadSyncer(BlockPollSyncer):
while True:
try:
tx = block.tx(i)
+ rcpt = conn.do(receipt(tx.hash))
+ tx.apply_receipt(rcpt)
self.progress_callback(block.number, i, 'processing {}'.format(repr(tx)))
self.backend.set(block.number, i)
self.filter.apply(conn, block, tx)
except IndexError as e:
+ logg.debug('index error syncer rcpt get {}'.format(e))
self.backend.set(block.number + 1, 0)
break
i += 1
def get(self, conn):
- (block_number, tx_number) = self.backend.get()
+ (height, flags) = self.backend.get()
+ block_number = height[0]
+ block_hash = []
+ o = block_by_number(block_number)
+ r = conn.do(o)
+ if r == None:
+ raise NoBlockForYou()
+ b = Block(r)
+
+ return b
+
+
+ def __str__(self):
+ return '[headsyncer] {}'.format(str(self.backend))
+
+
+class HistorySyncer(HeadSyncer):
+
+ def __init__(self, backend, loop_callback=noop_callback, progress_callback=noop_callback):
+ super(HeadSyncer, self).__init__(backend, loop_callback, progress_callback)
+ self.block_target = None
+ (block_number, flags) = self.backend.target()
+ if block_number == None:
+ raise AttributeError('backend has no future target. Use HeadSyner instead')
+ self.block_target = block_number
+
+
+ def get(self, conn):
+ (height, flags) = self.backend.get()
+ if self.block_target < height[0]:
+ raise SyncDone(self.block_target)
+ block_number = height[0]
block_hash = []
o = block_by_number(block_number)
r = conn.do(o)
+ if r == None:
+ raise NoBlockForYou()
b = Block(r)
- logg.debug('get {}'.format(b))
return b
+
+
+ def __str__(self):
+ return '[historysyncer] {}'.format(str(self.backend))
+
+
diff --git a/chainsyncer/error.py b/chainsyncer/error.py
@@ -1,8 +1,11 @@
-class LoopDone(Exception):
+class SyncDone(Exception):
"""Exception raised when a syncing is complete.
"""
pass
+class NoBlockForYou(Exception):
+ pass
+
class RequestError(Exception):
pass
diff --git a/chainsyncer/filter.py b/chainsyncer/filter.py
@@ -9,6 +9,7 @@ from .error import BackendError
logg = logging.getLogger(__name__)
+
class SyncFilter:
def __init__(self, backend, safe=True):
@@ -20,7 +21,7 @@ class SyncFilter:
def add(self, fltr):
if getattr(fltr, 'filter') == None:
raise ValueError('filter object must implement have method filter')
- logg.debug('added filter {}'.format(str(fltr)))
+ logg.debug('added filter "{}"'.format(str(fltr)))
self.filters.append(fltr)
@@ -32,10 +33,15 @@ class SyncFilter:
except sqlalchemy.exc.TimeoutError as e:
self.backend.disconnect()
raise BackendError('database connection fail: {}'.format(e))
+ i = 0
for f in self.filters:
+ i += 1
logg.debug('applying filter {}'.format(str(f)))
- f.filter(conn, block, tx, self.backend.db_session)
- self.backend.disconnect()
+ f.filter(conn, block, tx, session)
+ self.backend.complete_filter(i)
+ if session != None:
+ self.backend.disconnect()
+
class NoopFilter:
diff --git a/requirements.txt b/requirements.txt
@@ -1,6 +1,6 @@
psycopg2==2.8.6
SQLAlchemy==1.3.20
-confini~=0.3.6b2
+confini~=0.3.6rc3
semver==2.13.0
-hexathon~=0.0.1a3
-chainlib~=0.0.1a15
+hexathon~=0.0.1a7
+chainlib~=0.0.2a1
diff --git a/setup.cfg b/setup.cfg
@@ -1,6 +1,6 @@
[metadata]
name = chainsyncer
-version = 0.0.1a10
+version = 0.0.1a21
description = Generic blockchain syncer driver
author = Louis Holbrook
author_email = dev@holbrook.no
@@ -21,13 +21,19 @@ licence_files =
LICENSE.txt
[options]
+include_package_data = True
python_requires = >= 3.6
packages =
chainsyncer
chainsyncer.db
+ chainsyncer.db.migrations
chainsyncer.db.models
chainsyncer.runnable
+[options.package_data]
+* =
+ sql/*
+
#[options.entry_points]
#console_scripts =
# blocksync-celery = chainsyncer.runnable.tracker:main
diff --git a/sql/postgresql/1.sql b/sql/postgresql/1.sql
@@ -1,4 +1,4 @@
-DROP TABLE chain_sync;
+DROP TABLE IF EXISTS chain_sync CASCADE;
CREATE TABLE IF NOT EXISTS chain_sync (
id serial primary key not null,
blockchain varchar not null,
@@ -6,21 +6,7 @@ CREATE TABLE IF NOT EXISTS chain_sync (
tx_start int not null default 0,
block_cursor int not null default 0,
tx_cursor int not null default 0,
- flags bytea not null,
- num_flags int not null,
block_target int default null,
date_created timestamp not null,
date_updated timestamp default null
);
-
-DROP TABLE chain_sync_filter;
-CREATE TABLE IF NOT EXISTS chain_sync_filter (
- id serial primary key not null,
- chain_sync_id int not null,
- flags bytea default null,
- count int not null default 0,
- digest char(64) not null default '0000000000000000000000000000000000000000000000000000000000000000',
- CONSTRAINT fk_chain_sync
- FOREIGN KEY(chain_sync_id)
- REFERENCES chain_sync(id)
-);
diff --git a/sql/postgresql/2.sql b/sql/postgresql/2.sql
@@ -0,0 +1,12 @@
+DROP TABLE IF EXISTS chain_sync_filter;
+CREATE TABLE IF NOT EXISTS chain_sync_filter (
+ id serial primary key not null,
+ chain_sync_id integer not null,
+ flags bytea default null,
+ flags_start bytea default null,
+ count integer not null default 0,
+ digest char(64) not null,
+ CONSTRAINT fk_chain_sync
+ FOREIGN KEY(chain_sync_id)
+ REFERENCES chain_sync(id)
+);
diff --git a/sql/sqlite/1.sql b/sql/sqlite/1.sql
@@ -1,13 +1,11 @@
CREATE TABLE IF NOT EXISTS chain_sync (
- id serial primary key not null,
+ id integer primary key autoincrement,
blockchain varchar not null,
- block_start int not null default 0,
- tx_start int not null default 0,
- block_cursor int not null default 0,
- tx_cursor int not null default 0,
- flags bytea not null,
- num_flags int not null,
- block_target int default null,
+ block_start integer not null default 0,
+ tx_start integer not null default 0,
+ block_cursor integer not null default 0,
+ tx_cursor integer not null default 0,
+ block_target integer default null,
date_created timestamp not null,
date_updated timestamp default null
);
diff --git a/sql/sqlite/2.sql b/sql/sqlite/2.sql
@@ -1,9 +1,10 @@
CREATE TABLE IF NOT EXISTS chain_sync_filter (
- id serial primary key not null,
- chain_sync_id int not null,
+ id integer primary key autoincrement not null,
+ chain_sync_id integer not null,
flags bytea default null,
- count int not null default 0,
- digest char(64) not null default '0000000000000000000000000000000000000000000000000000000000000000',
+ flags_start bytea default null,
+ count integer not null default 0,
+ digest char(64) not null,
CONSTRAINT fk_chain_sync
FOREIGN KEY(chain_sync_id)
REFERENCES chain_sync(id)
diff --git a/tests/base.py b/tests/base.py
@@ -1,13 +1,21 @@
+# standard imports
+import logging
import unittest
import tempfile
import os
#import pysqlite
+# external imports
+from chainlib.chain import ChainSpec
+
+# local imports
from chainsyncer.db import dsn_from_config
from chainsyncer.db.models.base import SessionBase
script_dir = os.path.realpath(os.path.dirname(__file__))
+logging.basicConfig(level=logging.DEBUG)
+
class TestBase(unittest.TestCase):
@@ -23,7 +31,7 @@ class TestBase(unittest.TestCase):
SessionBase.poolable = False
SessionBase.transactional = False
SessionBase.procedural = False
- SessionBase.connect(dsn, debug=True)
+ SessionBase.connect(dsn, debug=False)
f = open(os.path.join(script_dir, '..', 'sql', 'sqlite', '1.sql'), 'r')
sql = f.read()
@@ -39,6 +47,8 @@ class TestBase(unittest.TestCase):
conn = SessionBase.engine.connect()
conn.execute(sql)
+ self.chain_spec = ChainSpec('evm', 'foo', 42, 'bar')
+
def tearDown(self):
SessionBase.disconnect()
os.unlink(self.db_path)
diff --git a/tests/test_database.py b/tests/test_database.py
@@ -0,0 +1,156 @@
+# standard imports
+import unittest
+import logging
+
+# external imports
+from chainlib.chain import ChainSpec
+
+# local imports
+from chainsyncer.db.models.base import SessionBase
+from chainsyncer.db.models.filter import BlockchainSyncFilter
+from chainsyncer.backend import SyncerBackend
+
+# testutil imports
+from tests.base import TestBase
+
+logg = logging.getLogger()
+
+
+class TestDatabase(TestBase):
+
+
+ def test_backend_live(self):
+ s = SyncerBackend.live(self.chain_spec, 42)
+ self.assertEqual(s.object_id, 1)
+ backend = SyncerBackend.first(self.chain_spec)
+ #SyncerBackend(self.chain_spec, sync_id)
+ self.assertEqual(backend.object_id, 1)
+
+ bogus_chain_spec = ChainSpec('bogus', 'foo', 13, 'baz')
+ sync_id = SyncerBackend.first(bogus_chain_spec)
+ self.assertIsNone(sync_id)
+
+
+ def test_backend_filter(self):
+ s = SyncerBackend.live(self.chain_spec, 42)
+
+ s.connect()
+ filter_id = s.db_object_filter.id
+ s.disconnect()
+
+ session = SessionBase.create_session()
+ o = session.query(BlockchainSyncFilter).get(filter_id)
+ self.assertEqual(len(o.flags), 0)
+ session.close()
+
+ for i in range(9):
+ s.register_filter(str(i))
+
+ s.connect()
+ filter_id = s.db_object_filter.id
+ s.disconnect()
+
+ session = SessionBase.create_session()
+ o = session.query(BlockchainSyncFilter).get(filter_id)
+ self.assertEqual(len(o.flags), 2)
+
+ (t, c, d) = o.target()
+ self.assertEqual(t, (1 << 9) - 1)
+
+ for i in range(9):
+ o.set(i)
+
+ (f, c, d) = o.cursor()
+ self.assertEqual(f, t)
+ self.assertEqual(c, 9)
+ self.assertEqual(d, o.digest)
+
+ session.close()
+
+ def test_backend_retrieve(self):
+ s = SyncerBackend.live(self.chain_spec, 42)
+ s.register_filter('foo')
+ s.register_filter('bar')
+ s.register_filter('baz')
+
+ s.set(42, 13)
+
+ s = SyncerBackend.first(self.chain_spec)
+ self.assertEqual(s.get(), ((42,13), 0))
+
+
+ def test_backend_initial(self):
+ with self.assertRaises(ValueError):
+ s = SyncerBackend.initial(self.chain_spec, 42, 42)
+
+ with self.assertRaises(ValueError):
+ s = SyncerBackend.initial(self.chain_spec, 42, 43)
+
+ s = SyncerBackend.initial(self.chain_spec, 42, 13)
+
+ s.set(43, 13)
+
+ s = SyncerBackend.first(self.chain_spec)
+ self.assertEqual(s.get(), ((43,13), 0))
+ self.assertEqual(s.start(), ((13,0), 0))
+
+
+ def test_backend_resume(self):
+ s = SyncerBackend.resume(self.chain_spec, 666)
+ self.assertEqual(len(s), 0)
+
+ s = SyncerBackend.live(self.chain_spec, 42)
+ original_id = s.object_id
+ s = SyncerBackend.resume(self.chain_spec, 666)
+ self.assertEqual(len(s), 1)
+ resumed_id = s[0].object_id
+ self.assertEqual(resumed_id, original_id + 1)
+ self.assertEqual(s[0].get(), ((42, 0), 0))
+
+
+ def test_backend_resume_when_completed(self):
+ s = SyncerBackend.live(self.chain_spec, 42)
+
+ s = SyncerBackend.resume(self.chain_spec, 666)
+ s[0].set(666, 0)
+
+ s = SyncerBackend.resume(self.chain_spec, 666)
+ self.assertEqual(len(s), 0)
+
+
+ def test_backend_resume_several(self):
+ s = SyncerBackend.live(self.chain_spec, 42)
+ s.set(43, 13)
+
+ s = SyncerBackend.resume(self.chain_spec, 666)
+ SyncerBackend.live(self.chain_spec, 666)
+ s[0].set(123, 2)
+
+ s = SyncerBackend.resume(self.chain_spec, 1024)
+ SyncerBackend.live(self.chain_spec, 1024)
+
+ self.assertEqual(len(s), 2)
+ self.assertEqual(s[0].target(), (666, 0))
+ self.assertEqual(s[0].get(), ((123, 2), 0))
+ self.assertEqual(s[1].target(), (1024, 0))
+ self.assertEqual(s[1].get(), ((666, 0), 0))
+
+
+ def test_backend_resume_filter(self):
+ s = SyncerBackend.live(self.chain_spec, 42)
+ s.register_filter('foo')
+ s.register_filter('bar')
+ s.register_filter('baz')
+
+ s.set(43, 13)
+ s.complete_filter(0)
+ s.complete_filter(2)
+
+ s = SyncerBackend.resume(self.chain_spec, 666)
+ (pair, flags) = s[0].get()
+
+ self.assertEqual(flags, 5)
+
+
+if __name__ == '__main__':
+ unittest.main()