commit 81c1207828ae6e62c0e17d7b80da63bf921bdbb4
parent f33ba13d74b2ab13c07568db2df84bc91f28c7a7
Author: lash <dev@holbrook.no>
Date: Thu, 5 May 2022 12:09:07 +0000
Recreate adapter only per block
Diffstat:
3 files changed, 54 insertions(+), 23 deletions(-)
diff --git a/CHANGELOG b/CHANGELOG
@@ -1,3 +1,5 @@
+- 0.2.9
+ * Minimize instantiations of adapters in filter execution
- 0.2.8
* Upgrade chainsyncer
- 0.2.7
diff --git a/chaind/filter.py b/chaind/filter.py
@@ -26,43 +26,69 @@ class StateFilter(SyncFilter):
self.adapter_path = adapter_path
self.tx_adapter = tx_adapter
self.throttler = throttler
+ self.last_block_height = 0
+ self.adapter = None
+ self.store_lock = None
+
+
+ def __get_adapter(self, block, force_reload=False):
+ if self.store_lock == None:
+ self.store_lock = StoreLock()
+
+ reload = False
+ if block.number != self.last_block_height:
+ reload = True
+ elif self.adapter == None:
+ reload = True
+ elif force_reload:
+ reload = True
+
+ self.last_block_height = block.number
+
+ if reload:
+ while True:
+ logg.info('reloading adapter')
+ try:
+ self.adapter = ChaindFsAdapter(
+ self.chain_spec,
+ self.adapter_path,
+ self.tx_adapter,
+ None,
+ )
+ break
+ except BackendError as e:
+ logg.error('adapter instantiation failed: {}, one more try'.format(e))
+ self.store_lock.again()
+ continue
+
+ return self.adapter
def filter(self, conn, block, tx, session=None):
cache_tx = None
- store_lock = StoreLock()
- queue_adapter = None
+ queue_adapter = self.__get_adapter(block)
+
+ self.store_lock.reset()
+
while True:
try:
- queue_adapter = ChaindFsAdapter(
- self.chain_spec,
- self.adapter_path,
- self.tx_adapter,
- None,
- )
- except BackendError as e:
- logg.error('adapter instantiation failed: {}, one more try'.format(e))
- store_lock.again()
- continue
-
- store_lock.reset()
-
- try:
cache_tx = queue_adapter.get(tx.hash)
break
except NotLocalTxError:
logg.debug('skipping not local transaction {}'.format(tx.hash))
+ self.__stop_adapter()
return False
except BackendError as e:
logg.error('adapter get failed: {}, one more try'.format(e))
- queue_adapter = None
- store_lock.again()
+ self.store_lock.again()
+ queue_adapter = self.__get_adapter(block, force_reload=True)
continue
if cache_tx == None:
raise NotLocalTxError(tx.hash)
- store_lock = StoreLock()
+ self.store_lock.reset()
+
queue_lock = StoreLock(error=QueueLockError)
while True:
try:
@@ -76,15 +102,18 @@ class StateFilter(SyncFilter):
queue_lock.again()
except FileNotFoundError as e:
logg.debug('queue item {} not found, possible race condition, will retry: {}'.format(tx.hash, e))
- store_lock.again()
+ self.store_lock.again()
+ queue_adapter = self.__get_adapter(block, force_reload=True)
continue
except NotLocalTxError as e:
logg.debug('queue item {} not found, possible race condition, will retry: {}'.format(tx.hash, e))
- store_lock.again()
+ self.store_lock.again()
+ queue_adapter = self.__get_adapter(block, force_reload=True)
continue
except StateLockedKey as e:
logg.debug('queue item {} not found, possible race condition, will retry: {}'.format(tx.hash, e))
- store_lock.again()
+ self.store_lock.again()
+ queue_adapter = self.__get_adapter(block, force_reload=True)
continue
logg.info('filter registered {} for {} in {}'.format(tx.status.name, tx.hash, block))
diff --git a/setup.cfg b/setup.cfg
@@ -1,6 +1,6 @@
[metadata]
name = chaind
-version = 0.2.8
+version = 0.2.9
description = Base package for chain queue service
author = Louis Holbrook
author_email = dev@holbrook.no