list.py (4460B)
# SPDX-License-Identifier: GPL-3.0-or-later
"""Command line tool that lists chainqueue items belonging to a chaind session.

The module parses arguments and builds the configuration at import time, then
``main()`` fetches the transactions indexed under the given session id and
hands them to a chainqueue ``Outputter`` bound to stdout.
"""

# standard imports
import os
import logging
import sys
import argparse

# external imports
from hexathon import add_0x
from chaind import Environment
import chainlib.cli
from chainlib.chain import ChainSpec
from chainqueue.db import dsn_from_config
from chainqueue.sql.backend import SQLBackend
from chaind.sql.session import SessionIndex
from chainqueue.adapters.sessionindex import SessionIndexAdapter
from chainqueue.cli import Outputter
from chainqueue.enum import (
    StatusBits,
    all_errors,
)


logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()

# Default configuration directory, resolved relative to this script.
script_dir = os.path.dirname(os.path.realpath(__file__))
config_dir = os.path.join(script_dir, '..', 'data', 'config')

arg_flags = chainlib.cli.argflag_std_base | chainlib.cli.Flag.CHAIN_SPEC
argparser = chainlib.cli.ArgumentParser(
    arg_flags,
    description="""Lists and queue items by session id, filterable by timestamp and state.

By default all columns will be displayed. Columns can be explicitly selected instead by passing one or more column names using the -o,--column flag. Valid column names are: chainspec, hash, statustext, statuscode.
""")
argparser.add_argument('--backend', type=str, default='sql', help='Backend to use (currently only "sql")')
argparser.add_argument('--start', type=str, help='Oldest transaction hash to include in results')
argparser.add_argument('--end', type=str, help='Newest transaction hash to include in results')
argparser.add_argument('--error', action='store_true', help='Only show transactions which have error state')
argparser.add_argument('--pending', action='store_true', help='Omit finalized transactions')
argparser.add_argument('--status-mask', type=int, dest='status_mask', help='Manually specify status bitmask value to match (overrides --error and --pending)')
argparser.add_argument('--summary', action='store_true', help='output summary for each status category')
argparser.add_argument('--address', dest='address', type=str, help='filter by address')
argparser.add_argument('-o', '--column', dest='column', action='append', type=str, help='add a column to display')
# The positional is the session id handed to SessionIndex below; the previous
# help text ("Ethereum address of recipient") described a different argument.
argparser.add_positional('session_id', type=str, help='Session id to list queue items for')
args = argparser.parse_args()

# Maps argparse destinations to config keys. The code below reads the entries
# mapped to None back with a leading underscore (e.g. '_BACKEND', '_SUMMARY').
# NOTE(review): 'start', 'end' and 'address' are registered here but are not
# read anywhere in the visible part of this script.
extra_args = {
    'address': None,
    'backend': None,
    'start': None,
    'end': None,
    'error': None,
    'pending': None,
    'status_mask': None,
    'summary': None,
    'column': None,
    'session_id': 'SESSION_ID',
}

env = Environment(domain='eth', env=os.environ)

config = chainlib.cli.Config.from_args(args, arg_flags, extra_args=extra_args, base_config_dir=config_dir)

# Fall back to the environment's data directory when none is configured.
if config.get('SESSION_DATA_DIR') is None:
    config.add(env.data_dir, 'SESSION_DATA_DIR', exists_ok=True)

chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC'))

# An explicit --status-mask takes precedence over --error / --pending.
# NOTE(review): status_mask and not_status_mask are computed here but are not
# passed to the query or the outputter in the visible code, so these filters
# currently appear to have no effect - confirm whether that is intended.
status_mask = config.get('_STATUS_MASK', None)
not_status_mask = None
if status_mask is None:
    if config.get('_ERROR'):
        status_mask = all_errors()
    if config.get('_PENDING'):
        not_status_mask = StatusBits.FINAL

tx_getter = None
session_method = None
if config.get('_BACKEND') == 'sql':
    # Imported lazily so that only the selected backend's modules are loaded.
    from chainqueue.sql.query import get_tx_cache as tx_getter
    from chainqueue.runnable.sql import setup_backend
    from chainqueue.db.models.base import SessionBase
    # NOTE(review): setup_backend receives the config before the sqlite
    # DATABASE_NAME rewrite below - confirm this ordering is intended.
    setup_backend(config, debug=config.true('DATABASE_DEBUG'))
    session_method = SessionBase.create_session
else:
    raise NotImplementedError('backend {} not implemented'.format(config.get('_BACKEND')))

# For sqlite the database name is turned into a file path in the session data dir.
if config.get('DATABASE_ENGINE') == 'sqlite':
    config.add(os.path.join(config.get('SESSION_DATA_DIR'), config.get('DATABASE_NAME') + '.sqlite'), 'DATABASE_NAME', exists_ok=True)

dsn = dsn_from_config(config)
backend = SQLBackend(dsn, debug=config.true('DATABASE_DEBUG'))
session_index_backend = SessionIndex(config.get('SESSION_ID'))
adapter = SessionIndexAdapter(backend, session_index_backend=session_index_backend)
output_cols = config.get('_COLUMN')


def main():
    """Output the queue items indexed under the configured session id.

    With ``--summary`` every item is first added to the outputter and a single
    summary is decoded afterwards; otherwise each item is decoded individually.
    The outputter is constructed with ``sys.stdout`` as its target.
    """
    # NOTE(review): decode_status is driven by the '_RAW' config value, which
    # is presumably set by the standard chainlib flags - confirm the polarity
    # against Outputter's expectations.
    outputter = Outputter(chain_spec, sys.stdout, tx_getter, session_method=session_method, decode_status=config.true('_RAW'), cols=output_cols)
    txs = session_index_backend.get(chain_spec, adapter)
    if config.get('_SUMMARY'):
        for k in txs.keys():
            outputter.add(k)
        outputter.decode_summary()
    else:
        for k in txs.keys():
            outputter.decode_single(k)


if __name__ == '__main__':
    main()