chainsyncer

Blockchain syncer driver
Log | Files | Refs | LICENSE

commit 877dde8cf68e7a4784585e8eb6afd312d58d8527
parent 578ee69cd3000c3817e4968f9e946605a3e1f3e9
Author: nolash <dev@holbrook.no>
Date:   Fri, 27 Aug 2021 14:04:34 +0200

WIP docstrings

Diffstat:
Mchainsyncer/db/models/base.py | 52+++++++++++++++++++++++++++++++++++++++++++++-------
Mchainsyncer/db/models/filter.py | 47++++++++++++++++++++++++++++++++++++++++++++++-
Mchainsyncer/db/models/sync.py | 75+++++++++++++++++++++++++++++++++++++++++++--------------------------------
3 files changed, 134 insertions(+), 40 deletions(-)

diff --git a/chainsyncer/db/models/base.py b/chainsyncer/db/models/base.py @@ -10,12 +10,16 @@ from sqlalchemy.pool import ( StaticPool, QueuePool, AssertionPool, + NullPool, ) logg = logging.getLogger() Model = declarative_base(name='Model') +CONNECTION_OVERFLOW_FACTOR = 3 +CONNECTION_RECYCLE_AFTER = 60 + class SessionBase(Model): """The base object for all SQLAlchemy enabled models. All other models must extend this. @@ -48,17 +52,32 @@ class SessionBase(Model): @staticmethod def _set_engine(engine): """Sets the database engine static property + + :param engine: The sqlalchemy engine + :type engine: sqlalchemy.engine.Engine """ SessionBase.engine = engine SessionBase.sessionmaker = sessionmaker(bind=SessionBase.engine) @staticmethod - def connect(dsn, pool_size=8, debug=False): + def connect(dsn, pool_size=16, debug=False): """Create new database connection engine and connect to database backend. - :param dsn: DSN string defining connection. + The pool_size argument controls the behavior of the connection pool. + + If the pool_size is greater than 1, and the engine has connection pool settings, The connection pool will be set up with the given number of connections. By default, it allows for 3x connection overflow (CONNECTION_OVERFLOW_FACTOR), and connection recycling after 60 seconds of inactivity (CONNECTION_RECYCLE_AFTER). + + If the pool_size is 1 and debug mode is off, the StaticPool class (single connection pool) will be used. If debug is on, AssertionPool will be used (which raises assertionerror if more than a single connection is attempted at any one time by the process). + + If the underlying engine does not have pooling capabilities, the pool_size parameter toggles the connection class used. If pool_size is set to 0, the NullPool will be used (build a new connection for every session). If pool_size is set to a positive number, the StaticPool will be used, keeping a single connection for all sessions. + + :param dsn: DSN string defining connection :type dsn: str + :param pool_size: Size of connection pool + :type pool_size: int + :param debug: Activate sql debug mode (outputs sql statements) + :type debug: bool """ e = None if SessionBase.poolable: @@ -66,10 +85,10 @@ class SessionBase(Model): if pool_size > 1: e = create_engine( dsn, - max_overflow=pool_size*3, + max_overflow=pool_size * CONNECTION_OVERFLOW_FACTOR, pool_pre_ping=True, pool_size=pool_size, - pool_recycle=60, + pool_recycle=CONNECTION_RECYCLE_AFTER, poolclass=poolclass, echo=debug, ) @@ -85,8 +104,12 @@ class SessionBase(Model): echo=debug, ) else: + pool_class = StaticPool + if pool_size < 1: + pool_class = NullPool e = create_engine( dsn, + poolclass=pool_class, echo=debug, ) @@ -103,6 +126,17 @@ class SessionBase(Model): @staticmethod def bind_session(session=None): + """Convenience function to enforce database session responsilibity in call stacks where it is unclear which layer will create a database session. + + If the session argument is None, the method will create and return a new database session. A reference to the database session will be statically stored in the SessionBase class, and must be explicitly released with release_session. + + When an existing session in passed as the argument, this method simply returns back the same session. + + :param session: An sqlalchemy session + :type session: session.orm.Session + :rtype: session.orm.Session + :returns: An sqlalchemy session + """ localsession = session if localsession == None: localsession = SessionBase.create_session() @@ -113,11 +147,15 @@ class SessionBase(Model): @staticmethod - def release_session(session=None): - session.flush() + def release_session(session): + """Checks if a reference to the given session exists in the SessionBase session store, and if it does commits the transaction and closes the session. + + :param session: An sqlalchemy session + :type session: session.orm.Session + """ session_key = str(id(session)) if SessionBase.localsessions.get(session_key) != None: - logg.debug('destroying session {}'.format(session_key)) + logg.debug('commit and destroy session {}'.format(session_key)) session.commit() session.close() del SessionBase.localsessions[session_key] diff --git a/chainsyncer/db/models/filter.py b/chainsyncer/db/models/filter.py @@ -15,6 +15,17 @@ logg = logging.getLogger(__name__) class BlockchainSyncFilter(SessionBase): + """Sync filter sql backend database interface. + + :param chain_sync: BlockchainSync object to use as context for filter + :type chain_sync: chainsyncer.db.models.sync.BlockchainSync + :param count: Number of filters to track + :type count: int + :param flags: Filter flag value to instantiate record with + :type flags: int + :param digest: Filter digest as integrity protection when resuming session, 256 bits, in hex + :type digest: str + """ __tablename__ = 'chain_sync_filter' @@ -31,7 +42,7 @@ class BlockchainSyncFilter(SessionBase): if flags == None: flags = bytearray(0) - else: # TODO: handle bytes too + else: bytecount = int((count - 1) / 8 + 1) flags = flags.to_bytes(bytecount, 'big') self.flags_start = flags @@ -41,6 +52,13 @@ class BlockchainSyncFilter(SessionBase): def add(self, name): + """Add a new filter to the syncer record. + + The name of the filter is hashed with the current aggregated hash sum of previously added filters. + + :param name: Filter informal name + :type name: str + """ h = hashlib.new('sha256') h.update(bytes.fromhex(self.digest)) h.update(name.encode('utf-8')) @@ -56,14 +74,32 @@ class BlockchainSyncFilter(SessionBase): def start(self): + """Retrieve the initial filter state of the syncer. + + :rtype: tuple + :returns: Filter flag value, filter count, filter digest + """ return (int.from_bytes(self.flags_start, 'big'), self.count, self.digest) def cursor(self): + """Retrieve the current filter state of the syncer. + + :rtype: tuple + :returns: Filter flag value, filter count, filter digest + """ return (int.from_bytes(self.flags, 'big'), self.count, self.digest) def target(self): + """Retrieve the target filter state of the syncer. + + The target filter value will be the integer value when all bits are set for the filter count. + + :rtype: tuple + :returns: Filter flag value, filter count, filter digest + """ + n = 0 for i in range(self.count): n |= (1 << self.count) - 1 @@ -71,10 +107,19 @@ class BlockchainSyncFilter(SessionBase): def clear(self): + """Set current filter flag value to zero. + """ self.flags = bytearray(len(self.flags)) def set(self, n): + """Set the filter flag at given index. + + :param n: Filter flag index + :type n: int + :raises IndexError: Invalid flag index + :raises AttributeError: Flag at index already set + """ if n > self.count: raise IndexError('bit flag out of range') diff --git a/chainsyncer/db/models/sync.py b/chainsyncer/db/models/sync.py @@ -12,8 +12,8 @@ from .base import SessionBase class BlockchainSync(SessionBase): """Syncer control backend. - :param chain: Chain spec string representation - :type chain: str + :param chain_str: Chain spec string representation + :type chain_str: str :param block_start: Block number to start sync from :type block_start: number :param tx_start: Block transaction number to start sync from @@ -24,30 +24,48 @@ class BlockchainSync(SessionBase): __tablename__ = 'chain_sync' blockchain = Column(String) + """Chainspec string specifying the blockchain the syncer is running against.""" block_start = Column(Integer) + """The block height at the start of syncer.""" tx_start = Column(Integer) + """The transaction index at the start of syncer.""" block_cursor = Column(Integer) + """The block height for the current state of the syncer.""" tx_cursor = Column(Integer) + """The transaction index for the current state of the syncer.""" block_target = Column(Integer) + """The block height at which the syncer should terminate. Will be None for an open-ended syncer.""" date_created = Column(DateTime, default=datetime.datetime.utcnow) + """Datetime when syncer was first created.""" date_updated = Column(DateTime) + """Datetime of the latest update of the syncer state.""" + + def __init__(self, chain_str, block_start, tx_start, block_target=None): + self.blockchain = chain_str + self.block_start = block_start + self.tx_start = tx_start + self.block_cursor = block_start + self.tx_cursor = tx_start + self.block_target = block_target + self.date_created = datetime.datetime.utcnow() + self.date_updated = datetime.datetime.utcnow() @staticmethod - def first(chain, session=None): + def first(chain_str, session=None): """Check if a sync session for the specified chain already exists. - :param chain: Chain spec string representation - :type chain: str + :param chain_str: Chain spec string representation + :type chain_str: str :param session: Session to use. If not specified, a separate session will be created for this method only. - :type session: SqlAlchemy Session - :returns: Database primary key id of sync record - :rtype: number|None + :type session: sqlalchemy.orm.session.Sessoin + :returns: Database primary key id of sync record, or None if insert failed + :rtype: number """ session = SessionBase.bind_session(session) q = session.query(BlockchainSync.id) - q = q.filter(BlockchainSync.blockchain==chain) + q = q.filter(BlockchainSync.blockchain==chain_str) o = q.first() if o == None: @@ -63,12 +81,16 @@ class BlockchainSync(SessionBase): @staticmethod def get_last(session=None, live=True): - """Get the most recent open-ended ("live") syncer record. + """Get the most recent syncer record. + + If live is set, only the latest open-ended syncer will be returned. :param session: Session to use. If not specified, a separate session will be created for this method only. :type session: SqlAlchemy Session - :returns: Block and transaction number, respectively - :rtype: tuple + :param live: Match only open-ended syncers + :type live: bool + :returns: Syncer database id + :rtype: int """ session = SessionBase.bind_session(session) @@ -95,7 +117,7 @@ class BlockchainSync(SessionBase): :param session: Session to use. If not specified, a separate session will be created for this method only. :type session: SqlAlchemy Session :returns: Syncer database ids - :rtype: tuple, where first element is id + :rtype: list """ unsynced = [] local_session = False @@ -115,7 +137,7 @@ class BlockchainSync(SessionBase): def set(self, block_height, tx_height): - """Set the height of the syncer instance. + """Set the cursor height of the syncer instance. Only manipulates object, does not transaction or commit to backend. @@ -123,6 +145,8 @@ class BlockchainSync(SessionBase): :type block_height: number :param tx_height: Block transaction number :type tx_height: number + :rtype: tuple + :returns: Stored block height, transaction index """ self.block_cursor = block_height self.tx_cursor = tx_height @@ -133,7 +157,7 @@ class BlockchainSync(SessionBase): def cursor(self): """Get current state of cursor from cached instance. - :returns: Block and transaction height, respectively + :returns: Block height, transaction index :rtype: tuple """ return (self.block_cursor, self.tx_cursor) @@ -142,7 +166,7 @@ class BlockchainSync(SessionBase): def start(self): """Get sync block start position from cached instance. - :returns: Block and transaction height, respectively + :returns: Block height, transaction index :rtype: tuple """ return (self.block_start, self.tx_start) @@ -151,29 +175,18 @@ class BlockchainSync(SessionBase): def target(self): """Get sync block upper bound from cached instance. - :returns: Block number - :rtype: number, or None if sync is open-ended + :returns: Block number. Returns None if syncer is open-ended. + :rtype: int """ return self.block_target def chain(self): - """Get chain the cached instance represents. + """Get chain string representation for which the cached instance represents. """ return self.blockchain - def __init__(self, chain, block_start, tx_start, block_target=None): - self.blockchain = chain - self.block_start = block_start - self.tx_start = tx_start - self.block_cursor = block_start - self.tx_cursor = tx_start - self.block_target = block_target - self.date_created = datetime.datetime.utcnow() - self.date_updated = datetime.datetime.utcnow() - - def __str__(self): return """object_id: {} start: {}:{} @@ -187,5 +200,3 @@ target: {} self.tx_cursor, self.block_target, ) - -