chainqueue

Blockchain transaction queue control
Info | Log | Files | Refs | LICENSE

test_integrate.py (3492B)


      1 # standard imports
      2 import os
      3 import tempfile
      4 import unittest
      5 import logging
      6 import time
      7 
      8 # external imports
      9 from shep.store.file import SimpleFileStoreFactory
     10 from chainlib.chain import ChainSpec
     11 
     12 # local imports
     13 from chainqueue import (
     14         Store,
     15         Status,
     16         )
     17 
     18 # test imports
     19 from tests.common import (
     20         MockCounter,
     21         MockTokenCache,
     22         MockCacheTokenTx,
     23         MockContentStore,
     24         )
     25 from tests.base_shep import TestShepBase
     26 
# Configure the root logger at DEBUG level so log output is shown during test runs.
logging.basicConfig(level=logging.DEBUG)
logg = logging.getLogger()
     29 
     30 
     31 class TestIntegrateBase(TestShepBase):
     32 
     33     def setUp(self):
     34         self.path = tempfile.mkdtemp()
     35         factory = SimpleFileStoreFactory(self.path).add
     36         self.state = Status(factory)
     37         content_store = MockContentStore()
     38         counter = MockCounter()
     39         chain_spec = ChainSpec('foo', 'bar', 42, 'baz')
     40         self.cache = MockTokenCache()
     41         self.store = Store(chain_spec, self.state, content_store, counter, cache=self.cache)
     42 
     43 
     44     def test_integration_valid(self):
     45         self.store.put(os.urandom(8).hex(), cache_adapter=MockCacheTokenTx)
     46 
     47     
     48     def test_state_default(self):
     49         (s, hx) = self.store.put(os.urandom(8).hex(), cache_adapter=MockCacheTokenTx)
     50         v = self.store.pending()
     51         self.assertEqual(len(v), 1)
     52         self.assertEqual(v[0], hx)
     53 
     54 
     55     def test_state_enqueue(self):
     56         (s, hx) = self.store.put(os.urandom(8).hex(), cache_adapter=MockCacheTokenTx)
     57         self.store.get(hx)
     58         self.store.enqueue(hx)
     59         v = self.store.upcoming()
     60         self.assertEqual(len(v), 1)
     61         v = self.store.pending()
     62         self.assertEqual(len(v), 0)
     63 
     64 
     65     def test_state_defer(self):
     66         (s, hx) = self.store.put(os.urandom(8).hex(), cache_adapter=MockCacheTokenTx)
     67         self.store.reserve(hx)
     68         self.store.fail(hx)
     69         v = self.store.deferred()
     70         self.assertEqual(len(v), 1)
     71         self.assertEqual(v[0], hx)
     72 
     73 
     74     def test_state_multiple(self):
     75         (s, hx) = self.store.put(os.urandom(8).hex(), cache_adapter=MockCacheTokenTx)
     76         self.store.reserve(hx)
     77         self.store.fail(hx)
     78       
     79         (s, hx) = self.store.put(os.urandom(8).hex(), cache_adapter=MockCacheTokenTx)
     80         self.store.reserve(hx)
     81         self.store.fail(hx)
     82         v = self.store.deferred()
     83         self.assertEqual(len(v), 2)
     84 
     85 
     86     def test_state_multiple_sort(self):
     87         (s, hx) = self.store.put(os.urandom(8).hex(), cache_adapter=MockCacheTokenTx)
     88         self.store.reserve(hx)
     89         self.store.fail(hx)
     90        
     91         (s, hx) = self.store.put(os.urandom(8).hex(), cache_adapter=MockCacheTokenTx)
     92         self.store.enqueue(hx)
     93         
     94         (s, hx) = self.store.put(os.urandom(8).hex(), cache_adapter=MockCacheTokenTx)
     95         self.store.reserve(hx)
     96         self.store.fail(hx)
     97 
     98         self.store.put(os.urandom(8).hex(), cache_adapter=MockCacheTokenTx)
     99         v = self.store.deferred()
    100         self.assertEqual(len(v), 2)
    101 
    102 
    103     def test_state_date_threshold(self):
    104         (s, hx) = self.store.put(os.urandom(8).hex(), cache_adapter=MockCacheTokenTx)
    105         self.store.reserve(hx)
    106         self.store.fail(hx)
    107         then = self.store.modified(s)
    108         time.sleep(0.1)
    109 
    110         (s, hx) = self.store.put(os.urandom(8).hex(), cache_adapter=MockCacheTokenTx)
    111         self.store.reserve(hx)
    112         self.store.fail(hx)
    113 
    114         v = self.store.deferred(threshold=then)
    115         self.assertEqual(len(v), 1)
    116 
    117 
# Run the tests in this module when the file is executed directly as a script.
if __name__ == '__main__':
    unittest.main()