chaind

Base package for chain queue service
Info | Log | Files | Refs | LICENSE

test_fs.py (2721B)


      1 # standard imports
      2 import os
      3 import unittest
      4 import shutil
      5 import logging
      6 
      7 # external imports
      8 from chainlib.status import Status as TxStatus
      9 
     10 # local imports
     11 from chaind.driver import QueueDriver
     12 from chaind.filter import StateFilter
     13 
     14 # test imports
     15 from chaind.unittest.common import (
     16     MockTx,
     17     MockBlock,
     18     MockCacheAdapter,
     19     MockDispatcher,
     20     )
     21 from chaind.unittest.fs import TestChaindFsBase
     22 
     23 logging.basicConfig(level=logging.DEBUG)
     24 logg = logging.getLogger()
     25 
     26 
     27 class TestChaindFs(TestChaindFsBase):
     28 
     29     def setUp(self):
     30         self.cache_adapter = MockCacheAdapter
     31         self.dispatcher = MockDispatcher()
     32         super(TestChaindFs, self).setUp()
     33 
     34 
     35     def tearDown(self):
     36         shutil.rmtree(self.path)
     37 
     38 
     39     def test_fs_setup(self):
     40         data = os.urandom(128).hex()
     41         hsh = self.adapter.put(data)
     42         v = self.adapter.get(hsh)
     43         self.assertEqual(data, v)
     44 
     45 
     46     def test_fs_fail(self):
     47         data = os.urandom(128).hex()
     48         hsh = self.adapter.put(data)
     49         self.dispatcher.add_fail(data)
     50 
     51         r = self.adapter.dispatch(hsh)
     52         self.assertFalse(r)
     53 
     54         txs = self.adapter.failed()
     55         self.assertEqual(len(txs), 1)
     56 
     57 
     58     def test_fs_process(self):
     59         drv = QueueDriver(self.adapter)
     60 
     61         data = os.urandom(128).hex()
     62         hsh = self.adapter.put(data)
     63 
     64         txs = self.adapter.upcoming()
     65         self.assertEqual(len(txs), 0)
     66 
     67         drv.process()
     68         txs = self.adapter.upcoming()
     69         self.assertEqual(len(txs), 1)
     70 
     71 
     72     def test_fs_filter(self):
     73         drv = QueueDriver(self.adapter)
     74 
     75         data = os.urandom(128).hex()
     76         hsh = self.adapter.put(data)
     77         
     78         fltr = StateFilter(self.chain_spec, self.path, MockCacheAdapter)
     79         tx = MockTx(hsh)
     80         block = MockBlock(42)
     81         fltr.filter(None, block, tx)
     82 
     83 
     84     def test_fs_filter_fail(self):
     85         drv = QueueDriver(self.adapter)
     86 
     87         data = os.urandom(128).hex()
     88         hsh = self.adapter.put(data)
     89         
     90         fltr = StateFilter(self.chain_spec, self.path, MockCacheAdapter)
     91         tx = MockTx(hsh, TxStatus.ERROR)
     92         block = MockBlock(42)
     93         fltr.filter(None, block, tx)
     94 
     95 
     96     def test_upcoming(self):
     97         drv = QueueDriver(self.adapter)
     98 
     99         txs = []
    100         for i in range(10):
    101             data = os.urandom(128).hex()
    102             hsh = self.adapter.put(data)
    103             txs.append(hsh)
    104             self.adapter.enqueue(hsh)
    105 
    106         r = self.adapter.upcoming(limit=5)
    107         self.assertEqual(len(r), 5)
    108 
    109         r = self.adapter.dispatch(txs[0])
    110         self.assertTrue(r)
    111 
    112         r = self.adapter.upcoming(limit=5)
    113         self.assertEqual(len(r), 4)
    114 
    115 
    116 if __name__ == '__main__':
    117     unittest.main()