chainqueue

Blockchain transaction queue control
Log | Files | Refs | LICENSE

commit 0c9b42d0866e4e3a0230e6518b9bf0fd783f0c11
parent 69ad3711cde9e3d8167f9111586a6d63a6018ae0
Author: lash <dev@holbrook.no>
Date:   Sat, 12 Mar 2022 13:48:40 +0000

Prepare integration test

Diffstat:
Dchainqueue/adapters/base.py | 117-------------------------------------------------------------------------------
Mchainqueue/store.py | 19+++++++++++++++----
Mtests/base_shep.py | 20+++++++-------------
Atests/common.py | 40++++++++++++++++++++++++++++++++++++++++
Mtests/test_cache.py | 28+---------------------------
Mtests/test_entry.py | 12++++++------
Atests/test_integrate.py | 54++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mtests/test_shep.py | 6+++++-
8 files changed, 128 insertions(+), 168 deletions(-)

diff --git a/chainqueue/adapters/base.py b/chainqueue/adapters/base.py @@ -1,117 +0,0 @@ -# standard imports -import datetime - -# local imports -from chainqueue.enum import StatusBits - - -class Adapter: - """Base class defining interface to be implemented by chainqueue adapters. - - The chainqueue adapter collects the following actions: - - - add: add a transaction to the queue - - upcoming: get queued transactions ready to be sent to network - - dispatch: send a queued transaction to the network - - translate: decode details of a transaction - - create_session, release_session: session management to control queue state integrity - - :param backend: Chainqueue backend - :type backend: TODO - abstract backend class. Must implement get, create_session, release_session - :param pending_retry_threshold: seconds delay before retrying a transaction stalled in the newtork - :type pending_retry_threshold: int - :param error_retry_threshold: seconds delay before retrying a transaction that incurred a recoverable error state - :type error_retry_threshold: int - """ - - def __init__(self, backend, pending_retry_threshold=0, error_retry_threshold=0): - self.backend = backend - self.pending_retry_threshold = datetime.timedelta(pending_retry_threshold) - self.error_retry_threshold = datetime.timedelta(error_retry_threshold) - - - def add(self, bytecode, chain_spec, session=None): - """Add a transaction to the queue. - - :param bytecode: Transaction wire format bytecode, in hex - :type bytecode: str - :param chain_spec: Chain spec to use for transaction decode - :type chain_spec: chainlib.chain.ChainSpec - :param session: Backend state integrity session - :type session: varies - """ - raise NotImplementedError() - - - def translate(self, bytecode, chain_spec): - """Decode details of a transaction. - - :param bytecode: Transaction wire format bytecode, in hex - :type bytecode: str - :param chain_spec: Chain spec to use for transaction decode - :type chain_spec: chainlib.chain.ChainSpec - """ - raise NotImplementedError() - - - def get(self, tx_hash, chain_spec, session=None): - """Retrieve serialized transaction represented by the given transaction hash. - - :param chain_spec: Chain spec to use for transaction decode - :type chain_spec: chainlib.chain.ChainSpec - :param tx_hash: Transaction hash, in hex - :type tx_hash: str - :param session: Backend state integrity session - :type session: varies - """ - raise NotImplementedError() - - - def dispatch(self, chain_spec, rpc, tx_hash, signed_tx, session=None): - """Send a queued transaction to the network. - - :param chain_spec: Chain spec to use to identify the transaction network - :type chain_spec: chainlib.chain.ChainSpec - :param rpc: RPC connection to use for transaction send - :type rpc: chainlib.connection.RPCConnection - :param tx_hash: Transaction hash (checksum of transaction), in hex - :type tx_hash: str - :param signed_tx: Transaction wire format bytecode, in hex - :type signed_tx: str - :param session: Backend state integrity session - :type session: varies - """ - raise NotImplementedError() - - - def upcoming(self, chain_spec, session=None): - """Get queued transactions ready to be sent to the network. - - The transactions will be a combination of newly submitted transactions, previously sent but stalled transactions, and transactions that could temporarily not be submitted. - - :param chain_spec: Chain spec to use to identify the transaction network - :type chain_spec: chainlib.chain.ChainSpec - :param session: Backend state integrity session - :type session: varies - """ - raise NotImplementedError() - - - def create_session(self, session=None): - """Create a session context to guarantee atomic state change in backend. - - :param session: If specified, session will be used instead of creating a new one - :type session: varies - """ - return self.backend.create_session(session) - - - def release_session(self, session=None): - """Release a session context created by create_session. - - If session parameter is defined, final session destruction will be deferred to the initial provider of the session. In other words; if create_session was called with a session, release_session should symmetrically be called with the same session. - - :param session: Session context. - :type session: varies - """ - return self.backend.release_session(session) diff --git a/chainqueue/store.py b/chainqueue/store.py @@ -3,6 +3,10 @@ import logging import re import datetime +# local imports +from chainqueue.cache import CacheTx + + logg = logging.getLogger(__name__) @@ -18,7 +22,8 @@ def from_key(k): re_u = r'^[^_][_A-Z]+$' class Store: - def __init__(self, state_store, index_store, counter, cache=None): + def __init__(self, chain_spec, state_store, index_store, counter, cache=None): + self.chain_spec = chain_spec self.cache = cache self.state_store = state_store self.index_store = index_store @@ -32,14 +37,16 @@ class Store: setattr(self, v, getattr(self.state_store, v)) - def put(self, k, v): + def put(self, k, v, cache_adapter=CacheTx()): n = self.counter.next() t = datetime.datetime.now().timestamp() s = to_key(t, n, k) self.state_store.put(s, v) self.index_store.put(k, s) if self.cache != None: - self.cache.put_serialized(v) + tx = cache_adapter() + tx.deserialize(v) + self.cache.put(self.chain_spec, tx) def get(self, k): @@ -48,7 +55,7 @@ class Store: return (s, v,) - def list(self, state=0, limit=4096, strict=False): + def by_state(self, state=0, limit=4096, strict=False): hashes = [] i = 0 @@ -68,3 +75,7 @@ class Store: pair = from_key(h) hashes_out.append(pair[1]) return hashes_out + + + def upcoming(self, limit=4096): + return self.by_state(state=self.QUEUED, limit=limit) diff --git a/tests/base_shep.py b/tests/base_shep.py @@ -4,6 +4,7 @@ import unittest # external imports from shep.store.file import SimpleFileStoreFactory +from chainlib.chain import ChainSpec # local imports from chainqueue import ( @@ -11,6 +12,10 @@ from chainqueue import ( Status, ) +# test imports +from tests.common import MockCounter + + class MockContentStore: @@ -26,18 +31,6 @@ class MockContentStore: return self.store.get(k) -class MockCounter: - - def __init__(self): - self.c = 0 - - - def next(self): - c = self.c - self.c += 1 - return c - - class TestShepBase(unittest.TestCase): def setUp(self): @@ -46,4 +39,5 @@ class TestShepBase(unittest.TestCase): self.state = Status(factory) content_store = MockContentStore() counter = MockCounter() - self.store = Store(self.state, content_store, counter) + chain_spec = ChainSpec('foo', 'bar', 42, 'baz') + self.store = Store(chain_spec, self.state, content_store, counter) diff --git a/tests/common.py b/tests/common.py @@ -0,0 +1,40 @@ +# local imports +from chainqueue.cache import Cache + + +class MockCounter: + + def __init__(self): + self.c = 0 + + + def next(self): + c = self.c + self.c += 1 + return c + + +class MockTokenCache(Cache): + + def __init__(self): + self.db = {} + self.last_filter = None + + def put(self, chain_spec, cache_tx): + self.db[cache_tx.tx_hash] = cache_tx + + + def get(self, chain_spec, tx_hash): + return self.db[tx_hash] + + + def by_nonce(self, cache_filter): + self.last_filter = cache_filter + + + def by_date(self, cache_filter=None): + self.last_filter = cache_filter + + + def count(self, cache_filter): + self.last_filter = cache_filter diff --git a/tests/test_cache.py b/tests/test_cache.py @@ -13,12 +13,12 @@ from chainlib.chain import ChainSpec from chainqueue import QueueEntry from chainqueue.cache import ( CacheTokenTx, - Cache, CacheFilter, ) # test imports from tests.base_shep import TestShepBase +from tests.common import MockTokenCache logging.basicConfig(level=logging.DEBUG) logg = logging.getLogger() @@ -65,32 +65,6 @@ class MockCacheTokenTx(CacheTokenTx): return self -class MockTokenCache(Cache): - - def __init__(self): - self.db = {} - self.last_filter = None - - def put(self, chain_spec, cache_tx): - self.db[cache_tx.tx_hash] = cache_tx - - - def get(self, chain_spec, tx_hash): - return self.db[tx_hash] - - - def by_nonce(self, cache_filter): - self.last_filter = cache_filter - - - def by_date(self, cache_filter=None): - self.last_filter = cache_filter - - - def count(self, cache_filter): - self.last_filter = cache_filter - - class MockNormalizer: def address(self, v): diff --git a/tests/test_entry.py b/tests/test_entry.py @@ -31,31 +31,31 @@ class TestEntry(TestShepBase): entry = QueueEntry(self.store, tx_hash_two) entry.create(signed_tx) - txs = self.store.list() + txs = self.store.by_state() self.assertEqual(len(txs), 2) entry = QueueEntry(self.store, tx_hash_one) entry.load() entry.sent() - txs = self.store.list() + txs = self.store.by_state() self.assertEqual(len(txs), 1) - txs = self.store.list(state=self.store.IN_NETWORK) + txs = self.store.by_state(state=self.store.IN_NETWORK) self.assertEqual(len(txs), 1) entry.succeed(0) - txs = self.store.list() + txs = self.store.by_state() self.assertEqual(len(txs), 1) entry = QueueEntry(self.store, tx_hash_two) entry.load() entry.sent() - txs = self.store.list(state=self.store.IN_NETWORK) + txs = self.store.by_state(state=self.store.IN_NETWORK) self.assertEqual(len(txs), 2) - txs = self.store.list(state=self.store.IN_NETWORK, strict=True) + txs = self.store.by_state(state=self.store.IN_NETWORK, strict=True) self.assertEqual(len(txs), 1) diff --git a/tests/test_integrate.py b/tests/test_integrate.py @@ -0,0 +1,54 @@ +# standard imports +import tempfile +import unittest + +# external imports +from shep.store.file import SimpleFileStoreFactory +from chainlib.chain import ChainSpec + +# local imports +from chainqueue import ( + Store, + Status, + ) + +# test imports +from tests.common import ( + MockCounter, + MockTokenCache + ) + + +class MockContentStore: + + def __init__(self): + self.store = {} + + + def put(self, k, v): + self.store[k] = v + + + def get(self, k): + return self.store.get(k) + + +class TestShepBase(unittest.TestCase): + + def setUp(self): + self.path = tempfile.mkdtemp() + factory = SimpleFileStoreFactory(self.path).add + self.state = Status(factory) + content_store = MockContentStore() + counter = MockCounter() + chain_spec = ChainSpec('foo', 'bar', 42, 'baz') + self.cache = MockTokenCache() + self.store = Store(chain_spec, self.state, content_store, counter, cache=self.cache) + + + def test_basic(self): + pass + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_shep.py b/tests/test_shep.py @@ -49,7 +49,11 @@ class TestShep(TestShepBase): self.state.set('foo', self.state.FINAL) with self.assertRaises(StateTransitionInvalid): self.state.move('foo', self.state.INSUFFICIENT_FUNDS) - + + + def test_shep_cache(self): + self.store.put('foo', 'bar') + if __name__ == '__main__': unittest.main()