chainqueue

Blockchain transaction queue control
Info | Log | Files | Refs | LICENSE

test_store.py (2625B)


      1 # standard imports
      2 import os
      3 import tempfile
      4 import unittest
      5 import logging
      6 import shutil
      7 
      8 # external imports
      9 from chainlib.chain import ChainSpec
     10 from shep.store.noop import NoopStoreFactory
     11 
     12 # local imports
     13 from chainqueue.store.fs import (
     14         IndexStore,
     15         CounterStore,
     16         )
     17 from chainqueue.store.base import Store
     18 from chainqueue.error import DuplicateTxError
     19 from chainqueue.state import Status
     20 
     21 # tests imports
     22 from tests.common import (
     23         MockTokenCache,
     24         MockCacheTokenTx,
     25         )
     26 
     27 logging.basicConfig(level=logging.DEBUG)
     28 logg = logging.getLogger()
     29 
     30 class TestStoreImplementations(unittest.TestCase):
     31 
     32     def setUp(self):
     33         self.path = tempfile.mkdtemp()
     34       
     35 
     36     def tearDown(self):
     37         shutil.rmtree(self.path)
     38 
     39     
     40     def test_basic_index(self):
     41         store = IndexStore(self.path)
     42         hx = os.urandom(32).hex()
     43         data = 'foo_bar_baz'
     44         store.put(hx, data)
     45         r = store.get(hx)
     46         self.assertEqual(data, r)
     47 
     48 
     49     def test_basic_counter(self):
     50         store = CounterStore(self.path)
     51         v = store.next()
     52         self.assertEqual(v, 0)
     53         v = store.next()
     54         self.assertEqual(v, 1)
     55 
     56         store = CounterStore(self.path)
     57         v = store.next()
     58         self.assertEqual(v, 2)
     59 
     60 
     61     def test_duplicate(self):
     62         store = IndexStore(self.path)
     63         hx = os.urandom(32).hex()
     64         data = 'foo_bar_baz'
     65         store.put(hx, data)
     66         with self.assertRaises(DuplicateTxError):
     67             store.put(hx, data)
     68 
     69 
     70     def test_upcoming_limit(self):
     71         index_store = IndexStore(self.path)
     72         counter_store = CounterStore(self.path)
     73         chain_spec = ChainSpec('foo', 'bar', 42, 'baz') 
     74         factory = NoopStoreFactory().add
     75         state_store = Status(factory)
     76         cache_store = MockTokenCache()
     77         queue_store = Store(chain_spec, state_store, index_store, counter_store, cache=cache_store)
     78 
     79         txs = []
     80         for i in range(3):
     81             tx_src = os.urandom(128).hex()
     82             tx = queue_store.put(tx_src, cache_adapter=MockCacheTokenTx)
     83             txs.append(tx)
     84 
     85         r = queue_store.upcoming(limit=3)
     86         self.assertEqual(len(r), 0)
     87 
     88         for tx in txs:
     89             queue_store.enqueue(tx[1])
     90 
     91         r = queue_store.upcoming(limit=3)
     92         self.assertEqual(len(r), 3)
     93 
     94         queue_store.send_start(txs[0][1])
     95         r = queue_store.upcoming(limit=3)
     96         self.assertEqual(len(r), 2)
     97 
     98         queue_store.send_end(txs[0][1])
     99         r = queue_store.upcoming(limit=3)
    100         self.assertEqual(len(r), 2)
    101 
    102 
    103 if __name__ == '__main__':
    104     unittest.main()