chainqueue

Blockchain transaction queue control
Info | Log | Files | Refs | LICENSE

test_entry.py (2381B)


      1 # standard imports
      2 import os
      3 import logging
      4 import unittest
      5 
      6 # external imports
      7 from hexathon import add_0x
      8 from chainlib.tx import Tx
      9 from chainlib.block import Block
     10 
     11 # local imports
     12 from chainqueue import QueueEntry
     13 
     14 # test imports
     15 from tests.base_shep import TestShepBase
     16 from tests.common import MockCacheTokenTx
     17 
     18 logging.basicConfig(level=logging.DEBUG)
     19 logg = logging.getLogger()
     20 
     21 
     22 class TestEntry(TestShepBase):
     23     
     24     def test_entry_get(self):
     25         signed_tx = add_0x(os.urandom(128).hex())
     26         nonce = 42
     27         entry = QueueEntry(self.store, cache_adapter=MockCacheTokenTx)
     28         tx_hash_one = entry.create(signed_tx)
     29 
     30         signed_tx = add_0x(os.urandom(128).hex())
     31         nonce = 42
     32         entry = QueueEntry(self.store, cache_adapter=MockCacheTokenTx)
     33         tx_hash_two = entry.create(signed_tx)
     34 
     35         txs = self.store.by_state(include_pending=True)
     36         self.assertEqual(len(txs), 2)
     37      
     38         logg.debug('tx hash one {}'.format(tx_hash_one))
     39         entry = QueueEntry(self.store, tx_hash=tx_hash_one, cache_adapter=MockCacheTokenTx)
     40         entry.load()
     41         entry.sent()
     42         
     43         txs = self.store.by_state(include_pending=True)
     44         self.assertEqual(len(txs), 1)
     45 
     46         txs = self.store.by_state(state=self.store.IN_NETWORK)
     47         self.assertEqual(len(txs), 1)
     48 
     49         entry.succeed(None, None)
     50         txs = self.store.by_state(include_pending=True)
     51         self.assertEqual(len(txs), 1)
     52       
     53         entry = QueueEntry(self.store, tx_hash_two)
     54         entry.load()
     55         entry.sent()
     56         
     57         txs = self.store.by_state(state=self.store.IN_NETWORK)
     58         self.assertEqual(len(txs), 2)
     59 
     60         txs = self.store.by_state(state=self.store.IN_NETWORK, strict=True)
     61         self.assertEqual(len(txs), 1)
     62 
     63 
     64     def test_entry_change(self):
     65         signed_tx = add_0x(os.urandom(128).hex())
     66         nonce = 42
     67         entry = QueueEntry(self.store, cache_adapter=MockCacheTokenTx)
     68         tx_hash = entry.create(signed_tx)
     69 
     70         block = Block()
     71         block.number = 13
     72         tx = Tx(None)
     73         tx.index = 666
     74 
     75         entry.readysend()
     76         entry.reserve()
     77         entry.sendfail()
     78 
     79         entry = QueueEntry(self.store, tx_hash, cache_adapter=MockCacheTokenTx)
     80         entry.load()
     81         self.assertEqual(str(entry.tx_hash), tx_hash)
     82        
     83 
     84 if __name__ == '__main__':
     85     unittest.main()