chainqueue

Blockchain transaction queue control
Log | Files | Refs | LICENSE

commit c45e6e3310f812f483ac7c64d70593998098f374
parent 97fbec647734ff2a4a74f78380da15e1ef82c400
Author: lash <dev@holbrook.no>
Date:   Fri, 13 May 2022 09:29:54 +0000

Implement settings on chainlib 0.3.0 structure

Diffstat:
Mchainqueue/cli/arg.py | 2++
Mchainqueue/runnable/list.py | 12------------
Mchainqueue/settings.py | 166++++++++++++++++++++++++++++++++++++++++++++-----------------------------------
Mrequirements.txt | 4++--
Msetup.cfg | 2+-
5 files changed, 97 insertions(+), 89 deletions(-)

diff --git a/chainqueue/cli/arg.py b/chainqueue/cli/arg.py @@ -1,2 +1,4 @@ def process_flags(argparser, flags): argparser.add_argument('--backend', type=str, help='Backend to use for state store') + argparser.add_argument('--tx-digest-size', dest='tx_digest_size', type=str, help='Size of transaction hash in bytes') + argparser.add_argument('--state-dir', dest='state_dir', type=str, help='Directory to store queuer state in') diff --git a/chainqueue/runnable/list.py b/chainqueue/runnable/list.py @@ -28,19 +28,12 @@ config_dir = os.path.join(script_dir, '..', 'data', 'config') arg_flags = chainlib.cli.argflag_std_base | chainlib.cli.Flag.CHAIN_SPEC | chainlib.cli.Flag.UNSAFE argparser = chainlib.cli.ArgumentParser(arg_flags) argparser.add_argument('--backend', type=str, default='sql', help='Backend to use') -argparser.add_argument('--state-dir', type=str, dest='state_dir', help='Backend to use') -argparser.add_argument('--tx-digest-size', type=int, dest='tx_digest_size', default=32, help='Size of tx digest in bytes') -#argparser.add_argument('--session-id', type=str, dest='session_id', help='Session id to list') -#argparser.add_argument('--start', type=str, help='Oldest transaction to include in results') -#argparser.add_argument('--end', type=str, help='Newest transaction to include in results') argparser.add_argument('--error', action='store_true', help='Only show transactions which have error state') argparser.add_argument('--no-final', action='store_true', dest='no_final', help='Omit finalized transactions') argparser.add_argument('--status-mask', type=str, dest='status_mask', action='append', default=[], help='Manually specify status bitmask value to match (overrides --error and --pending)') argparser.add_argument('--exact', action='store_true', help='Match status exact') argparser.add_argument('--include-pending', action='store_true', dest='include_pending', help='Include transactions in unprocessed state (pending)') argparser.add_argument('--renderer', type=str, default=[], action='append', help='Transaction renderer for output') -#argparser.add_argument('--summary', action='store_true', help='output summary for each status category') -#argparser.add_argument('-o', '--column', dest='column', action='append', type=str, help='add a column to display') argparser.add_positional('address', required=False, type=str, help='Ethereum address of recipient') args = argparser.parse_args() extra_args = { @@ -48,16 +41,11 @@ extra_args = { 'backend': None, 'state_dir': None, 'exact': None, -# 'tx_digest_size': None, -# 'start': None, -# 'end': None, 'error': None, 'include_pending': '_PENDING', 'status_mask': None, 'no_final': None, 'renderer': None, -# 'column': None, -# 'summary': None, } config = chainlib.cli.Config.from_args(args, arg_flags, extra_args=extra_args, base_config_dir=config_dir) config = chainqueue.cli.config.process_config(config, args, 0) diff --git a/chainqueue/settings.py b/chainqueue/settings.py @@ -10,77 +10,95 @@ from chainqueue.store import Store logg = logging.getLogger(__name__) -class ChainqueueSettings(ChainSettings): - - def process_queue_tx(self, config): - self.o['TX_DIGEST_SIZE'] = config.get('TX_DIGEST_SIZE') - - - def process_queue_backend(self, config): - self.o['QUEUE_BACKEND'] = config.get('QUEUE_BACKEND') - self.o['QUEUE_STATE_STORE'] = Status(self.o['QUEUE_STORE_FACTORY'], allow_invalid=True) - self.o['QUEUE_STORE'] = Store( - self.o['CHAIN_SPEC'], - self.o['QUEUE_STATE_STORE'], - self.o['QUEUE_INDEX_STORE'], - self.o['QUEUE_COUNTER_STORE'], - sync=True, - ) - - - def process_queue_paths(self, config): - index_dir = config.get('QUEUE_INDEX_PATH') - if index_dir == None: - index_dir = os.path.join(config.get('QUEUE_STATE_PATH'), 'tx') - - counter_dir = config.get('QUEUE_COUNTER_PATH') - if counter_dir == None: - counter_dir = os.path.join(config.get('QUEUE_STATE_PATH')) - - self.o['QUEUE_STATE_PATH'] = config.get('QUEUE_STATE_PATH') - self.o['QUEUE_INDEX_PATH'] = index_dir - self.o['QUEUE_COUNTER_PATH'] = counter_dir - - - def process_queue_backend_fs(self, config): - from chainqueue.store.fs import IndexStore - from chainqueue.store.fs import CounterStore - from shep.store.file import SimpleFileStoreFactory - index_store = IndexStore(self.o['QUEUE_INDEX_PATH'], digest_bytes=self.o['TX_DIGEST_SIZE']) - counter_store = CounterStore(self.o['QUEUE_COUNTER_PATH']) - factory = SimpleFileStoreFactory(self.o['QUEUE_STATE_PATH'], use_lock=True).add - - self.o['QUEUE_INDEX_STORE'] = index_store - self.o['QUEUE_COUNTER_STORE'] = counter_store - self.o['QUEUE_STORE_FACTORY'] = factory - - - def process_queue_status_filter(self, config): - states = 0 - if len(config.get('_STATUS_MASK')) == 0: - for v in self.o['QUEUE_STATE_STORE'].all(numeric=True): - states |= v - logg.debug('state store {}'.format(states)) - else: - for v in config.get('_STATUS_MASK'): - try: - states |= int(v) - continue - except ValueError: - pass - - state = self.o['QUEUE_STATE_STORE'].from_name(v) - logg.debug('resolved state argument {} to numeric state {}'.format(v, state)) - states |= state - - self.o['QUEUE_STATUS_FILTER'] = states - - - def process(self, config): - super(ChainqueueSettings, self).process(config) - self.process_queue_tx(config) - self.process_queue_paths(config) - if config.get('_BACKEND') == 'fs': - self.process_queue_backend_fs(config) - self.process_queue_backend(config) - self.process_queue_status_filter(config) +def process_queue_tx(settings, config): + settings.set('TX_DIGEST_SIZE', config.get('TX_DIGEST_SIZE')) + return settings + + +def process_queue_backend(settings, config): + settings.set('QUEUE_BACKEND', config.get('QUEUE_BACKEND')) + return settings + + +def process_queue_store(settings, config): + status = Status(settings.o['QUEUE_STORE_FACTORY'], allow_invalid=True) + settings.set('QUEUE_STATE_STORE'], status) + store = Store( + settings.o['CHAIN_SPEC'], + settings.o['QUEUE_STATE_STORE'], + settings.o['QUEUE_INDEX_STORE'], + settings.o['QUEUE_COUNTER_STORE'], + sync=True, + ) + settings.set('QUEUE_STORE', store) + return settings + + +def process_queue_paths(settings, config): + index_dir = config.get('QUEUE_INDEX_PATH') + if index_dir == None: + index_dir = os.path.join(config.get('QUEUE_STATE_PATH'), 'tx') + + counter_dir = config.get('QUEUE_COUNTER_PATH') + if counter_dir == None: + counter_dir = os.path.join(config.get('QUEUE_STATE_PATH')) + + settings.set('QUEUE_STATE_PATH', config.get('QUEUE_STATE_PATH')) + settings.set('QUEUE_INDEX_PATH', index_dir) + settings.set('QUEUE_COUNTER_PATH'], counter_dir) + return settings + + +def process_queue_backend_fs(settings, config): + from chainqueue.store.fs import IndexStore + from chainqueue.store.fs import CounterStore + from shep.store.file import SimpleFileStoreFactory + index_store = IndexStore(settings.o['QUEUE_INDEX_PATH'], digest_bytes=int(settings.o['TX_DIGEST_SIZE'])) + counter_store = CounterStore(settings.o['QUEUE_COUNTER_PATH']) + factory = SimpleFileStoreFactory(settings.o['QUEUE_STATE_PATH'], use_lock=True).add + + settings.set('QUEUE_INDEX_STORE', index_store) + settings.set('QUEUE_COUNTER_STORE', counter_store) + settings.set('QUEUE_STORE_FACTORY', factory) + + return settings + + +def process_queue_status_filter(settings, config): + states = 0 + store = settings.get('QUEUE_STATE_STORE') + if len(config.get('_STATUS_MASK')) == 0: + for v in store.all(numeric=True): + states |= v + logg.debug('state store {}'.format(states)) + else: + for v in config.get('_STATUS_MASK'): + try: + states |= int(v) + continue + except ValueError: + pass + + state = store.from_name(v) + logg.debug('resolved state argument {} to numeric state {}'.format(v, state)) + states |= state + + settings.set('QUEUE_STATUS_FILTER', states) + return settings + + +def process_queue(settings, config): + settings = process_queue_tx(settings, config) + settings = process_queue_paths(settings, config) + if config.get('QUEUE_BACKEND') == 'fs': + settings = process_queue_backend_fs(settings, config) + settings = process_queue_backend(settings, config) + settings = process_queue_store(settings, config) + settings = process_queue_status_filter(settings, config) + return settings + + +def process_settings(settings, config): + super(ChainqueueSettings, settings).process(config) + settings = settings.process_queue(settings, config) + return settings diff --git a/requirements.txt b/requirements.txt @@ -1,5 +1,5 @@ hexathon~=0.1.7 leveldir~=0.3.0 -confini~=0.6.0 -chainlib~=0.2.0 +confini~=0.6.1 +chainlib~=0.3.0 shep~=0.2.9 diff --git a/setup.cfg b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = chainqueue -version = 0.1.16 +version = 0.1.17 description = Generic blockchain transaction queue control author = Louis Holbrook author_email = dev@holbrook.no