chaind

Base package for chain queue serviceBase package for chain queue service
Log | Files | Refs | LICENSE

commit 9ed3bad0c434d33fb07e86b274c9d49d6817df41
parent e87ec0cd4c18a4b196988ec76cdf0bc585545bf3
Author: lash <dev@holbrook.no>
Date:   Sat, 30 Apr 2022 05:45:02 +0000

Add upcoming throttling, tests

Diffstat:
Mchaind/adapters/fs.py | 21+++++++++++++++++----
Achaind/data/config/token.ini | 2++
Mchaind/unittest/common.py | 17+++--------------
Achaind/unittest/fs.py | 31+++++++++++++++++++++++++++++++
Mrequirements.txt | 2+-
Msetup.cfg | 2+-
Mtests/test_fs.py | 36+++++++++++++++++++++++++++++-------
7 files changed, 84 insertions(+), 27 deletions(-)

diff --git a/chaind/adapters/fs.py b/chaind/adapters/fs.py @@ -21,9 +21,9 @@ logg = logging.getLogger(__name__) class ChaindFsAdapter(ChaindAdapter): - def __init__(self, chain_spec, path, cache_adapter, dispatcher, cache=None, pending_retry_threshold=0, error_retry_threshold=0, digest_bytes=32): + def __init__(self, chain_spec, path, cache_adapter, dispatcher, cache=None, pending_retry_threshold=0, error_retry_threshold=0, digest_bytes=32, event_callback=None): factory = SimpleFileStoreFactory(path).add - state_store = Status(factory) + state_store = Status(factory, allow_invalid=True, event_callback=event_callback) index_path = os.path.join(path, 'tx') index_store = IndexStore(index_path, digest_bytes=digest_bytes) counter_store = CounterStore(path) @@ -45,8 +45,13 @@ class ChaindFsAdapter(ChaindAdapter): return v[1] - def upcoming(self): - return self.store.upcoming() + def upcoming(self, limit=0): + if limit > 0: + r = self.store.by_state(self.store.IN_NETWORK) + limit -= len(r) + if limit <= 0: + return [] + return self.store.upcoming(limit=limit) def pending(self): @@ -57,6 +62,10 @@ class ChaindFsAdapter(ChaindAdapter): return self.store.deferred() + def failed(self): + return self.store.failed() + + def succeed(self, block, tx): if self.store.is_reserved(tx.hash): raise QueueLockError(tx.hash) @@ -68,6 +77,10 @@ class ChaindFsAdapter(ChaindAdapter): return self.store.final(tx.hash, block, tx, error=True) + def sendfail(self): + return self.store.fail(tx.hash) + + def enqueue(self, tx_hash): return self.store.enqueue(tx_hash) diff --git a/chaind/data/config/token.ini b/chaind/data/config/token.ini @@ -0,0 +1,2 @@ +[token] +module = diff --git a/chaind/unittest/common.py b/chaind/unittest/common.py @@ -1,5 +1,4 @@ # standard imports -import unittest import hashlib import tempfile @@ -9,9 +8,6 @@ from chainlib.status import Status as TxStatus from chainlib.chain import ChainSpec from chainlib.error import RPCException -# local imports -from chaind.adapters.fs import ChaindFsAdapter - class MockCacheAdapter(CacheTokenTx): @@ -33,7 +29,9 @@ class MockDispatcher: def send(self, v): - if v not in self.fails: + import sys + sys.stderr.write('susu v {} {}\n'.format(v, self.fails)) + if v in self.fails: raise RPCException('{} is in fails'.format(v)) pass @@ -43,12 +41,3 @@ class MockTx: def __init__(self, tx_hash, status=TxStatus.SUCCESS): self.hash = tx_hash self.status = status - - -class TestChaindFsBase(unittest.TestCase): - - def setUp(self): - self.chain_spec = ChainSpec('foo', 'bar', 42, 'baz') - self.path = tempfile.mkdtemp() - self.adapter = ChaindFsAdapter(self.chain_spec, self.path, self.cache_adapter, self.dispatcher) - diff --git a/chaind/unittest/fs.py b/chaind/unittest/fs.py @@ -0,0 +1,31 @@ +# standard imports +import unittest +import tempfile +import logging + +# external imports +from chainlib.chain import ChainSpec + +# local imports +from chaind.adapters.fs import ChaindFsAdapter + +logging.STATETRACE = 5 +logg = logging.getLogger(__name__) +logg.setLevel(logging.STATETRACE) + + +class TestChaindFsBase(unittest.TestCase): + + def setUp(self): + self.chain_spec = ChainSpec('foo', 'bar', 42, 'baz') + self.path = tempfile.mkdtemp() + self.adapter = ChaindFsAdapter(self.chain_spec, self.path, self.cache_adapter, self.dispatcher, event_callback=self.log_state) + + + def log_state(self, k, from_state, to_state): + logg.log(logging.STATETRACE, 'state change {}: {} -> {}'.format( + k, + from_state, + to_state, + ) + ) diff --git a/requirements.txt b/requirements.txt @@ -1,5 +1,5 @@ chainlib~=0.1.1 -chainqueue~=0.1.5 +chainqueue~=0.1.6 chainsyncer~=0.4.2 confini~=0.6.0 funga~=0.5.2 diff --git a/setup.cfg b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = chaind -version = 0.1.3 +version = 0.2.0 description = Base package for chain queue service author = Louis Holbrook author_email = dev@holbrook.no diff --git a/tests/test_fs.py b/tests/test_fs.py @@ -15,15 +15,14 @@ from chaind.filter import StateFilter from chaind.unittest.common import ( MockTx, MockCacheAdapter, - TestChaindFsBase, + MockDispatcher, ) - +from chaind.unittest.fs import TestChaindFsBase logging.basicConfig(level=logging.DEBUG) logg = logging.getLogger() - class TestChaindFs(TestChaindFsBase): def setUp(self): @@ -43,12 +42,15 @@ class TestChaindFs(TestChaindFsBase): self.assertEqual(data, v) - def test_fs_defer(self): + def test_fs_fail(self): data = os.urandom(128).hex() hsh = self.adapter.put(data) - self.dispatcher.add_fail(hsh) - self.adapter.dispatch(hsh) - txs = self.adapter.deferred() + self.dispatcher.add_fail(data) + + r = self.adapter.dispatch(hsh) + self.assertFalse(r) + + txs = self.adapter.failed() self.assertEqual(len(txs), 1) @@ -88,5 +90,25 @@ class TestChaindFs(TestChaindFsBase): fltr.filter(None, None, tx) + def test_upcoming(self): + drv = QueueDriver(self.adapter) + + txs = [] + for i in range(10): + data = os.urandom(128).hex() + hsh = self.adapter.put(data) + txs.append(hsh) + self.adapter.enqueue(hsh) + + r = self.adapter.upcoming(limit=5) + self.assertEqual(len(r), 5) + + r = self.adapter.dispatch(txs[0]) + self.assertTrue(r) + + r = self.adapter.upcoming(limit=5) + self.assertEqual(len(r), 4) + + if __name__ == '__main__': unittest.main()