commit 4be5325df217b5e664b5da323a4540cfcedb3666
parent 96bdca20ccd10a46740f8b3664576181806f7c40
Author: lash <dev@holbrook.no>
Date: Sat, 30 Apr 2022 18:32:10 +0000
Renew backend every filter pass, allow race in backend
Diffstat:
3 files changed, 52 insertions(+), 15 deletions(-)
diff --git a/chaind/adapters/fs.py b/chaind/adapters/fs.py
@@ -10,6 +10,7 @@ from chainqueue.store.fs import (
IndexStore,
CounterStore,
)
+from chainqueue.error import BackendIntegrityError
from shep.store.file import SimpleFileStoreFactory
from shep.error import StateInvalid
@@ -42,16 +43,24 @@ class ChaindFsAdapter(ChaindAdapter):
except StateInvalid as e:
logg.error('I am just a simple syncer and do not know how to handle the state which the tx {} is in: {}'.format(tx_hash, e))
return None
+ except FileNotFoundError:
+ pass
+ if v ==None:
+ raise BackendIntegrityError(tx_hash)
return v[1]
def upcoming(self, limit=0):
+ real_limit = 0
+ in_flight = 0
if limit > 0:
- r = self.store.by_state(state=self.store.IN_NETWORK, not_state=self.store.FINAL)
- limit -= len(r)
- if limit <= 0:
+ in_flight = self.store.by_state(state=self.store.IN_NETWORK, not_state=self.store.FINAL)
+ real_limit = limit - len(in_flight)
+ if real_limit <= 0:
return []
- return self.store.upcoming(limit=limit)
+ r = self.store.upcoming(limit=real_limit)
+ logg.info('upcoming returning {} upcoming from limit {} less {} active in-flight txs'.format(len(r), limit, len(in_flight)))
+ return r
def pending(self):
diff --git a/chaind/filter.py b/chaind/filter.py
@@ -5,28 +5,57 @@ import time
# external imports
from chainlib.status import Status as TxStatus
from chainsyncer.filter import SyncFilter
-from chainqueue.error import NotLocalTxError
+from chainqueue.error import (
+ NotLocalTxError,
+ BackendIntegrityError,
+ )
+from chaind.adapters.fs import ChaindFsAdapter
# local imports
from .error import QueueLockError
logg = logging.getLogger(__name__)
+
class StateFilter(SyncFilter):
delay_limit = 3.0
- def __init__(self, adapter, throttler=None):
- self.adapter = adapter
+ def __init__(self, chain_spec, adapter_path, tx_adapter, throttler=None):
+ self.chain_spec = chain_spec
+ self.adapter_path = adapter_path
+ self.tx_adapter = tx_adapter
self.throttler = throttler
def filter(self, conn, block, tx, session=None):
- try:
- cache_tx = self.adapter.get(tx.hash)
- except NotLocalTxError:
- logg.debug('skipping not local transaction {}'.format(tx.hash))
- return False
+ cache_tx = None
+ for i in range(3):
+ queue_adapter = None
+ try:
+ queue_adapter = ChaindFsAdapter(
+ self.chain_spec,
+ self.adapter_path,
+ self.tx_adapter,
+ None,
+ )
+ except BackendIntegrityError as e:
+ logg.error('adapter instantiation failed: {}, one more try'.format(e))
+ continue
+
+ try:
+ cache_tx = queue_adapter.get(tx.hash)
+ except NotLocalTxError:
+ logg.debug('skipping not local transaction {}'.format(tx.hash))
+ return False
+ except BackendIntegrityError as e:
+ logg.error('adapter instantiation failed: {}, one more try'.format(e))
+ continue
+
+ break
+
+ if cache_tx == None:
+ raise NotLocalTxError(tx.hash)
delay = 0.01
while True:
@@ -34,9 +63,9 @@ class StateFilter(SyncFilter):
raise QueueLockError('The queue lock for tx {} seems to be stuck. Human meddling needed.'.format(tx.hash))
try:
if tx.status == TxStatus.SUCCESS:
- self.adapter.succeed(block, tx)
+ queue_adapter.succeed(block, tx)
else:
- self.adapter.fail(block, tx)
+ queue_adapter.fail(block, tx)
break
except QueueLockError as e:
logg.debug('queue item {} is blocked, will retry: {}'.format(tx.hash, e))
diff --git a/chaind/session.py b/chaind/session.py
@@ -104,7 +104,6 @@ class SessionController:
logg.error('invalid input "{}"'.format(data_in_str))
raise ClientInputError()
- logg.info('recv {} bytes'.format(len(data)))
return (srvs, data,)