chainqueue

Blockchain transaction queue control
Info | Log | Files | Refs | LICENSE

test_fs.py (2397B)


      1 # standard imports
      2 import unittest
      3 import tempfile
      4 import shutil
      5 import logging
      6 import os
      7 
      8 # external imports
      9 from leveldir.hex import HexDir
     10 
     11 # local imports
     12 from chainqueue.fs.queue import FsQueue
     13 from chainqueue.enum import StatusBits
     14 
# Configure the root logger to emit records at DEBUG level and above.
logging.basicConfig(level=logging.DEBUG)
# Root logger instance; the test case below logs fixture setup/teardown through it.
logg = logging.getLogger()
     17 
     18 
     19 class FsQueueTest(unittest.TestCase):
     20 
     21     def setUp(self):
     22         self.dir = tempfile.mkdtemp() 
     23         self.hexdir = HexDir(os.path.join(self.dir, 'q'), 32, 2, 8)
     24         self.q = FsQueue(os.path.join(self.dir, 'spool'), backend=self.hexdir)
     25         logg.debug('setup fsqueue root {}'.format(self.dir))
     26 
     27 
     28     def tearDown(self):
     29         shutil.rmtree(self.dir)
     30         logg.debug('cleaned fsqueue root {}'.format(self.dir))
     31 
     32 
     33     def test_new(self):
     34         tx_hash = os.urandom(32)
     35         tx_content = os.urandom(128)
     36         self.q.add(tx_hash, tx_content)
     37 
     38         f = open(os.path.join(self.q.path_state['new'], tx_hash.hex()), 'rb')
     39         r = f.read()
     40         f.close()
     41         self.assertEqual(r, b'\x00' * 8)
     42 
     43     
     44     def test_change(self):
     45         tx_hash = os.urandom(32)
     46         tx_content = os.urandom(128)
     47         self.q.add(tx_hash, tx_content)
     48         self.q.set(tx_hash, StatusBits.QUEUED)
     49         
     50         (tx_status, tx_content_retrieved) = self.q.get(tx_hash)
     51         status = int.from_bytes(tx_status, byteorder='big')
     52         self.assertEqual(status & StatusBits.QUEUED, StatusBits.QUEUED)
     53 
     54 
     55     def test_move(self):
     56         tx_hash = os.urandom(32)
     57         tx_content = os.urandom(128)
     58         self.q.add(tx_hash, tx_content)
     59         self.q.move(tx_hash, 'ready', from_state='new')
     60        
     61         f = open(os.path.join(self.q.path_state['ready'], tx_hash.hex()), 'rb')
     62         r = f.read()
     63         f.close()
     64         self.assertEqual(r, b'\x00' * 8)
     65 
     66 
     67     def test_purge(self):
     68         tx_hash = os.urandom(32)
     69         tx_content = os.urandom(128)
     70         self.q.add(tx_hash, tx_content)
     71         self.q.move(tx_hash, 'ready', from_state='new')
     72         self.q.purge(tx_hash, 'ready')
     73       
     74         with self.assertRaises(FileNotFoundError):
     75             entry_path = os.path.join(self.q.path_state['ready'], tx_hash.hex())
     76             os.stat(entry_path)
     77 
     78         with self.assertRaises(FileNotFoundError):
     79             entry_path = os.path.join(self.q.index_path, tx_hash.hex())
     80             os.stat(entry_path)
     81 
     82 
# Run the tests in this module when the file is executed directly as a script.
if __name__ == '__main__':
    unittest.main()