chainqueue

Blockchain transaction queue control
Log | Files | Refs | LICENSE

commit a5078a7e37deed45d7fa279c95590ee916db7c45
parent 6b044f18dbf18f75dd09ed354b1d60ff42b27c49
Author: nolash <dev@holbrook.no>
Date:   Fri,  2 Apr 2021 10:41:38 +0200

Add models, otx add test

Diffstat:
M.gitignore | 1+
Achainqueue/db/enum.py | 157+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Achainqueue/db/error.py | 9+++++++++
Achainqueue/db/models/otx.py | 571+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Achainqueue/db/models/tx.py | 151++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mtests/base.py | 26+-------------------------
Mtests/test_basic.py | 20+++++++++++++++++++-
7 files changed, 909 insertions(+), 26 deletions(-)

diff --git a/.gitignore b/.gitignore @@ -1,2 +1,3 @@ __pycache__ *.pyc +*.sqlite diff --git a/chainqueue/db/enum.py b/chainqueue/db/enum.py @@ -0,0 +1,157 @@ +# standard imports +import enum + + +@enum.unique +class StatusBits(enum.IntEnum): + """Individual bit flags that are combined to define the state and legacy of a queued transaction + + """ + QUEUED = 0x01 # transaction should be sent to network + IN_NETWORK = 0x08 # transaction is in network + + DEFERRED = 0x10 # an attempt to send the transaction to network has failed + GAS_ISSUES = 0x20 # transaction is pending sender account gas funding + + LOCAL_ERROR = 0x100 # errors that originate internally from the component + NODE_ERROR = 0x200 # errors originating in the node (invalid RLP input...) + NETWORK_ERROR = 0x400 # errors that originate from the network (REVERT) + UNKNOWN_ERROR = 0x800 # unclassified errors (the should not occur) + + FINAL = 0x1000 # transaction processing has completed + OBSOLETE = 0x2000 # transaction has been replaced by a different transaction with higher fee + MANUAL = 0x8000 # transaction processing has been manually overridden + + +@enum.unique +class StatusEnum(enum.IntEnum): + """ + + - Inactive, not finalized. (<0) + * PENDING: The initial state of a newly added transaction record. No action has been performed on this transaction yet. + * SENDFAIL: The transaction was not received by the node. + * RETRY: The transaction is queued for a new send attempt after previously failing. + * READYSEND: The transaction is queued for its first send attempt + * OBSOLETED: A new transaction with the same nonce and higher gas has been sent to network. + * WAITFORGAS: The transaction is on hold pending gas funding. + - Active state: (==0) + * SENT: The transaction has been sent to the mempool. + - Inactive, finalized. (>0) + * FUBAR: Unknown error occurred and transaction is abandoned. Manual intervention needed. + * CANCELLED: The transaction was sent, but was not mined and has disappered from the mempool. This usually follows a transaction being obsoleted. + * OVERRIDDEN: Transaction has been manually overriden. + * REJECTED: The transaction was rejected by the node. + * REVERTED: The transaction was mined, but exception occurred during EVM execution. (Block number will be set) + * SUCCESS: THe transaction was successfully mined. (Block number will be set) + + """ + PENDING = 0 + + SENDFAIL = StatusBits.DEFERRED | StatusBits.LOCAL_ERROR + RETRY = StatusBits.QUEUED | StatusBits.DEFERRED + READYSEND = StatusBits.QUEUED + + OBSOLETED = StatusBits.OBSOLETE | StatusBits.IN_NETWORK + + WAITFORGAS = StatusBits.GAS_ISSUES + + SENT = StatusBits.IN_NETWORK + FUBAR = StatusBits.FINAL | StatusBits.UNKNOWN_ERROR + CANCELLED = StatusBits.IN_NETWORK | StatusBits.FINAL | StatusBits.OBSOLETE + OVERRIDDEN = StatusBits.FINAL | StatusBits.OBSOLETE | StatusBits.MANUAL + + REJECTED = StatusBits.NODE_ERROR | StatusBits.FINAL + REVERTED = StatusBits.IN_NETWORK | StatusBits.FINAL | StatusBits.NETWORK_ERROR + SUCCESS = StatusBits.IN_NETWORK | StatusBits.FINAL + + +@enum.unique +class LockEnum(enum.IntEnum): + """ + STICKY: When set, reset is not possible + CREATE: Disable creation of accounts + SEND: Disable sending to network + QUEUE: Disable queueing new or modified transactions + """ + STICKY=1 + CREATE=2 + SEND=4 + QUEUE=8 + QUERY=16 + ALL=int(0xfffffffffffffffe) + + +def status_str(v, bits_only=False): + """Render a human-readable string describing the status + + If the bit field exactly matches a StatusEnum value, the StatusEnum label will be returned. + + If a StatusEnum cannot be matched, the string will be postfixed with "*", unless explicitly instructed to return bit field labels only. + + :param v: Status bit field + :type v: number + :param bits_only: Only render individual bit labels. + :type bits_only: bool + :returns: Status string + :rtype: str + """ + s = '' + if not bits_only: + try: + s = StatusEnum(v).name + return s + except ValueError: + pass + + if v == 0: + return 'NONE' + + for i in range(16): + b = (1 << i) + if (b & 0xffff) & v: + n = StatusBits(b).name + if len(s) > 0: + s += ',' + s += n + if not bits_only: + s += '*' + return s + + +def all_errors(): + """Bit mask of all error states + + :returns: Error flags + :rtype: number + """ + return StatusBits.LOCAL_ERROR | StatusBits.NODE_ERROR | StatusBits.NETWORK_ERROR | StatusBits.UNKNOWN_ERROR + + +def is_error_status(v): + """Check if value is an error state + + :param v: Status bit field + :type v: number + :returns: True if error + :rtype: bool + """ + return bool(v & all_errors()) + + +def dead(): + """Bit mask defining whether a transaction is still likely to be processed on the network. + + :returns: Bit mask + :rtype: number + """ + return StatusBits.FINAL | StatusBits.OBSOLETE + + +def is_alive(v): + """Check if transaction is still likely to be processed on the network. + + The contingency of "likely" refers to the case a transaction has been obsoleted after sent to the network, but the network still confirms the obsoleted transaction. The return value of this method will not change as a result of this, BUT the state itself will (as the FINAL bit will be set). + + :returns: + """ + return bool(v & dead() == 0) diff --git a/chainqueue/db/error.py b/chainqueue/db/error.py @@ -0,0 +1,9 @@ +class TxStateChangeError(Exception): + """Raised when an invalid state change of a queued transaction occurs + """ + pass + + +class UnknownConvertError(Exception): + """Raised when a non-existent convert to transaction subtask is requested + """ diff --git a/chainqueue/db/models/otx.py b/chainqueue/db/models/otx.py @@ -0,0 +1,571 @@ +# standard imports +import datetime +import logging + +# external imports +from sqlalchemy import Column, Enum, String, Integer, DateTime, Text, or_, ForeignKey +from sqlalchemy.ext.hybrid import hybrid_property, hybrid_method +from hexathon import ( + strip_0x, + ) + +# local imports +from .base import SessionBase +from chainqueue.db.enum import ( + StatusEnum, + StatusBits, + status_str, + is_error_status, + ) +from chainqueue.db.error import TxStateChangeError + +logg = logging.getLogger().getChild(__name__) + + +class OtxStateLog(SessionBase): + + __tablename__ = 'otx_state_log' + + date = Column(DateTime, default=datetime.datetime.utcnow) + status = Column(Integer) + otx_id = Column(Integer, ForeignKey('otx.id')) + + + def __init__(self, otx): + self.otx_id = otx.id + self.status = otx.status + + +class Otx(SessionBase): + """Outgoing transactions with local origin. + + :param nonce: Transaction nonce + :type nonce: number + :param address: Ethereum address of recipient - NOT IN USE, REMOVE + :type address: str + :param tx_hash: Tranasction hash + :type tx_hash: str, 0x-hex + :param signed_tx: Signed raw transaction data + :type signed_tx: str, 0x-hex + """ + __tablename__ = 'otx' + + tracing = False + """Whether to enable queue state tracing""" + + nonce = Column(Integer) + date_created = Column(DateTime, default=datetime.datetime.utcnow) + tx_hash = Column(String(66)) + signed_tx = Column(Text) + status = Column(Integer) + block = Column(Integer) + + + def __set_status(self, status, session): + self.status |= status + session.add(self) + session.flush() + + + def __reset_status(self, status, session): + status_edit = ~status & self.status + self.status &= status_edit + session.add(self) + session.flush() + + + def __status_already_set(self, status): + r = bool(self.status & status) + if r: + logg.warning('status bit {} already set on {}'.format(status.name, self.tx_hash)) + return r + + + def __status_not_set(self, status): + r = not(self.status & status) + if r: + logg.warning('status bit {} not set on {}'.format(status.name, self.tx_hash)) + return r + + + def set_block(self, block, session=None): + """Set block number transaction was mined in. + + Only manipulates object, does not transaction or commit to backend. + + :param block: Block number + :type block: number + :raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist. + """ + session = SessionBase.bind_session(session) + + if self.block != None: + SessionBase.release_session(session) + raise TxStateChangeError('Attempted set block {} when block was already {}'.format(block, self.block)) + self.block = block + session.add(self) + session.flush() + + SessionBase.release_session(session) + + + def waitforgas(self, session=None): + """Marks transaction as suspended pending gas funding. + + Only manipulates object, does not transaction or commit to backend. + + :raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist. + """ + if self.__status_already_set(StatusBits.GAS_ISSUES): + return + + session = SessionBase.bind_session(session) + + if self.status & StatusBits.FINAL: + SessionBase.release_session(session) + raise TxStateChangeError('GAS_ISSUES cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status))) + if self.status & StatusBits.IN_NETWORK: + SessionBase.release_session(session) + raise TxStateChangeError('GAS_ISSUES cannot be set on an entry with IN_NETWORK state set ({})'.format(status_str(self.status))) + + self.__set_status(StatusBits.GAS_ISSUES, session) + self.__reset_status(StatusBits.QUEUED | StatusBits.DEFERRED, session) + + if self.tracing: + self.__state_log(session=session) + + SessionBase.release_session(session) + + + def fubar(self, session=None): + """Marks transaction as "fubar." Any transaction marked this way is an anomaly and may be a symptom of a serious problem. + + Only manipulates object, does not transaction or commit to backend. + """ + if self.__status_already_set(StatusBits.UNKNOWN_ERROR): + return + + session = SessionBase.bind_session(session) + + if self.status & StatusBits.FINAL: + SessionBase.release_session(session) + raise TxStateChangeError('FUBAR cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status))) + if is_error_status(self.status): + SessionBase.release_session(session) + raise TxStateChangeError('FUBAR cannot be set on an entry with an error state already set ({})'.format(status_str(self.status))) + + self.__set_status(StatusBits.UNKNOWN_ERROR | StatusBits.FINAL, session) + + if self.tracing: + self.__state_log(session=session) + + SessionBase.release_session(session) + + + def reject(self, session=None): + """Marks transaction as "rejected," which means the node rejected sending the transaction to the network. The nonce has not been spent, and the transaction should be replaced. + + Only manipulates object, does not transaction or commit to backend. + """ + if self.__status_already_set(StatusBits.NODE_ERROR): + return + + session = SessionBase.bind_session(session) + + if self.status & StatusBits.FINAL: + SessionBase.release_session(session) + raise TxStateChangeError('REJECTED cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status))) + if self.status & StatusBits.IN_NETWORK: + SessionBase.release_session(session) + raise TxStateChangeError('REJECTED cannot be set on an entry already IN_NETWORK ({})'.format(status_str(self.status))) + if is_error_status(self.status): + SessionBase.release_session(session) + raise TxStateChangeError('REJECTED cannot be set on an entry with an error state already set ({})'.format(status_str(self.status))) + + self.__set_status(StatusBits.NODE_ERROR | StatusBits.FINAL, session) + + if self.tracing: + self.__state_log(session=session) + + SessionBase.release_session(session) + + + def override(self, manual=False, session=None): + """Marks transaction as manually overridden. + + Only manipulates object, does not transaction or commit to backend. + """ + + session = SessionBase.bind_session(session) + + if self.status & StatusBits.FINAL: + SessionBase.release_session(session) + raise TxStateChangeError('OVERRIDDEN/OBSOLETED cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status))) + if self.status & StatusBits.IN_NETWORK: + SessionBase.release_session(session) + raise TxStateChangeError('OVERRIDDEN/OBSOLETED cannot be set on an entry already IN_NETWORK ({})'.format(status_str(self.status))) + if self.status & StatusBits.OBSOLETE: + SessionBase.release_session(session) + raise TxStateChangeError('OVERRIDDEN/OBSOLETED cannot be set on an entry already OBSOLETE ({})'.format(status_str(self.status))) + + self.__set_status(StatusBits.OBSOLETE, session) + #if manual: + # self.__set_status(StatusBits.MANUAL, session) + self.__reset_status(StatusBits.QUEUED | StatusBits.IN_NETWORK, session) + + if self.tracing: + self.__state_log(session=session) + + SessionBase.release_session(session) + + + def manual(self, session=None): + + session = SessionBase.bind_session(session) + + + if self.status & StatusBits.FINAL: + SessionBase.release_session(session) + raise TxStateChangeError('OVERRIDDEN/OBSOLETED cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status))) + + self.__set_status(StatusBits.MANUAL, session) + + if self.tracing: + self.__state_log(session=session) + + SessionBase.release_session(session) + + def retry(self, session=None): + """Marks transaction as ready to retry after a timeout following a sendfail or a completed gas funding. + + Only manipulates object, does not transaction or commit to backend. + + :raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist. + """ + if self.__status_already_set(StatusBits.QUEUED): + return + + session = SessionBase.bind_session(session) + + if self.status & StatusBits.FINAL: + SessionBase.release_session(session) + raise TxStateChangeError('RETRY cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status))) + if not is_error_status(self.status) and not StatusBits.IN_NETWORK & self.status > 0: + SessionBase.release_session(session) + raise TxStateChangeError('RETRY cannot be set on an entry that has no error ({})'.format(status_str(self.status))) + + self.__set_status(StatusBits.QUEUED, session) + self.__reset_status(StatusBits.GAS_ISSUES, session) + + if self.tracing: + self.__state_log(session=session) + + SessionBase.release_session(session) + + + def readysend(self, session=None): + """Marks transaction as ready for initial send attempt. + + Only manipulates object, does not transaction or commit to backend. + + :raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist. + """ + if self.__status_already_set(StatusBits.QUEUED): + return + + session = SessionBase.bind_session(session) + + if self.status & StatusBits.FINAL: + SessionBase.release_session(session) + raise TxStateChangeError('READYSEND cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status))) + if is_error_status(self.status): + SessionBase.release_session(session) + raise TxStateChangeError('READYSEND cannot be set on an errored state ({})'.format(status_str(self.status))) + + self.__set_status(StatusBits.QUEUED, session) + self.__reset_status(StatusBits.GAS_ISSUES, session) + + if self.tracing: + self.__state_log(session=session) + + SessionBase.release_session(session) + + + def sent(self, session=None): + """Marks transaction as having been sent to network. + + Only manipulates object, does not transaction or commit to backend. + + :raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist. + """ + if self.__status_already_set(StatusBits.IN_NETWORK): + return + + session = SessionBase.bind_session(session) + + if self.status & StatusBits.FINAL: + SessionBase.release_session(session) + raise TxStateChangeError('SENT cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status))) + + self.__set_status(StatusBits.IN_NETWORK, session) + self.__reset_status(StatusBits.DEFERRED | StatusBits.QUEUED | StatusBits.LOCAL_ERROR | StatusBits.NODE_ERROR, session) + + if self.tracing: + self.__state_log(session=session) + + SessionBase.release_session(session) + + + def sendfail(self, session=None): + """Marks that an attempt to send the transaction to the network has failed. + + Only manipulates object, does not transaction or commit to backend. + + :raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist. + """ + if self.__status_already_set(StatusBits.NODE_ERROR): + return + + session = SessionBase.bind_session(session) + + if self.status & StatusBits.FINAL: + SessionBase.release_session(session) + raise TxStateChangeError('SENDFAIL cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status))) + if self.status & StatusBits.IN_NETWORK: + SessionBase.release_session(session) + raise TxStateChangeError('SENDFAIL cannot be set on an entry with IN_NETWORK state set ({})'.format(status_str(self.status))) + + self.__set_status(StatusBits.LOCAL_ERROR | StatusBits.DEFERRED, session) + self.__reset_status(StatusBits.QUEUED | StatusBits.GAS_ISSUES, session) + + if self.tracing: + self.__state_log(session=session) + + SessionBase.release_session(session) + + + def dequeue(self, session=None): + """Marks that a process to execute send attempt is underway + + Only manipulates object, does not transaction or commit to backend. + + :raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist. + """ + if self.__status_not_set(StatusBits.QUEUED): + return + + session = SessionBase.bind_session(session) + + if self.status & StatusBits.FINAL: + SessionBase.release_session(session) + raise TxStateChangeError('QUEUED cannot be unset on an entry with FINAL state set ({})'.format(status_str(self.status))) + if self.status & StatusBits.IN_NETWORK: + SessionBase.release_session(session) + raise TxStateChangeError('QUEUED cannot be unset on an entry with IN_NETWORK state set ({})'.format(status_str(self.status))) + + self.__reset_status(StatusBits.QUEUED, session) + + if self.tracing: + self.__state_log(session=session) + + SessionBase.release_session(session) + + + + def minefail(self, block, session=None): + """Marks that transaction was mined but code execution did not succeed. + + Only manipulates object, does not transaction or commit to backend. + + :param block: Block number transaction was mined in. + :type block: number + :raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist. + """ + if self.__status_already_set(StatusBits.NETWORK_ERROR): + return + + session = SessionBase.bind_session(session) + + if self.status & StatusBits.FINAL: + SessionBase.release_session(session) + raise TxStateChangeError('REVERTED cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status))) + if not self.status & StatusBits.IN_NETWORK: + SessionBase.release_session(session) + raise TxStateChangeError('REVERTED cannot be set on an entry without IN_NETWORK state set ({})'.format(status_str(self.status))) + + if block != None: + self.block = block + + self.__set_status(StatusBits.NETWORK_ERROR | StatusBits.FINAL, session) + + if self.tracing: + self.__state_log(session=session) + + SessionBase.release_session(session) + + + def cancel(self, confirmed=False, session=None): + """Marks that the transaction has been succeeded by a new transaction with same nonce. + + If set to confirmed, the previous state must be OBSOLETED, and will transition to CANCELLED - a finalized state. Otherwise, the state must follow a non-finalized state, and will be set to OBSOLETED. + + Only manipulates object, does not transaction or commit to backend. + + :param confirmed: Whether transition is to a final state. + :type confirmed: bool + :raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist. + """ + session = SessionBase.bind_session(session) + + if self.status & StatusBits.FINAL: + SessionBase.release_session(session) + raise TxStateChangeError('CANCEL cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status))) + + if confirmed: + if self.status > 0 and not self.status & StatusBits.OBSOLETE: + SessionBase.release_session(session) + raise TxStateChangeError('CANCEL can only be set on an entry marked OBSOLETE ({})'.format(status_str(self.status))) + self.__set_status(StatusEnum.CANCELLED, session) + else: + self.__set_status(StatusEnum.OBSOLETED, session) + + if self.tracing: + self.__state_log(session=session) + + SessionBase.release_session(session) + + + def success(self, block, session=None): + """Marks that transaction was successfully mined. + + Only manipulates object, does not transaction or commit to backend. + + :param block: Block number transaction was mined in. + :type block: number + :raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist. + """ + + session = SessionBase.bind_session(session) + + if self.status & StatusBits.FINAL: + SessionBase.release_session(session) + raise TxStateChangeError('SUCCESS cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status))) + if not self.status & StatusBits.IN_NETWORK: + SessionBase.release_session(session) + raise TxStateChangeError('SUCCESS cannot be set on an entry without IN_NETWORK state set ({})'.format(status_str(self.status))) + if is_error_status(self.status): + SessionBase.release_session(session) + raise TxStateChangeError('SUCCESS cannot be set on an entry with error state set ({})'.format(status_str(self.status))) + + if block != None: + self.block = block + self.__set_status(StatusEnum.SUCCESS, session) + + if self.tracing: + self.__state_log(session=session) + + SessionBase.release_session(session) + + + @staticmethod + def get(status=0, limit=4096, status_exact=True, session=None): + """Returns outgoing transaction lists by status. + + Status may either be matched exactly, or be an upper bound of the integer value of the status enum. + + :param status: Status value to use in query + :type status: cic_eth.db.enum.StatusEnum + :param limit: Max results to return + :type limit: number + :param status_exact: Whether or not to perform exact status match + :type bool: + :returns: List of transaction hashes + :rtype: tuple, where first element is transaction hash + """ + e = None + + session = SessionBase.bind_session(session) + + if status_exact: + e = session.query(Otx.tx_hash).filter(Otx.status==status).order_by(Otx.date_created.asc()).limit(limit).all() + else: + e = session.query(Otx.tx_hash).filter(Otx.status<=status).order_by(Otx.date_created.asc()).limit(limit).all() + + SessionBase.release_session(session) + return e + + + @staticmethod + def load(tx_hash, session=None): + """Retrieves the outgoing transaction record by transaction hash. + + :param tx_hash: Transaction hash + :type tx_hash: str, 0x-hex + """ + session = SessionBase.bind_session(session) + + q = session.query(Otx) + q = q.filter(Otx.tx_hash==tx_hash) + + SessionBase.release_session(session) + + return q.first() + + + @staticmethod + def account(account_address): + """Retrieves all transaction hashes for which the given Ethereum address is sender or recipient. + + :param account_address: Ethereum address to use in query. + :type account_address: str, 0x-hex + :returns: Outgoing transactions + :rtype: tuple, where first element is transaction hash + """ + session = Otx.create_session() + q = session.query(Otx.tx_hash) + q = q.join(TxCache) + q = q.filter(or_(TxCache.sender==account_address, TxCache.recipient==account_address)) + txs = q.all() + session.close() + return list(txs) + + + def __state_log(self, session): + l = OtxStateLog(self) + session.add(l) + + + # TODO: it is not safe to return otx here unless session has been passed in + @staticmethod + def add(nonce, address, tx_hash, signed_tx, session=None): + external_session = session != None + + session = SessionBase.bind_session(session) + + otx = Otx(nonce, address, tx_hash, signed_tx) + session.add(otx) + session.flush() + if otx.tracing: + otx.__state_log(session=session) + session.flush() + + SessionBase.release_session(session) + + if not external_session: + return None + + return otx + + + def __init__(self, nonce, tx_hash, signed_tx): + self.nonce = nonce + self.tx_hash = strip_0x(tx_hash) + self.signed_tx = strip_0x(signed_tx) + self.status = StatusEnum.PENDING + #signed_tx_bytes = bytes.fromhex(strip_0x(signed_tx)) + #signed_tx_bytes = bytes.fromhex(strip_0x(tx_hash)) + + # sender_address = address_hex_from_signed_tx(signed_tx_bytes) + # logg.debug('decoded tx {}'.format(sender_address)) + + diff --git a/chainqueue/db/models/tx.py b/chainqueue/db/models/tx.py @@ -0,0 +1,151 @@ +# standard imports +import datetime + +# third-party imports +from sqlalchemy import Column, String, Integer, DateTime, Enum, ForeignKey, Boolean, NUMERIC +from sqlalchemy.ext.hybrid import hybrid_method, hybrid_property +#from sqlalchemy.orm import relationship, backref +#from sqlalchemy.ext.declarative import declarative_base + +# local imports +from .base import SessionBase +from .otx import Otx +from cic_eth.db.util import num_serialize +from cic_eth.error import NotLocalTxError +from cic_eth.db.error import TxStateChangeError + + +class TxCache(SessionBase): + """Metadata expansions for outgoing transactions. + + These records are not essential for handling of outgoing transaction queues. It is implemented to reduce the amount of computation spent of parsing and analysing raw signed transaction data. + + Instantiation of the object will fail if an outgoing transaction record with the same transaction hash does not exist. + + Typically three types of transactions are recorded: + + - Token transfers; where source and destination token values and addresses are identical, sender and recipient differ. + - Token conversions; source and destination token values and addresses differ, sender and recipient are identical. + - Any other transaction; source and destination token addresses are zero-address. + + :param tx_hash: Transaction hash + :type tx_hash: str, 0x-hex + :param sender: Ethereum address of transaction sender + :type sender: str, 0x-hex + :param recipient: Ethereum address of transaction beneficiary (e.g. token transfer recipient) + :type recipient: str, 0x-hex + :param source_token_address: Contract address of token that sender spent from + :type source_token_address: str, 0x-hex + :param destination_token_address: Contract address of token that recipient will receive balance of + :type destination_token_address: str, 0x-hex + :param from_value: Amount of source tokens spent + :type from_value: number + :param to_value: Amount of destination tokens received + :type to_value: number + :param block_number: Block height the transaction was mined at, or None if not yet mined + :type block_number: number or None + :param tx_number: Block transaction height the transaction was mined at, or None if not yet mined + :type tx_number: number or None + :raises FileNotFoundError: Outgoing transaction for given transaction hash does not exist + """ + __tablename__ = 'tx_cache' + + otx_id = Column(Integer, ForeignKey('otx.id')) + source_token_address = Column(String(42)) + destination_token_address = Column(String(42)) + sender = Column(String(42)) + recipient = Column(String(42)) + from_value = Column(NUMERIC()) + to_value = Column(NUMERIC()) + block_number = Column(Integer()) + tx_index = Column(Integer()) + date_created = Column(DateTime, default=datetime.datetime.utcnow) + date_updated = Column(DateTime, default=datetime.datetime.utcnow) + date_checked = Column(DateTime, default=datetime.datetime.utcnow) + + + def check(self): + """Update the "checked" timestamp to current time. + + Only manipulates object, does not transaction or commit to backend. + """ + self.date_checked = datetime.datetime.now() + + + @staticmethod + def clone( + tx_hash_original, + tx_hash_new, + session=None, + ): + """Copy tx cache data and associate it with a new transaction. + + :param tx_hash_original: tx cache data to copy + :type tx_hash_original: str, 0x-hex + :param tx_hash_new: tx hash to associate the copied entry with + :type tx_hash_new: str, 0x-hex + """ + session = SessionBase.bind_session(session) + + q = session.query(TxCache) + q = q.join(Otx) + q = q.filter(Otx.tx_hash==tx_hash_original) + txc = q.first() + + if txc == None: + SessionBase.release_session(session) + raise NotLocalTxError('original {}'.format(tx_hash_original)) + if txc.block_number != None: + SessionBase.release_session(session) + raise TxStateChangeError('cannot clone tx cache of confirmed tx {}'.format(tx_hash_original)) + + session.flush() + q = session.query(Otx) + q = q.filter(Otx.tx_hash==tx_hash_new) + otx = q.first() + + if otx == None: + SessionBase.release_session(session) + raise NotLocalTxError('new {}'.format(tx_hash_new)) + + txc_new = TxCache( + otx.tx_hash, + txc.sender, + txc.recipient, + txc.source_token_address, + txc.destination_token_address, + int(txc.from_value), + int(txc.to_value), + session=session, + ) + session.add(txc_new) + session.commit() + + SessionBase.release_session(session) + + + def __init__(self, tx_hash, sender, recipient, source_token_address, destination_token_address, from_value, to_value, block_number=None, tx_index=None, session=None): + session = SessionBase.bind_session(session) + q = session.query(Otx) + q = q.filter(Otx.tx_hash==tx_hash) + tx = q.first() + if tx == None: + SessionBase.release_session(session) + raise FileNotFoundError('outgoing transaction record unknown {} (add a Tx first)'.format(tx_hash)) + self.otx_id = tx.id + + self.sender = sender + self.recipient = recipient + self.source_token_address = source_token_address + self.destination_token_address = destination_token_address + self.from_value = from_value + self.to_value = to_value + self.block_number = block_number + self.tx_index = tx_index + # not automatically set in sqlite, it seems: + self.date_created = datetime.datetime.utcnow() + self.date_updated = self.date_created + self.date_checked = self.date_created + + SessionBase.release_session(session) + diff --git a/tests/base.py b/tests/base.py @@ -29,14 +29,6 @@ class TestBase(unittest.TestCase): migrationsdir = os.path.join(dbdir, 'migrations', 'default') logg.info('using migrations directory {}'.format(migrationsdir)) -# db_dir = tempfile.mkdtemp() -# self.db_path = os.path.join(db_dir, 'test.sqlite') -# config = { -# 'DATABASE_ENGINE': 'sqlite', -# 'DATABASE_DRIVER': 'pysqlite', -# 'DATABASE_NAME': self.db_path, -# } - config = { 'DATABASE_ENGINE': 'sqlite', 'DATABASE_DRIVER': 'pysqlite', @@ -57,24 +49,8 @@ class TestBase(unittest.TestCase): alembic.command.downgrade(ac, 'base') alembic.command.upgrade(ac, 'head') - - self.session = SessionBase.create_session() -# -# f = open(os.path.join(script_dir, '..', 'sql', 'sqlite', '1.sql'), 'r') -# sql = f.read() -# f.close() -# -# conn = SessionBase.engine.connect() -# conn.execute(sql) -# -# f = open(os.path.join(script_dir, '..', 'sql', 'sqlite', '2.sql'), 'r') -# sql = f.read() -# f.close() -# -# conn = SessionBase.engine.connect() -# conn.execute(sql) -# + self.chain_spec = ChainSpec('evm', 'foo', 42, 'bar') diff --git a/tests/test_basic.py b/tests/test_basic.py @@ -1,7 +1,17 @@ # standard imports +import os import logging import unittest +# external imports +from hexathon import ( + strip_0x, + add_0x, + ) + +# local imports +from chainqueue.db.models.otx import Otx + # test imports from tests.base import TestBase @@ -11,9 +21,17 @@ logg = logging.getLogger() class TestBasic(TestBase): def test_hello(self): - logg.debug('foo') pass + def test_otx(self): + tx_hash = add_0x(os.urandom(32).hex()) + address = add_0x(os.urandom(20).hex()) + tx = add_0x(os.urandom(128).hex()) + nonce = 42 + otx = Otx(nonce, tx_hash, tx) + self.session.add(otx) + + if __name__ == '__main__': unittest.main()