commit c94b291d394432746279b40e2822be2c3d9d39e2
parent 6c360ca2e5c94bf52355db5b6047e83bd7a95848
Author: lash <dev@holbrook.no>
Date: Sat, 30 Apr 2022 05:42:32 +0000
Add upcoming tests, event callback pass to shep
Diffstat:
4 files changed, 65 insertions(+), 4 deletions(-)
diff --git a/chainqueue/state.py b/chainqueue/state.py
@@ -106,10 +106,10 @@ class Verify:
class Status(shep.persist.PersistedState):
- def __init__(self, store_factory):
+ def __init__(self, store_factory, allow_invalid=False, event_callback=None):
verify = Verify().verify
self.set_default_state('PENDING')
- super(Status, self).__init__(store_factory, 12, verifier=verify)
+ super(Status, self).__init__(store_factory, 12, verifier=verify, check_alias=not allow_invalid, event_callback=event_callback)
self.add('QUEUED')
self.add('RESERVED')
self.add('IN_NETWORK')
diff --git a/chainqueue/store/base.py b/chainqueue/store/base.py
@@ -9,6 +9,10 @@ from chainqueue.entry import QueueEntry
from chainqueue.error import (
NotLocalTxError,
)
+from chainqueue.enum import (
+ StatusBits,
+ all_errors,
+ )
logg = logging.getLogger(__name__)
@@ -21,6 +25,7 @@ def from_key(k):
(ts_str, seq_str, tx_hash) = k.split('_')
return (float(ts_str), int(seq_str), tx_hash, )
+all_local_errors = all_errors() - StatusBits.NETWORK_ERROR
re_u = r'^[^_][_A-Z]+$'
class Store:
@@ -94,7 +99,9 @@ class Store:
hashes.append(hsh)
-
+ i += 1
+ if limit > 0 and i == limit:
+ break
hashes.sort()
return hashes
@@ -108,6 +115,17 @@ class Store:
return self.by_state(state=self.DEFERRED, limit=limit, threshold=threshold)
+ def failed(self, limit=4096):
+ #return self.by_state(state=all_local_errors, limit=limit)
+ r = []
+ r += self.by_state(state=self.LOCAL_ERROR, limit=limit)
+ r += self.by_state(state=self.NODE_ERROR, limit=limit)
+ r.sort()
+ if len(r) > limit:
+ r = r[:limit]
+ return r
+
+
def pending(self, limit=4096):
return self.by_state(state=0, limit=limit, strict=True)
@@ -130,6 +148,7 @@ class Store:
def fail(self, k):
entry = QueueEntry(self, k)
entry.load()
+ logg.debug('fail {}'.format(k))
entry.sendfail()
diff --git a/setup.cfg b/setup.cfg
@@ -1,6 +1,6 @@
[metadata]
name = chainqueue
-version = 0.1.5
+version = 0.1.6
description = Generic blockchain transaction queue control
author = Louis Holbrook
author_email = dev@holbrook.no
diff --git a/tests/test_store.py b/tests/test_store.py
@@ -6,14 +6,23 @@ import logging
import shutil
# external imports
+from chainlib.chain import ChainSpec
+from shep.store.noop import NoopStoreFactory
# local imports
from chainqueue.store.fs import (
IndexStore,
CounterStore,
)
+from chainqueue.store.base import Store
from chainqueue.error import DuplicateTxError
+from chainqueue.state import Status
+# tests imports
+from tests.common import (
+ MockTokenCache,
+ MockCacheTokenTx,
+ )
logging.basicConfig(level=logging.DEBUG)
logg = logging.getLogger()
@@ -58,5 +67,38 @@ class TestStoreImplementations(unittest.TestCase):
store.put(hx, data)
+ def test_upcoming_limit(self):
+ index_store = IndexStore(self.path)
+ counter_store = CounterStore(self.path)
+ chain_spec = ChainSpec('foo', 'bar', 42, 'baz')
+ factory = NoopStoreFactory().add
+ state_store = Status(factory)
+ cache_store = MockTokenCache()
+ queue_store = Store(chain_spec, state_store, index_store, counter_store, cache=cache_store)
+
+ txs = []
+ for i in range(3):
+ tx_src = os.urandom(128).hex()
+ tx = queue_store.put(tx_src, cache_adapter=MockCacheTokenTx)
+ txs.append(tx)
+
+ r = queue_store.upcoming(limit=3)
+ self.assertEqual(len(r), 0)
+
+ for tx in txs:
+ queue_store.enqueue(tx[1])
+
+ r = queue_store.upcoming(limit=3)
+ self.assertEqual(len(r), 3)
+
+ queue_store.send_start(txs[0][1])
+ r = queue_store.upcoming(limit=3)
+ self.assertEqual(len(r), 2)
+
+ queue_store.send_end(txs[0][1])
+ r = queue_store.upcoming(limit=3)
+ self.assertEqual(len(r), 2)
+
+
if __name__ == '__main__':
unittest.main()