chainqueue

Blockchain transaction queue control
Log | Files | Refs | LICENSE

commit dd11937d6b83560a35911c3ade9d619391cb3f88
parent b50145082713e6af6da8c09d855c2ca5aa5495f1
Author: nolash <dev@holbrook.no>
Date:   Fri,  2 Apr 2021 11:51:00 +0200

Add API modules

Diffstat:
Mchainqueue/db/enum.py | 18++----------------
Mchainqueue/db/error.py | 8--------
Mchainqueue/db/models/otx.py | 30+++++++++---------------------
Achainqueue/db/models/state.py | 24++++++++++++++++++++++++
Mchainqueue/db/models/tx.py | 6++++--
Mchainqueue/error.py | 12+++++++++++-
Achainqueue/query.py | 333+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Achainqueue/state.py | 335+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Achainqueue/tx.py | 67+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Atests/test_otx.py | 63+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
10 files changed, 848 insertions(+), 48 deletions(-)

diff --git a/chainqueue/db/enum.py b/chainqueue/db/enum.py @@ -8,6 +8,7 @@ class StatusBits(enum.IntEnum): """ QUEUED = 0x01 # transaction should be sent to network + RESERVED = 0x02 # transaction is currently being handled by a thread IN_NETWORK = 0x08 # transaction is in network DEFERRED = 0x10 # an attempt to send the transaction to network has failed @@ -65,22 +66,6 @@ class StatusEnum(enum.IntEnum): SUCCESS = StatusBits.IN_NETWORK | StatusBits.FINAL -@enum.unique -class LockEnum(enum.IntEnum): - """ - STICKY: When set, reset is not possible - CREATE: Disable creation of accounts - SEND: Disable sending to network - QUEUE: Disable queueing new or modified transactions - """ - STICKY=1 - CREATE=2 - SEND=4 - QUEUE=8 - QUERY=16 - ALL=int(0xfffffffffffffffe) - - def status_str(v, bits_only=False): """Render a human-readable string describing the status @@ -118,6 +103,7 @@ def status_str(v, bits_only=False): return s + def all_errors(): """Bit mask of all error states diff --git a/chainqueue/db/error.py b/chainqueue/db/error.py @@ -1,9 +1 @@ -class TxStateChangeError(Exception): - """Raised when an invalid state change of a queued transaction occurs - """ - pass - -class UnknownConvertError(Exception): - """Raised when a non-existent convert to transaction subtask is requested - """ diff --git a/chainqueue/db/models/otx.py b/chainqueue/db/models/otx.py @@ -11,31 +11,18 @@ from hexathon import ( # local imports from .base import SessionBase +from .state import OtxStateLog from chainqueue.db.enum import ( StatusEnum, StatusBits, status_str, is_error_status, ) -from chainqueue.db.error import TxStateChangeError +from chainqueue.error import TxStateChangeError logg = logging.getLogger().getChild(__name__) -class OtxStateLog(SessionBase): - - __tablename__ = 'otx_state_log' - - date = Column(DateTime, default=datetime.datetime.utcnow) - status = Column(Integer) - otx_id = Column(Integer, ForeignKey('otx.id')) - - - def __init__(self, otx): - self.otx_id = otx.id - self.status = otx.status - - class Otx(SessionBase): """Outgoing transactions with local origin. @@ -308,7 +295,7 @@ class Otx(SessionBase): raise TxStateChangeError('SENT cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status))) self.__set_status(StatusBits.IN_NETWORK, session) - self.__reset_status(StatusBits.DEFERRED | StatusBits.QUEUED | StatusBits.LOCAL_ERROR | StatusBits.NODE_ERROR, session) + self.__reset_status(StatusBits.RESERVED | StatusBits.DEFERRED | StatusBits.QUEUED | StatusBits.LOCAL_ERROR | StatusBits.NODE_ERROR, session) if self.tracing: self.__state_log(session=session) @@ -336,7 +323,7 @@ class Otx(SessionBase): raise TxStateChangeError('SENDFAIL cannot be set on an entry with IN_NETWORK state set ({})'.format(status_str(self.status))) self.__set_status(StatusBits.LOCAL_ERROR | StatusBits.DEFERRED, session) - self.__reset_status(StatusBits.QUEUED | StatusBits.GAS_ISSUES, session) + self.__reset_status(StatusBits.RESERVED | StatusBits.QUEUED | StatusBits.GAS_ISSUES, session) if self.tracing: self.__state_log(session=session) @@ -344,7 +331,7 @@ class Otx(SessionBase): SessionBase.release_session(session) - def dequeue(self, session=None): + def reserve(self, session=None): """Marks that a process to execute send attempt is underway Only manipulates object, does not transaction or commit to backend. @@ -364,6 +351,7 @@ class Otx(SessionBase): raise TxStateChangeError('QUEUED cannot be unset on an entry with IN_NETWORK state set ({})'.format(status_str(self.status))) self.__reset_status(StatusBits.QUEUED, session) + self.__set_status(StatusBits.RESERVED, session) if self.tracing: self.__state_log(session=session) @@ -505,7 +493,7 @@ class Otx(SessionBase): session = SessionBase.bind_session(session) q = session.query(Otx) - q = q.filter(Otx.tx_hash==tx_hash) + q = q.filter(Otx.tx_hash==strip_0x(tx_hash)) SessionBase.release_session(session) @@ -537,12 +525,12 @@ class Otx(SessionBase): # TODO: it is not safe to return otx here unless session has been passed in @staticmethod - def add(nonce, address, tx_hash, signed_tx, session=None): + def add(nonce, tx_hash, signed_tx, session=None): external_session = session != None session = SessionBase.bind_session(session) - otx = Otx(nonce, address, tx_hash, signed_tx) + otx = Otx(nonce, tx_hash, signed_tx) session.add(otx) session.flush() if otx.tracing: diff --git a/chainqueue/db/models/state.py b/chainqueue/db/models/state.py @@ -0,0 +1,24 @@ +# standard imports +import datetime + +# external imports +from sqlalchemy import Column, Integer, DateTime, ForeignKey + +# local imports +from .base import SessionBase + + +class OtxStateLog(SessionBase): + + __tablename__ = 'otx_state_log' + + date = Column(DateTime, default=datetime.datetime.utcnow) + status = Column(Integer) + otx_id = Column(Integer, ForeignKey('otx.id')) + + + def __init__(self, otx): + self.otx_id = otx.id + self.status = otx.status + + diff --git a/chainqueue/db/models/tx.py b/chainqueue/db/models/tx.py @@ -11,8 +11,10 @@ from hexathon import ( # local imports from .base import SessionBase from .otx import Otx -from chainqueue.error import NotLocalTxError -from chainqueue.db.error import TxStateChangeError +from chainqueue.error import ( + NotLocalTxError, + TxStateChangeError, + ) class TxCache(SessionBase): diff --git a/chainqueue/error.py b/chainqueue/error.py @@ -1,4 +1,14 @@ -class NotLocalTxError(Exception): +class ChainQueueException(Exception): + pass + + +class NotLocalTxError(ChainQueueException): """Exception raised when trying to access a tx not originated from a local task """ pass + + +class TxStateChangeError(ChainQueueException): + """Raised when an invalid state change of a queued transaction occurs + """ + pass diff --git a/chainqueue/query.py b/chainqueue/query.py @@ -0,0 +1,333 @@ +# standard imports +import logging +import time +import datetime + +# external imports +from sqlalchemy import or_ +from sqlalchemy import not_ +from sqlalchemy import tuple_ +from sqlalchemy import func + +# local imports +from chainqueue.db.enum import status_str +from chainqueue.db.enum import ( + StatusEnum, + StatusBits, + is_alive, + dead, + ) + +logg = logging.getLogger().getChild(__name__) + + +def get_tx_cache(tx_hash): + """Returns an aggregate dictionary of outgoing transaction data and metadata + + :param tx_hash: Transaction hash of record to modify + :type tx_hash: str, 0x-hex + :raises NotLocalTxError: If transaction not found in queue. + :returns: Transaction data + :rtype: dict + """ + session = SessionBase.create_session() + q = session.query(Otx) + q = q.filter(Otx.tx_hash==tx_hash) + otx = q.first() + + if otx == None: + session.close() + raise NotLocalTxError(tx_hash) + + session.flush() + + q = session.query(TxCache) + q = q.filter(TxCache.otx_id==otx.id) + txc = q.first() + + session.close() + + tx = { + 'tx_hash': otx.tx_hash, + 'signed_tx': otx.signed_tx, + 'nonce': otx.nonce, + 'status': status_str(otx.status), + 'status_code': otx.status, + 'source_token': txc.source_token_address, + 'destination_token': txc.destination_token_address, + 'block_number': txc.block_number, + 'tx_index': txc.tx_index, + 'sender': txc.sender, + 'recipient': txc.recipient, + 'from_value': int(txc.from_value), + 'to_value': int(txc.to_value), + 'date_created': txc.date_created, + 'date_updated': txc.date_updated, + 'date_checked': txc.date_checked, + } + + return tx + + +@celery_app.task(base=CriticalSQLAlchemyTask) +def get_tx(tx_hash): + """Retrieve a transaction queue record by transaction hash + + :param tx_hash: Transaction hash of record to modify + :type tx_hash: str, 0x-hex + :raises NotLocalTxError: If transaction not found in queue. + :returns: nonce, address and signed_tx (raw signed transaction) + :rtype: dict + """ + session = SessionBase.create_session() + q = session.query(Otx) + q = q.filter(Otx.tx_hash==tx_hash) + tx = q.first() + if tx == None: + session.close() + raise NotLocalTxError('queue does not contain tx hash {}'.format(tx_hash)) + + o = { + 'otx_id': tx.id, + 'nonce': tx.nonce, + 'signed_tx': tx.signed_tx, + 'status': tx.status, + } + logg.debug('get tx {}'.format(o)) + session.close() + return o + + +def get_nonce_tx(nonce, sender, chain_spec): + """Retrieve all transactions for address with specified nonce + + :param nonce: Nonce + :type nonce: number + :param address: Ethereum address + :type address: str, 0x-hex + :returns: Transactions + :rtype: dict, with transaction hash as key, signed raw transaction as value + """ + session = SessionBase.create_session() + q = session.query(Otx) + q = q.join(TxCache) + q = q.filter(TxCache.sender==sender) + q = q.filter(Otx.nonce==nonce) + + txs = {} + for r in q.all(): + tx_signed_bytes = bytes.fromhex(r.signed_tx[2:]) + tx = unpack(tx_signed_bytes, chain_id) + if sender == None or tx['from'] == sender: + txs[r.tx_hash] = r.signed_tx + + session.close() + + return txs + + + +def get_paused_txs(chain_spec, status=None, sender=None, session=None): + """Returns not finalized transactions that have been attempted sent without success. + + :param status: If set, will return transactions with this local queue status only + :type status: cic_eth.db.enum.StatusEnum + :param recipient: Recipient address to return transactions for + :type recipient: str, 0x-hex + :param chain_id: Numeric chain id to use to parse signed transaction data + :type chain_id: number + :raises ValueError: Status is finalized, sent or never attempted sent + :returns: Transactions + :rtype: dict, with transaction hash as key, signed raw transaction as value + """ + session = SessionBase.bind_session(session) + q = session.query(Otx) + + if status != None: + if status == StatusEnum.PENDING or status & StatusBits.IN_NETWORK or not is_alive(status): + SessionBase.release_session(session) + raise ValueError('not a valid paused tx value: {}'.format(status)) + q = q.filter(Otx.status.op('&')(status.value)==status.value) + q = q.join(TxCache) + else: + q = q.filter(Otx.status>StatusEnum.PENDING.value) + q = q.filter(not_(Otx.status.op('&')(StatusBits.IN_NETWORK.value)>0)) + + if sender != None: + q = q.filter(TxCache.sender==sender) + + txs = {} + + for r in q.all(): + tx_signed_bytes = bytes.fromhex(r.signed_tx[2:]) + tx = unpack(tx_signed_bytes, chain_id) + if sender == None or tx['from'] == sender: + #gas += tx['gas'] * tx['gasPrice'] + txs[r.tx_hash] = r.signed_tx + + SessionBase.release_session(session) + + return txs + + +def get_status_tx(chain_spec, status, not_status=None, before=None, exact=False, limit=0, session=None): + """Retrieve transaction with a specific queue status. + + :param status: Status to match transactions with + :type status: str + :param before: If set, return only transactions older than the timestamp + :type status: datetime.dateTime + :param limit: Limit amount of returned transactions + :type limit: number + :returns: Transactions + :rtype: list of cic_eth.db.models.otx.Otx + """ + txs = {} + session = SessionBase.bind_session(session) + q = session.query(Otx) + q = q.join(TxCache) + # before = datetime.datetime.utcnow() + if before != None: + q = q.filter(TxCache.date_updated<before) + if exact: + q = q.filter(Otx.status==status) + else: + q = q.filter(Otx.status.op('&')(status)>0) + if not_status != None: + q = q.filter(Otx.status.op('&')(not_status)==0) + q = q.order_by(Otx.nonce.asc(), Otx.date_created.asc()) + i = 0 + for o in q.all(): + if limit > 0 and i == limit: + break + txs[o.tx_hash] = o.signed_tx + i += 1 + SessionBase.release_session(session) + return txs + + +def get_upcoming_tx(chain_spec, status=StatusEnum.READYSEND, not_status=None, recipient=None, before=None, limit=0, session=None): + """Returns the next pending transaction, specifically the transaction with the lowest nonce, for every recipient that has pending transactions. + + Will omit addresses that have the LockEnum.SEND bit in Lock set. + + (TODO) Will not return any rows if LockEnum.SEND bit in Lock is set for zero address. + + :param status: Defines the status used to filter as upcoming. + :type status: cic_eth.db.enum.StatusEnum + :param recipient: Ethereum address of recipient to return transaction for + :type recipient: str, 0x-hex + :param before: Only return transactions if their modification date is older than the given timestamp + :type before: datetime.datetime + :param chain_id: Chain id to use to parse signed transaction data + :type chain_id: number + :raises ValueError: Status is finalized, sent or never attempted sent + :returns: Transactions + :rtype: dict, with transaction hash as key, signed raw transaction as value + """ + session = SessionBase.bind_session(session) + q_outer = session.query( + TxCache.sender, + func.min(Otx.nonce).label('nonce'), + ) + q_outer = q_outer.join(TxCache) + q_outer = q_outer.join(Lock, isouter=True) + q_outer = q_outer.filter(or_(Lock.flags==None, Lock.flags.op('&')(LockEnum.SEND.value)==0)) + + if not is_alive(status): + SessionBase.release_session(session) + raise ValueError('not a valid non-final tx value: {}'.format(status)) + if status == StatusEnum.PENDING: + q_outer = q_outer.filter(Otx.status==status.value) + else: + q_outer = q_outer.filter(Otx.status.op('&')(status)==status) + + if not_status != None: + q_outer = q_outer.filter(Otx.status.op('&')(not_status)==0) + + if recipient != None: + q_outer = q_outer.filter(TxCache.recipient==recipient) + + q_outer = q_outer.group_by(TxCache.sender) + + txs = {} + + i = 0 + for r in q_outer.all(): + q = session.query(Otx) + q = q.join(TxCache) + q = q.filter(TxCache.sender==r.sender) + q = q.filter(Otx.nonce==r.nonce) + + if before != None: + q = q.filter(TxCache.date_checked<before) + + q = q.order_by(TxCache.date_created.desc()) + o = q.first() + + # TODO: audit; should this be possible if a row is found in the initial query? If not, at a minimum log error. + if o == None: + continue + + tx_signed_bytes = bytes.fromhex(o.signed_tx[2:]) + tx = unpack(tx_signed_bytes, chain_id) + txs[o.tx_hash] = o.signed_tx + + q = session.query(TxCache) + q = q.filter(TxCache.otx_id==o.id) + o = q.first() + + o.date_checked = datetime.datetime.now() + session.add(o) + session.commit() + + i += 1 + if limit > 0 and limit == i: + break + + SessionBase.release_session(session) + + return txs + + +def get_account_tx(chain_spec, address, as_sender=True, as_recipient=True, counterpart=None): + """Returns all local queue transactions for a given Ethereum address + + :param address: Ethereum address + :type address: str, 0x-hex + :param as_sender: If False, will omit transactions where address is sender + :type as_sender: bool + :param as_sender: If False, will omit transactions where address is recipient + :type as_sender: bool + :param counterpart: Only return transactions where this Ethereum address is the other end of the transaction (not in use) + :type counterpart: str, 0x-hex + :raises ValueError: If address is set to be neither sender nor recipient + :returns: Transactions + :rtype: dict, with transaction hash as key, signed raw transaction as value + """ + if not as_sender and not as_recipient: + raise ValueError('at least one of as_sender and as_recipient must be True') + + txs = {} + + session = SessionBase.create_session() + q = session.query(Otx) + q = q.join(TxCache) + if as_sender and as_recipient: + q = q.filter(or_(TxCache.sender==address, TxCache.recipient==address)) + elif as_sender: + q = q.filter(TxCache.sender==address) + else: + q = q.filter(TxCache.recipient==address) + q = q.order_by(Otx.nonce.asc(), Otx.date_created.asc()) + + results = q.all() + for r in results: + if txs.get(r.tx_hash) != None: + logg.debug('tx {} already recorded'.format(r.tx_hash)) + continue + txs[r.tx_hash] = r.signed_tx + session.close() + + return txs + diff --git a/chainqueue/state.py b/chainqueue/state.py @@ -0,0 +1,335 @@ +# standard imports +import logging + +# external imports +from hexathon import strip_0x + +# local imports +from chainqueue.db.models.otx import Otx +from chainqueue.db.models.tx import TxCache +from chainqueue.db.models.base import SessionBase +from chainqueue.db.enum import ( + StatusEnum, + StatusBits, + ) +from chainqueue.db.models.otx import OtxStateLog +from chainqueue.error import ( + NotLocalTxError, + TxStateChangeError, + ) + +logg = logging.getLogger().getChild(__name__) + + +def set_sent(tx_hash, fail=False): + """Used to set the status after a send attempt + + :param tx_hash: Transaction hash of record to modify + :type tx_hash: str, 0x-hex + :param fail: if True, will set a SENDFAIL status, otherwise a SENT status. (Default: False) + :type fail: boolean + :raises NotLocalTxError: If transaction not found in queue. + :returns: True if tx is known, False otherwise + :rtype: boolean + """ + session = SessionBase.create_session() + o = Otx.load(tx_hash, session=session) + if o == None: + logg.warning('not local tx, skippingĀ {}'.format(tx_hash)) + session.close() + return False + + try: + if fail: + o.sendfail(session=session) + else: + o.sent(session=session) + except TxStateChangeError as e: + logg.exception('set sent fail: {}'.format(e)) + session.close() + raise(e) + except Exception as e: + logg.exception('set sent UNEXPECED fail: {}'.format(e)) + session.close() + raise(e) + + + session.commit() + session.close() + + return tx_hash + + +def set_final(tx_hash, block=None, fail=False, cancel_obsoletes=True): + """Used to set the status of an incoming transaction result. + + :param tx_hash: Transaction hash of record to modify + :type tx_hash: str, 0x-hex + :param block: Block number if final status represents a confirmation on the network + :type block: number + :param fail: if True, will set a SUCCESS status, otherwise a REVERTED status. (Default: False) + :type fail: boolean + :raises NotLocalTxError: If transaction not found in queue. + """ + session = SessionBase.create_session() + o = Otx.load(tx_hash, session=session) + + if o == None: + session.close() + raise NotLocalTxError('queue does not contain tx hash {}'.format(tx_hash)) + + session.flush() + + try: + if fail: + o.minefail(block, session=session) + else: + o.success(block, session=session) + session.commit() + except TxStateChangeError as e: + logg.exception('set final fail: {}'.format(e)) + session.close() + raise(e) + except Exception as e: + logg.exception('set final UNEXPECTED fail: {}'.format(e)) + session.close() + raise(e) + + session.close() + + return tx_hash + + +def set_cancel(tx_hash, manual=False): + """Used to set the status when a transaction is cancelled. + + Will set the state to CANCELLED or OVERRIDDEN + + :param tx_hash: Transaction hash of record to modify + :type tx_hash: str, 0x-hex + :param manual: If set, status will be OVERRIDDEN. Otherwise CANCELLED. + :type manual: boolean + :raises NotLocalTxError: If transaction not found in queue. + """ + + session = SessionBase.create_session() + o = session.query(Otx).filter(Otx.tx_hash==tx_hash).first() + if o == None: + session.close() + raise NotLocalTxError('queue does not contain tx hash {}'.format(tx_hash)) + + session.flush() + + try: + if manual: + o.override(session=session) + else: + o.cancel(session=session) + session.commit() + except TxStateChangeError as e: + logg.exception('set cancel fail: {}'.format(e)) + except Exception as e: + logg.exception('set cancel UNEXPECTED fail: {}'.format(e)) + session.close() + + return tx_hash + + +def set_rejected(tx_hash): + """Used to set the status when the node rejects sending a transaction to network + + Will set the state to REJECTED + + :param tx_hash: Transaction hash of record to modify + :type tx_hash: str, 0x-hex + :raises NotLocalTxError: If transaction not found in queue. + """ + + session = SessionBase.create_session() + o = session.query(Otx).filter(Otx.tx_hash==tx_hash).first() + if o == None: + session.close() + raise NotLocalTxError('queue does not contain tx hash {}'.format(tx_hash)) + + session.flush() + + o.reject(session=session) + session.commit() + session.close() + + return tx_hash + + +def set_fubar(tx_hash): + """Used to set the status when an unexpected error occurs. + + Will set the state to FUBAR + + :param tx_hash: Transaction hash of record to modify + :type tx_hash: str, 0x-hex + :raises NotLocalTxError: If transaction not found in queue. + """ + + session = SessionBase.create_session() + o = session.query(Otx).filter(Otx.tx_hash==tx_hash).first() + if o == None: + session.close() + raise NotLocalTxError('queue does not contain tx hash {}'.format(tx_hash)) + + session.flush() + + o.fubar(session=session) + session.commit() + session.close() + + return tx_hash + + +def set_manual(tx_hash): + """Used to set the status when queue is manually changed + + Will set the state to MANUAL + + :param tx_hash: Transaction hash of record to modify + :type tx_hash: str, 0x-hex + :raises NotLocalTxError: If transaction not found in queue. + """ + + session = SessionBase.create_session() + o = Otx.load(tx_hash, session=session) + if o == None: + session.close() + raise NotLocalTxError('queue does not contain tx hash {}'.format(tx_hash)) + + session.flush() + + o.manual(session=session) + session.commit() + session.close() + + return tx_hash + + +def set_ready(tx_hash): + """Used to mark a transaction as ready to be sent to network + + :param tx_hash: Transaction hash of record to modify + :type tx_hash: str, 0x-hex + :raises NotLocalTxError: If transaction not found in queue. + """ + session = SessionBase.create_session() + o = Otx.load(tx_hash, session=session) + if o == None: + session.close() + raise NotLocalTxError('queue does not contain tx hash {}'.format(tx_hash)) + session.flush() + + if o.status & StatusBits.GAS_ISSUES or o.status == StatusEnum.PENDING: + o.readysend(session=session) + else: + o.retry(session=session) + + session.commit() + session.close() + + return tx_hash + + +def set_reserved(tx_hash): + session = SessionBase.create_session() + o = Otx.load(tx_hash, session=session) + if o == None: + session.close() + raise NotLocalTxError('queue does not contain tx hash {}'.format(tx_hash)) + + session.flush() + + o.reserve(session=session) + session.commit() + session.close() + + return tx_hash + + + +def set_waitforgas(tx_hash): + """Used to set the status when a transaction must be deferred due to gas refill + + Will set the state to WAITFORGAS + + :param tx_hash: Transaction hash of record to modify + :type tx_hash: str, 0x-hex + :raises NotLocalTxError: If transaction not found in queue. + """ + + session = SessionBase.create_session() + o = Otx.load(tx_hash, session=session) + if o == None: + session.close() + raise NotLocalTxError('queue does not contain tx hash {}'.format(tx_hash)) + + session.flush() + + o.waitforgas(session=session) + session.commit() + session.close() + + return tx_hash + + +def get_state_log(tx_hash): + + logs = [] + + session = SessionBase.create_session() + + q = session.query(OtxStateLog) + q = q.join(Otx) + q = q.filter(Otx.tx_hash==tx_hash) + q = q.order_by(OtxStateLog.date.asc()) + for l in q.all(): + logs.append((l.date, l.status,)) + + session.close() + + return logs + + + +def cancel_obsoletes_by_cache(tx_hash): + session = SessionBase.create_session() + q = session.query( + Otx.nonce.label('nonce'), + TxCache.sender.label('sender'), + Otx.id.label('otxid'), + ) + q = q.join(TxCache) + q = q.filter(Otx.tx_hash==strip_0x(tx_hash)) + o = q.first() + + nonce = o.nonce + sender = o.sender + otxid = o.otxid + + q = session.query(Otx) + q = q.join(TxCache) + q = q.filter(Otx.nonce==nonce) + q = q.filter(TxCache.sender==sender) + q = q.filter(Otx.tx_hash!=strip_0x(tx_hash)) + + for otwo in q.all(): + try: + otwo.cancel(True, session=session) + except TxStateChangeError as e: + logg.exception('cancel non-final fail: {}'.format(e)) + session.close() + raise(e) + except Exception as e: + logg.exception('cancel non-final UNEXPECTED fail: {}'.format(e)) + session.close() + raise(e) + session.commit() + session.close() + + return tx_hash + diff --git a/chainqueue/tx.py b/chainqueue/tx.py @@ -0,0 +1,67 @@ +# standard imports +import logging + +# local imports +from chainqueue.db.models.otx import Otx +from chainqueue.db.models.tx import TxCache +from chainqueue.db.models.base import SessionBase +from chainqueue.db.enum import ( + StatusBits, + ) +from chainqueue.error import TxStateChangeError + +logg = logging.getLogger().getChild(__name__) + + +def create(nonce, holder_address, tx_hash, signed_tx, chain_spec, obsolete_predecessors=True, session=None): + """Create a new transaction queue record. + + :param nonce: Transaction nonce + :type nonce: int + :param holder_address: Sender address + :type holder_address: str, 0x-hex + :param tx_hash: Transaction hash + :type tx_hash: str, 0x-hex + :param signed_tx: Signed raw transaction + :type signed_tx: str, 0x-hex + :param chain_spec: Chain spec to create transaction for + :type chain_spec: ChainSpec + :returns: transaction hash + :rtype: str, 0x-hash + """ + session = SessionBase.bind_session(session) + + o = Otx.add( + nonce=nonce, + tx_hash=tx_hash, + signed_tx=signed_tx, + session=session, + ) + session.flush() + + if obsolete_predecessors: + q = session.query(Otx) + q = q.join(TxCache) + q = q.filter(Otx.nonce==nonce) + q = q.filter(TxCache.sender==holder_address) + q = q.filter(Otx.tx_hash!=tx_hash) + q = q.filter(Otx.status.op('&')(StatusBits.FINAL)==0) + + for otx in q.all(): + logg.info('otx {} obsoleted by {}'.format(otx.tx_hash, tx_hash)) + try: + otx.cancel(confirmed=False, session=session) + except TxStateChangeError as e: + logg.exception('obsolete fail: {}'.format(e)) + session.close() + raise(e) + except Exception as e: + logg.exception('obsolete UNEXPECTED fail: {}'.format(e)) + session.close() + raise(e) + + + session.commit() + SessionBase.release_session(session) + logg.debug('queue created nonce {} from {} hash {}'.format(nonce, holder_address, tx_hash)) + return tx_hash diff --git a/tests/test_otx.py b/tests/test_otx.py @@ -0,0 +1,63 @@ +# standard imports +import os +import logging +import unittest + +# external imports +from hexathon import ( + strip_0x, + add_0x, + ) +from chainlib.chain import ChainSpec + +# local imports +from chainqueue.db.models.otx import Otx +from chainqueue.db.models.tx import TxCache +from chainqueue.tx import create +from chainqueue.state import * +from chainqueue.db.enum import ( + is_alive, + is_error_status, + ) + +# test imports +from tests.base import TestBase + +logging.basicConfig(level=logging.DEBUG) +logg = logging.getLogger() + + +class TestOtx(TestBase): + + def setUp(self): + super(TestOtx, self).setUp() + self.tx_hash = add_0x(os.urandom(32).hex()) + self.tx = add_0x(os.urandom(128).hex()) + self.nonce = 42 + self.alice = add_0x(os.urandom(20).hex()) + + tx_hash = create(self.nonce, self.alice, self.tx_hash, self.tx, self.chain_spec, session=self.session) + self.assertEqual(tx_hash, self.tx_hash) + + + def test_ideal_state_sequence(self): + set_ready(self.tx_hash) + otx = Otx.load(self.tx_hash, session=self.session) + self.assertEqual(otx.status, StatusBits.QUEUED) + + set_reserved(self.tx_hash) + self.session.refresh(otx) + self.assertEqual(otx.status, StatusBits.RESERVED) + + set_sent(self.tx_hash) + self.session.refresh(otx) + self.assertEqual(otx.status, StatusBits.IN_NETWORK) + + set_final(self.tx_hash, block=1024) + self.session.refresh(otx) + self.assertFalse(is_alive(otx.status)) + self.assertFalse(is_error_status(otx.status)) + + +if __name__ == '__main__': + unittest.main()