chaind

Base package for chain queue serviceBase package for chain queue service
Log | Files | Refs | LICENSE

commit e87ec0cd4c18a4b196988ec76cdf0bc585545bf3
parent b52d69c3f9e6884ce36c21e2da494b30d42a3b78
Author: lash <dev@holbrook.no>
Date:   Fri, 29 Apr 2022 06:26:43 +0000

Upgrade chainqueue, chainsyncer

Diffstat:
Mchaind/adapters/fs.py | 3+++
Mchaind/error.py | 4++++
Mchaind/filter.py | 25+++++++++++++++++++++----
Mrequirements.txt | 4++--
Msetup.cfg | 2+-
5 files changed, 31 insertions(+), 7 deletions(-)

diff --git a/chaind/adapters/fs.py b/chaind/adapters/fs.py @@ -58,6 +58,9 @@ class ChaindFsAdapter(ChaindAdapter): def succeed(self, block, tx): + if self.store.is_reserved(tx.hash): + raise QueueLockError(tx.hash) + return self.store.final(tx.hash, block, tx, error=False) diff --git a/chaind/error.py b/chaind/error.py @@ -16,3 +16,7 @@ class ClientBlockError(BlockingIOError): class ClientInputError(ValueError): pass + + +class QueueLockError(Exception): + pass diff --git a/chaind/filter.py b/chaind/filter.py @@ -1,15 +1,21 @@ # standard imports import logging +import time # external imports from chainlib.status import Status as TxStatus from chainsyncer.filter import SyncFilter from chainqueue.error import NotLocalTxError +# local imports +from .error import QueueLockError + logg = logging.getLogger(__name__) class StateFilter(SyncFilter): + delay_limit = 3.0 + def __init__(self, adapter, throttler=None): self.adapter = adapter self.throttler = throttler @@ -22,10 +28,21 @@ class StateFilter(SyncFilter): logg.debug('skipping not local transaction {}'.format(tx.hash)) return False - if tx.status == TxStatus.SUCCESS: - self.adapter.succeed(block, tx) - else: - self.adapter.fail(block, tx) + delay = 0.01 + while True: + if delay > self.delay_limit: + raise QueueLockError('The queue lock for tx {} seems to be stuck. Human meddling needed.'.format(tx.hash)) + try: + if tx.status == TxStatus.SUCCESS: + self.adapter.succeed(block, tx) + else: + self.adapter.fail(block, tx) + break + except QueueLockError as e: + logg.debug('queue item {} is blocked, will retry: {}'.format(tx.hash, e)) + time.sleep(delay) + delay *= 2 + if self.throttler != None: self.throttler.dec(tx.hash) diff --git a/requirements.txt b/requirements.txt @@ -1,6 +1,6 @@ chainlib~=0.1.1 -chainqueue~=0.1.2 -chainsyncer~=0.4.0 +chainqueue~=0.1.5 +chainsyncer~=0.4.2 confini~=0.6.0 funga~=0.5.2 pyxdg~=0.26 diff --git a/setup.cfg b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = chaind -version = 0.1.2 +version = 0.1.3 description = Base package for chain queue service author = Louis Holbrook author_email = dev@holbrook.no