chainqueue

Blockchain transaction queue control
Log | Files | Refs | LICENSE

commit fb14961c1b7f1fbfcfda774a61d7be762ccb7163
parent 5ea1b15c53c0d8f86a83fba20613385f87ed7e02
Author: nolash <dev@holbrook.no>
Date:   Fri,  2 Apr 2021 14:45:05 +0200

Add nonce list test

Diffstat:
Mchainqueue/error.py | 7+++++++
Mchainqueue/query.py | 78+++++++++++++++++++++++++++++++++++++++++++++---------------------------------
Mchainqueue/state.py | 3---
Mtests/base.py | 3++-
Mtests/test_tx_cache.py | 6++++++
5 files changed, 60 insertions(+), 37 deletions(-)

diff --git a/chainqueue/error.py b/chainqueue/error.py @@ -14,3 +14,10 @@ class TxStateChangeError(ChainQueueException): """Raised when an invalid state change of a queued transaction occurs """ pass + + +class CacheIntegrityError(ChainQueueException): + """Raised when cached data does not match raw transaction data + """ + pass + diff --git a/chainqueue/query.py b/chainqueue/query.py @@ -8,8 +8,12 @@ from sqlalchemy import or_ from sqlalchemy import not_ from sqlalchemy import tuple_ from sqlalchemy import func +from hexathon import add_0x # local imports +from chainqueue.db.models.otx import Otx +from chainqueue.db.models.tx import TxCache +from chainqueue.db.models.base import SessionBase from chainqueue.db.enum import status_str from chainqueue.db.enum import ( StatusEnum, @@ -17,11 +21,14 @@ from chainqueue.db.enum import ( is_alive, dead, ) +from chainqueue.error import ( + NotLocalTxError, + ) logg = logging.getLogger().getChild(__name__) -def get_tx_cache(tx_hash): +def get_tx_cache(chain_spec, tx_hash): """Returns an aggregate dictionary of outgoing transaction data and metadata :param tx_hash: Transaction hash of record to modify @@ -31,10 +38,8 @@ def get_tx_cache(tx_hash): :rtype: dict """ session = SessionBase.create_session() - q = session.query(Otx) - q = q.filter(Otx.tx_hash==tx_hash) - otx = q.first() + otx = Otx.load(tx_hash, session=session) if otx == None: session.close() raise NotLocalTxError(tx_hash) @@ -47,18 +52,19 @@ def get_tx_cache(tx_hash): session.close() + # TODO: DRY, get_tx_cache / get_tx tx = { - 'tx_hash': otx.tx_hash, - 'signed_tx': otx.signed_tx, + 'tx_hash': add_0x(otx.tx_hash), + 'signed_tx': add_0x(otx.signed_tx), 'nonce': otx.nonce, 'status': status_str(otx.status), 'status_code': otx.status, - 'source_token': txc.source_token_address, - 'destination_token': txc.destination_token_address, - 'block_number': txc.block_number, + 'source_token': add_0x(txc.source_token_address), + 'destination_token': add_0x(txc.destination_token_address), + 'block_number': otx.block, 'tx_index': txc.tx_index, - 'sender': txc.sender, - 'recipient': txc.recipient, + 'sender': add_0x(txc.sender), + 'recipient': add_0x(txc.recipient), 'from_value': int(txc.from_value), 'to_value': int(txc.to_value), 'date_created': txc.date_created, @@ -69,8 +75,7 @@ def get_tx_cache(tx_hash): return tx -@celery_app.task(base=CriticalSQLAlchemyTask) -def get_tx(tx_hash): +def get_tx(chain_spec, tx_hash): """Retrieve a transaction queue record by transaction hash :param tx_hash: Transaction hash of record to modify @@ -80,25 +85,23 @@ def get_tx(tx_hash): :rtype: dict """ session = SessionBase.create_session() - q = session.query(Otx) - q = q.filter(Otx.tx_hash==tx_hash) - tx = q.first() - if tx == None: + otx = Otx.load(tx_hash, session=session) + if otx == None: session.close() raise NotLocalTxError('queue does not contain tx hash {}'.format(tx_hash)) o = { - 'otx_id': tx.id, - 'nonce': tx.nonce, - 'signed_tx': tx.signed_tx, - 'status': tx.status, + 'otx_id': otx.id, + 'nonce': otx.nonce, + 'signed_tx': otx.signed_tx, + 'status': otx.status, } logg.debug('get tx {}'.format(o)) session.close() return o -def get_nonce_tx(nonce, sender, chain_spec): +def get_nonce_tx_cache(chain_spec, nonce, sender, decoder=None): """Retrieve all transactions for address with specified nonce :param nonce: Nonce @@ -108,6 +111,8 @@ def get_nonce_tx(nonce, sender, chain_spec): :returns: Transactions :rtype: dict, with transaction hash as key, signed raw transaction as value """ + chain_id = chain_spec.chain_id() + session = SessionBase.create_session() q = session.query(Otx) q = q.join(TxCache) @@ -116,18 +121,19 @@ def get_nonce_tx(nonce, sender, chain_spec): txs = {} for r in q.all(): - tx_signed_bytes = bytes.fromhex(r.signed_tx[2:]) - tx = unpack(tx_signed_bytes, chain_id) - if sender == None or tx['from'] == sender: - txs[r.tx_hash] = r.signed_tx + tx_signed_bytes = bytes.fromhex(r.signed_tx) + if decoder != None: + tx = unpack(tx_signed_bytes, chain_id) + if sender != None and tx['from'] != sender: + raise IntegrityError('Cache sender {} does not match sender in tx {} using decoder {}'.format(sender, r.tx_hash, str(decoder))) + txs[r.tx_hash] = r.signed_tx session.close() return txs - -def get_paused_txs(chain_spec, status=None, sender=None, session=None): +def get_paused_tx_cache(chain_spec, status=None, sender=None, session=None, decoder=None): """Returns not finalized transactions that have been attempted sent without success. :param status: If set, will return transactions with this local queue status only @@ -157,13 +163,19 @@ def get_paused_txs(chain_spec, status=None, sender=None, session=None): q = q.filter(TxCache.sender==sender) txs = {} + gas = 0 for r in q.all(): - tx_signed_bytes = bytes.fromhex(r.signed_tx[2:]) - tx = unpack(tx_signed_bytes, chain_id) - if sender == None or tx['from'] == sender: - #gas += tx['gas'] * tx['gasPrice'] - txs[r.tx_hash] = r.signed_tx + tx_signed_bytes = bytes.fromhex(r.signed_tx) + if decoder != None: + tx = unpack(tx_signed_bytes, chain_id) + if sender != None and tx['from'] != sender: + raise IntegrityError('Cache sender {} does not match sender in tx {} using decoder {}'.format(sender, r.tx_hash, str(decoder))) + gas += tx['gas'] * tx['gasPrice'] + + #tx = unpack(tx_signed_bytes, chain_id) + #if sender == None or tx['from'] == sender: + txs[r.tx_hash] = r.signed_tx SessionBase.release_session(session) diff --git a/chainqueue/state.py b/chainqueue/state.py @@ -101,9 +101,6 @@ def set_final(tx_hash, block=None, fail=False): q = q.filter(Otx.tx_hash==strip_0x(tx_hash)) o = q.first() - if o != None: - - session.close() return tx_hash diff --git a/tests/base.py b/tests/base.py @@ -20,6 +20,7 @@ from chainqueue.tx import create script_dir = os.path.realpath(os.path.dirname(__file__)) +logging.basicConfig(level=logging.WARNING) logg = logging.getLogger().getChild(__name__) @@ -44,7 +45,7 @@ class TestBase(unittest.TestCase): SessionBase.poolable = False SessionBase.transactional = False SessionBase.procedural = False - SessionBase.connect(dsn, debug=False) + SessionBase.connect(dsn, debug=True) ac = alembic.config.Config(os.path.join(migrationsdir, 'alembic.ini')) ac.set_main_option('sqlalchemy.url', dsn) diff --git a/tests/test_tx_cache.py b/tests/test_tx_cache.py @@ -5,6 +5,7 @@ import unittest from chainqueue.db.models.tx import TxCache from chainqueue.error import NotLocalTxError from chainqueue.state import * +from chainqueue.query import get_tx_cache # test imports from tests.base import TestTxBase @@ -31,5 +32,10 @@ class TestTxCache(TestTxBase): self.assertEqual(txc.tx_index, 13) + def test_get(self): + tx_extended_dict = get_tx_cache(self.chain_spec, self.tx_hash) + self.assertEqual(tx_extended_dict['tx_hash'], self.tx_hash) + + if __name__ == '__main__': unittest.main()