test_store.py (2625B)
# standard imports
import os
import tempfile
import unittest
import logging
import shutil

# external imports
from chainlib.chain import ChainSpec
from shep.store.noop import NoopStoreFactory

# local imports
from chainqueue.store.fs import (
        IndexStore,
        CounterStore,
        )
from chainqueue.store.base import Store
from chainqueue.error import DuplicateTxError
from chainqueue.state import Status

# tests imports
from tests.common import (
        MockTokenCache,
        MockCacheTokenTx,
        )

logging.basicConfig(level=logging.DEBUG)
logg = logging.getLogger()


class TestStoreImplementations(unittest.TestCase):
    """Exercise the filesystem index and counter stores and the queue Store.

    Every test runs against a fresh temporary directory which is removed
    again once the test has finished.
    """

    def setUp(self):
        # each test gets its own scratch directory for the fs-backed stores
        self.path = tempfile.mkdtemp()


    def tearDown(self):
        # remove the scratch directory together with everything written to it
        shutil.rmtree(self.path)


    def test_basic_index(self):
        # a value stored under a key is expected to be read back unchanged
        index = IndexStore(self.path)
        key = os.urandom(32).hex()
        payload = 'foo_bar_baz'
        index.put(key, payload)
        fetched = index.get(key)
        self.assertEqual(payload, fetched)


    def test_basic_counter(self):
        # a counter on an empty directory is expected to yield 0, then 1
        counter = CounterStore(self.path)
        for expected in (0, 1):
            self.assertEqual(counter.next(), expected)

        # a second instance on the same path is expected to carry on at 2
        reopened = CounterStore(self.path)
        self.assertEqual(reopened.next(), 2)


    def test_duplicate(self):
        # putting the same key a second time must raise DuplicateTxError
        index = IndexStore(self.path)
        key = os.urandom(32).hex()
        payload = 'foo_bar_baz'
        index.put(key, payload)
        with self.assertRaises(DuplicateTxError):
            index.put(key, payload)


    def test_upcoming_limit(self):
        # assemble a queue store from fs index/counter stores, a no-op
        # state store factory and the mock token cache
        index_store = IndexStore(self.path)
        counter_store = CounterStore(self.path)
        chain_spec = ChainSpec('foo', 'bar', 42, 'baz')
        state_store = Status(NoopStoreFactory().add)
        cache_store = MockTokenCache()
        queue_store = Store(
            chain_spec,
            state_store,
            index_store,
            counter_store,
            cache=cache_store,
        )

        # put three random tx payloads; element [1] of each put() result is
        # what the enqueue/send calls below take as their argument
        txs = [
            queue_store.put(os.urandom(128).hex(), cache_adapter=MockCacheTokenTx)
            for _ in range(3)
        ]

        # nothing has been enqueued yet, so no upcoming entries are expected
        pending = queue_store.upcoming(limit=3)
        self.assertEqual(len(pending), 0)

        for entry in txs:
            queue_store.enqueue(entry[1])

        # all three entries are expected once they have been enqueued
        pending = queue_store.upcoming(limit=3)
        self.assertEqual(len(pending), 3)

        # after send_start on the first entry only two are expected ...
        first = txs[0][1]
        queue_store.send_start(first)
        pending = queue_store.upcoming(limit=3)
        self.assertEqual(len(pending), 2)

        # ... and the count is expected to stay at two after send_end
        queue_store.send_end(first)
        pending = queue_store.upcoming(limit=3)
        self.assertEqual(len(pending), 2)


if __name__ == '__main__':
    unittest.main()