chainqueue

Blockchain transaction queue control
Log | Files | Refs | LICENSE

commit 565a58252a527d171e6155d2b06c37de881cf0fb
parent f8c1deb752b00e974cc8c80424a7c881af335593
Author: nolash <dev@holbrook.no>
Date:   Tue,  1 Jun 2021 12:43:12 +0200

Add fs queue backend add file

Diffstat:
Mchainqueue/db/enum.py | 154+------------------------------------------------------------------------------
Achainqueue/enum.py | 156+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mchainqueue/error.py | 7+++++++
Achainqueue/fs/cache.py | 89+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mchainqueue/fs/dir.py | 16+++++++++++++++-
Atests/test_fs.py | 42++++++++++++++++++++++++++++++++++++++++++
Mtests/test_hexdir.py | 7+++++++
7 files changed, 317 insertions(+), 154 deletions(-)

diff --git a/chainqueue/db/enum.py b/chainqueue/db/enum.py @@ -1,153 +1 @@ -# standard imports -import enum - - -@enum.unique -class StatusBits(enum.IntEnum): - """Individual bit flags that are combined to define the state and legacy of a queued transaction - - """ - QUEUED = 0x01 # transaction should be sent to network - RESERVED = 0x02 # transaction is currently being handled by a thread - IN_NETWORK = 0x08 # transaction is in network - - DEFERRED = 0x10 # an attempt to send the transaction to network has failed - GAS_ISSUES = 0x20 # transaction is pending sender account gas funding - - LOCAL_ERROR = 0x100 # errors that originate internally from the component - NODE_ERROR = 0x200 # errors originating in the node (invalid RLP input...) - NETWORK_ERROR = 0x400 # errors that originate from the network (REVERT) - UNKNOWN_ERROR = 0x800 # unclassified errors (the should not occur) - - FINAL = 0x1000 # transaction processing has completed - OBSOLETE = 0x2000 # transaction has been replaced by a different transaction with higher fee - MANUAL = 0x8000 # transaction processing has been manually overridden - - -@enum.unique -class StatusEnum(enum.IntEnum): - """ - - - Inactive, not finalized. (<0) - * PENDING: The initial state of a newly added transaction record. No action has been performed on this transaction yet. - * SENDFAIL: The transaction was not received by the node. - * RETRY: The transaction is queued for a new send attempt after previously failing. - * READYSEND: The transaction is queued for its first send attempt - * OBSOLETED: A new transaction with the same nonce and higher gas has been sent to network. - * WAITFORGAS: The transaction is on hold pending gas funding. - - Active state: (==0) - * SENT: The transaction has been sent to the mempool. - - Inactive, finalized. (>0) - * FUBAR: Unknown error occurred and transaction is abandoned. Manual intervention needed. - * CANCELLED: The transaction was sent, but was not mined and has disappered from the mempool. This usually follows a transaction being obsoleted. - * OVERRIDDEN: Transaction has been manually overriden. - * REJECTED: The transaction was rejected by the node. - * REVERTED: The transaction was mined, but exception occurred during EVM execution. (Block number will be set) - * SUCCESS: THe transaction was successfully mined. (Block number will be set) - - """ - PENDING = 0 - - SENDFAIL = StatusBits.DEFERRED | StatusBits.LOCAL_ERROR - RETRY = StatusBits.QUEUED | StatusBits.DEFERRED - READYSEND = StatusBits.QUEUED - RESERVED = StatusBits.RESERVED - - OBSOLETED = StatusBits.OBSOLETE | StatusBits.IN_NETWORK - - WAITFORGAS = StatusBits.GAS_ISSUES - - SENT = StatusBits.IN_NETWORK - FUBAR = StatusBits.FINAL | StatusBits.UNKNOWN_ERROR - CANCELLED = StatusBits.IN_NETWORK | StatusBits.FINAL | StatusBits.OBSOLETE - OVERRIDDEN = StatusBits.FINAL | StatusBits.OBSOLETE | StatusBits.MANUAL - - REJECTED = StatusBits.NODE_ERROR | StatusBits.FINAL - REVERTED = StatusBits.IN_NETWORK | StatusBits.FINAL | StatusBits.NETWORK_ERROR - SUCCESS = StatusBits.IN_NETWORK | StatusBits.FINAL - - -def status_str(v, bits_only=False): - """Render a human-readable string describing the status - - If the bit field exactly matches a StatusEnum value, the StatusEnum label will be returned. - - If a StatusEnum cannot be matched, the string will be postfixed with "*", unless explicitly instructed to return bit field labels only. - - :param v: Status bit field - :type v: number - :param bits_only: Only render individual bit labels. - :type bits_only: bool - :returns: Status string - :rtype: str - """ - s = '' - if not bits_only: - try: - s = StatusEnum(v).name - return s - except ValueError: - pass - - if v == 0: - return 'NONE' - - for i in range(16): - b = (1 << i) - if (b & 0xffff) & v: - n = StatusBits(b).name - if len(s) > 0: - s += ',' - s += n - if not bits_only: - s += '*' - return s - - - -def all_errors(): - """Bit mask of all error states - - :returns: Error flags - :rtype: number - """ - return StatusBits.LOCAL_ERROR | StatusBits.NODE_ERROR | StatusBits.NETWORK_ERROR | StatusBits.UNKNOWN_ERROR - - -def is_error_status(v): - """Check if value is an error state - - :param v: Status bit field - :type v: number - :returns: True if error - :rtype: bool - """ - return bool(v & all_errors()) - - -__ignore_manual_value = ~StatusBits.MANUAL -def ignore_manual(v): - return v & __ignore_manual_value - - -def is_nascent(v): - return ignore_manual(v) == StatusEnum.PENDING - - -def dead(): - """Bit mask defining whether a transaction is still likely to be processed on the network. - - :returns: Bit mask - :rtype: number - """ - return StatusBits.FINAL | StatusBits.OBSOLETE - - -def is_alive(v): - """Check if transaction is still likely to be processed on the network. - - The contingency of "likely" refers to the case a transaction has been obsoleted after sent to the network, but the network still confirms the obsoleted transaction. The return value of this method will not change as a result of this, BUT the state itself will (as the FINAL bit will be set). - - :returns: - """ - return bool(v & dead() == 0) +from chainqueue.enum import * diff --git a/chainqueue/enum.py b/chainqueue/enum.py @@ -0,0 +1,156 @@ +# standard imports +import enum + + +@enum.unique +class StatusBits(enum.IntEnum): + """Individual bit flags that are combined to define the state and legacy of a queued transaction + + """ + QUEUED = 0x01 # transaction should be sent to network + RESERVED = 0x02 # transaction is currently being handled by a thread + IN_NETWORK = 0x08 # transaction is in network + + DEFERRED = 0x10 # an attempt to send the transaction to network has failed + GAS_ISSUES = 0x20 # transaction is pending sender account gas funding + + LOCAL_ERROR = 0x100 # errors that originate internally from the component + NODE_ERROR = 0x200 # errors originating in the node (invalid RLP input...) + NETWORK_ERROR = 0x400 # errors that originate from the network (REVERT) + UNKNOWN_ERROR = 0x800 # unclassified errors (the should not occur) + + FINAL = 0x1000 # transaction processing has completed + OBSOLETE = 0x2000 # transaction has been replaced by a different transaction with higher fee + MANUAL = 0x8000 # transaction processing has been manually overridden + + +@enum.unique +class StatusEnum(enum.IntEnum): + """ + + - Inactive, not finalized. (<0) + * PENDING: The initial state of a newly added transaction record. No action has been performed on this transaction yet. + * SENDFAIL: The transaction was not received by the node. + * RETRY: The transaction is queued for a new send attempt after previously failing. + * READYSEND: The transaction is queued for its first send attempt + * OBSOLETED: A new transaction with the same nonce and higher gas has been sent to network. + * WAITFORGAS: The transaction is on hold pending gas funding. + - Active state: (==0) + * SENT: The transaction has been sent to the mempool. + - Inactive, finalized. (>0) + * FUBAR: Unknown error occurred and transaction is abandoned. Manual intervention needed. + * CANCELLED: The transaction was sent, but was not mined and has disappered from the mempool. This usually follows a transaction being obsoleted. + * OVERRIDDEN: Transaction has been manually overriden. + * REJECTED: The transaction was rejected by the node. + * REVERTED: The transaction was mined, but exception occurred during EVM execution. (Block number will be set) + * SUCCESS: THe transaction was successfully mined. (Block number will be set) + + """ + PENDING = 0 + + SENDFAIL = StatusBits.DEFERRED | StatusBits.LOCAL_ERROR + RETRY = StatusBits.QUEUED | StatusBits.DEFERRED + READYSEND = StatusBits.QUEUED + RESERVED = StatusBits.RESERVED + + OBSOLETED = StatusBits.OBSOLETE | StatusBits.IN_NETWORK + + WAITFORGAS = StatusBits.GAS_ISSUES + + SENT = StatusBits.IN_NETWORK + FUBAR = StatusBits.FINAL | StatusBits.UNKNOWN_ERROR + CANCELLED = StatusBits.IN_NETWORK | StatusBits.FINAL | StatusBits.OBSOLETE + OVERRIDDEN = StatusBits.FINAL | StatusBits.OBSOLETE | StatusBits.MANUAL + + REJECTED = StatusBits.NODE_ERROR | StatusBits.FINAL + REVERTED = StatusBits.IN_NETWORK | StatusBits.FINAL | StatusBits.NETWORK_ERROR + SUCCESS = StatusBits.IN_NETWORK | StatusBits.FINAL + + +def status_str(v, bits_only=False): + """Render a human-readable string describing the status + + If the bit field exactly matches a StatusEnum value, the StatusEnum label will be returned. + + If a StatusEnum cannot be matched, the string will be postfixed with "*", unless explicitly instructed to return bit field labels only. + + :param v: Status bit field + :type v: number + :param bits_only: Only render individual bit labels. + :type bits_only: bool + :returns: Status string + :rtype: str + """ + s = '' + if not bits_only: + try: + s = StatusEnum(v).name + return s + except ValueError: + pass + + if v == 0: + return 'NONE' + + for i in range(16): + b = (1 << i) + if (b & 0xffff) & v: + n = StatusBits(b).name + if len(s) > 0: + s += ',' + s += n + if not bits_only: + s += '*' + return s + + +def status_bytes(status=0): + return status.to_bytes(8, byteorder='big') + + +def all_errors(): + """Bit mask of all error states + + :returns: Error flags + :rtype: number + """ + return StatusBits.LOCAL_ERROR | StatusBits.NODE_ERROR | StatusBits.NETWORK_ERROR | StatusBits.UNKNOWN_ERROR + + +def is_error_status(v): + """Check if value is an error state + + :param v: Status bit field + :type v: number + :returns: True if error + :rtype: bool + """ + return bool(v & all_errors()) + + +__ignore_manual_value = ~StatusBits.MANUAL +def ignore_manual(v): + return v & __ignore_manual_value + + +def is_nascent(v): + return ignore_manual(v) == StatusEnum.PENDING + + +def dead(): + """Bit mask defining whether a transaction is still likely to be processed on the network. + + :returns: Bit mask + :rtype: number + """ + return StatusBits.FINAL | StatusBits.OBSOLETE + + +def is_alive(v): + """Check if transaction is still likely to be processed on the network. + + The contingency of "likely" refers to the case a transaction has been obsoleted after sent to the network, but the network still confirms the obsoleted transaction. The return value of this method will not change as a result of this, BUT the state itself will (as the FINAL bit will be set). + + :returns: + """ + return bool(v & dead() == 0) diff --git a/chainqueue/error.py b/chainqueue/error.py @@ -21,3 +21,10 @@ class CacheIntegrityError(ChainQueueException): """ pass + +class BackendIntegrityError(ChainQueueException): + """Raised when queue backend has invalid state + """ + pass + + diff --git a/chainqueue/fs/cache.py b/chainqueue/fs/cache.py @@ -0,0 +1,89 @@ +# standard imports +import stat +import logging +import os + +# local imports +from chainqueue.enum import ( + StatusBits, + status_bytes, + ) + +logg = logging.getLogger(__name__) + + +class FsQueueBackend: + + + def add(self, label, content, prefix): + raise NotImplementedError() + + + def get_index(self, idx): + raise NotImplementedError() + + + def set_prefix(self, idx, prefix): + raise NotImplementedError() + + +class FsQueue: + + def __init__(self, root_path, backend=FsQueueBackend()): + self.backend = backend + self.path = root_path + self.path_state = {} + + try: + fi = os.stat(self.path) + self.__verify_directory() + except FileNotFoundError: + FsQueue.__prepare_directory(self.path) + + for r in FsQueue.__state_dirs(self.path): + self.path_state[r[0]] = r[1] + + self.index_path = os.path.join(self.path, '.active') + os.makedirs(self.index_path, exist_ok=True) + + + def add(self, key, value): + prefix = status_bytes() + c = self.backend.add(key, value, prefix=prefix) + + key_hex = key.hex() + entry_path = os.path.join(self.index_path, key_hex) + f = open(entry_path, 'xb') + f.write(c.to_bytes(8, byteorder='big')) + f.close() + + ptr_path = os.path.join(self.path_state['new'], key_hex) + os.link(entry_path, ptr_path) + + logg.debug('added new queue entry {} -> {} index {}'.format(ptr_path, entry_path, c)) + + + @staticmethod + def __state_dirs(path): + r = [] + for s in [ + 'new', + 'reserved', + 'ready', + 'error', + 'defer', + ]: + r.append((s, os.path.join(path, 'spool', s))) + return r + + + def __verify_directory(self): + return True + + + @staticmethod + def __prepare_directory(path): + os.makedirs(path, exist_ok=True) + os.makedirs(os.path.join(path, '.cache')) + for r in FsQueue.__state_dirs(path): + os.makedirs(r[1]) diff --git a/chainqueue/fs/dir.py b/chainqueue/fs/dir.py @@ -15,6 +15,7 @@ class HexDir: self.path = root_path self.key_length = key_length self.prefix_length = prefix_length + self.entry_length = key_length + prefix_length self.__levels = levels + 2 fi = None try: @@ -44,6 +45,8 @@ class HexDir: key_hex = key.hex() entry_path = self.to_filepath(key_hex) + c = self.count() + os.makedirs(os.path.dirname(entry_path), exist_ok=True) f = open(entry_path, 'wb') f.write(content) @@ -55,7 +58,18 @@ class HexDir: f.write(key) f.close() - logg.info('created new entry {} in {}'.format(key_hex, entry_path)) + logg.info('created new entry {} idx {} in {}'.format(key_hex, c, entry_path)) + + return c + + + def count(self): + fi = os.stat(self.master_file) + c = fi.st_size / self.entry_length + r = int(c) + if r != c: # TODO: verify valid for check if evenly divided + raise IndexError('master file not aligned') + return r def set_prefix(self, idx, prefix): diff --git a/tests/test_fs.py b/tests/test_fs.py @@ -0,0 +1,42 @@ +# standard imports +import unittest +import tempfile +import shutil +import logging +import os + +# local imports +from chainqueue.fs.cache import FsQueue +from chainqueue.fs.dir import HexDir + +logging.basicConfig(level=logging.DEBUG) +logg = logging.getLogger() + + +class HexDirTest(unittest.TestCase): + + def setUp(self): + self.dir = tempfile.mkdtemp() + self.hexdir = HexDir(os.path.join(self.dir, 'q'), 32, 2, 8) + self.q = FsQueue(os.path.join(self.dir, 'spool'), backend=self.hexdir) + logg.debug('setup fsqueue root {}'.format(self.dir)) + + + def tearDown(self): + shutil.rmtree(self.dir) + logg.debug('cleaned fsqueue root {}'.format(self.dir)) + + + def test_new(self): + tx_hash = os.urandom(32) + tx_content = os.urandom(128) + self.q.add(tx_hash, tx_content) + + f = open(os.path.join(self.q.path_state['new'], tx_hash.hex()), 'rb') + r = f.read() + f.close() + self.assertEqual(r, b'\x00' * 8) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_hexdir.py b/tests/test_hexdir.py @@ -51,6 +51,13 @@ class HexDirTest(unittest.TestCase): self.hexdir.add(label, content, prefix=b'a') + def test_index(self): + self.hexdir.add(b'\xde\xad\xbe\xef', b'foo', b'ab') + self.hexdir.add(b'\xbe\xef\xfe\xed', b'bar', b'cd') + c = self.hexdir.add(b'\x01\x02\x03\x04', b'baz', b'ef') + self.assertEqual(c, 2) + + def test_edit(self): self.hexdir.add(b'\xde\xad\xbe\xef', b'foo', b'ab') self.hexdir.add(b'\xbe\xef\xfe\xed', b'bar', b'cd')