info.py (5328B)
# SPDX-License-Identifier: GPL-3.0-or-later

# standard imports
import datetime
import sys
import os
import json
import argparse
import logging

# external imports
from chainlib.chain import ChainSpec
from hexathon import (
    add_0x,
    strip_0x,
    even,
)
import sha3
from funga.eth.signer import EIP155Signer
from chainlib.settings import ChainSettings

# local imports
from chainlib.eth.address import AddressChecksum
from chainlib.eth.chain import network_id
from chainlib.eth.block import (
    block_latest,
    block_by_number,
    syncing,
    Block,
)
from chainlib.eth.tx import count
from chainlib.eth.connection import EthHTTPConnection
from chainlib.eth.gas import (
    OverrideGasOracle,
    balance,
    price,
)
import chainlib.eth.cli
from chainlib.eth.cli.arg import (
    Arg,
    ArgFlag,
    process_args,
)
from chainlib.eth.cli.config import (
    Config,
    process_config,
)
from chainlib.eth.cli.log import process_log
from chainlib.eth.settings import process_settings

# Number of most recent blocks sampled when averages are requested (--long).
BLOCK_SAMPLES = 10

logg = logging.getLogger()

script_dir = os.path.dirname(os.path.realpath(__file__))
config_dir = os.path.join(script_dir, '..', 'data', 'config')

# Maps the machine-readable result keys to the labels used in the output.
# The keys double as the set of valid values for the positional "entry"
# argument.
results_translation = {
    'network_id': 'Network Id',
    'block': 'Block',
    'syncing': 'Syncing',
    'gas_limit': 'Gas Limit',
    'gas_price': 'Gas Price',
    'block_time': 'Block time',
}


def process_config_local(config, arg, args, flags):
    """Add the script-specific command line values to the configuration.

    Sets the ``_LOCAL`` and ``_LONG`` keys from the corresponding flags and
    the ``_ENTRY`` key from the optional positional ``entry`` argument
    (``None`` when no entry was given).

    :param config: Configuration object to extend; must provide ``add``.
    :param arg: Unused; kept for signature parity with ``process_config``.
    :param args: Parsed command line arguments (``local``, ``long``, ``entry``).
    :param flags: Unused; kept for signature parity with ``process_config``.
    :raises ValueError: If an entry is given that is not a key of
        ``results_translation``.
    :return: The same configuration object, updated.
    """
    config.add(args.local, '_LOCAL', False)
    config.add(args.long, '_LONG', False)
    config.add(None, '_ENTRY', False)

    # With nargs='?' argparse yields None when the positional is omitted and
    # a plain string when it is given. Calling len() on None raises, and
    # indexing the string with [0] would only pick its first character.
    # List values (nargs='*' / nargs=1 style) are still accepted.
    entry = args.entry
    if isinstance(entry, (list, tuple)):
        entry = entry[0] if entry else None

    if entry:
        if entry not in results_translation:
            raise ValueError('Unknown entry {}'.format(entry))
        config.add(entry, '_ENTRY', True)
    return config


arg_flags = ArgFlag()
arg = Arg(arg_flags)
flags = arg_flags.STD_READ_NOEX | arg_flags.ENV | arg_flags.TAB

argparser = chainlib.eth.cli.ArgumentParser()
argparser = process_args(argparser, arg, flags)
argparser.add_argument('--long', action='store_true', help='Calculate averages through sampling of blocks and txs')
argparser.add_argument('--local', action='store_true', help='Include local info')
argparser.add_argument('entry', nargs='?', help='Output single item')
args = argparser.parse_args()

logg = process_log(args, logg)

config = Config()
config = process_config(config, arg, args, flags)
config = process_config_local(config, arg, args, flags)
logg.debug('config loaded:\n{}'.format(config))

settings = ChainSettings()
settings = process_settings(settings, config)
logg.debug('settings loaded:\n{}'.format(settings))


def set_result(results, k, v, w=sys.stdout):
    """Record a single result, or emit it directly if it is the requested entry.

    If ``k`` equals the configured ``_ENTRY``, the bare value is written to
    ``w`` (without label or trailing newline) and ``True`` is returned so the
    caller can stop early. Otherwise the value is logged at info level,
    stored in ``results`` and ``False`` is returned.

    :param results: Dict collecting the results, keyed by result key.
    :param k: Result key; must be a key of ``results_translation``.
    :param v: Value to record.
    :param w: Writable stream used when the single entry is emitted.
    :raises KeyError: If ``k`` is not a key of ``results_translation``.
    :return: ``True`` if the value was the requested single entry.
    """
    kt = results_translation[k]
    if str(config.get('_ENTRY')) == k:
        w.write('{}'.format(v))
        return True
    logg.info('{}: {}\n'.format(kt, v))
    results[k] = v
    return False


def _sample_blocks(first_block_number, samples=BLOCK_SAMPLES):
    """Average the gas limit and block time over the most recent blocks.

    Walks backwards from ``first_block_number``, fetching up to ``samples``
    blocks. The sample count is clamped so that no block number below zero
    is requested.

    :param first_block_number: Number of the newest block to sample.
    :param samples: Maximum number of blocks to sample.
    :return: Tuple of (average gas limit as int, average block time as float).
        The block time is 0.0 when only a single block could be sampled.
    """
    conn = settings.get('CONN')
    id_generator = settings.get('RPC_ID_GENERATOR')
    dialect_filter = settings.get('RPC_DIALECT_FILTER')

    samples = max(1, min(samples, first_block_number + 1))

    aggr_time = 0.0
    aggr_gas = 0
    previous_timestamp = None
    for i in range(samples):
        o = block_by_number(first_block_number - i, False, id_generator=id_generator)
        r = conn.do(o)
        block = Block(r, dialect_filter=dialect_filter)
        if previous_timestamp is not None:
            aggr_time += previous_timestamp - block.timestamp
        previous_timestamp = block.timestamp
        aggr_gas += int(r['gasLimit'], 16)

    # N sampled blocks span N - 1 intervals; dividing by N would
    # under-report the average block time.
    intervals = samples - 1
    block_time = aggr_time / intervals if intervals > 0 else 0.0
    return (aggr_gas // samples, block_time)


def main():
    """Query the node and print the collected network information.

    Results are gathered in a fixed order: network id, latest block number,
    (with ``_LONG``) average gas limit and block time, gas price, and (with
    ``_LOCAL``) the syncing state. If a single entry was requested, its bare
    value is written as soon as it is available and the function returns.
    Integer values are formatted with thousands separators unless the
    ``_RAW`` config value is true.

    :raises RuntimeError: If a single entry was requested but never produced,
        e.g. because the flag enabling it was not set.
    """
    human = not config.true('_RAW')
    results = {}
    conn = settings.get('CONN')
    id_generator = settings.get('RPC_ID_GENERATOR')

    o = network_id(id_generator=id_generator)
    r = conn.do(o)
    if set_result(results, 'network_id', r):
        return

    o = block_latest(id_generator=id_generator)
    r = conn.do(o)
    # The block number is parsed as hex first, with a decimal fallback.
    try:
        n = int(r, 16)
    except ValueError:
        n = int(r)
    first_block_number = n
    if human:
        n = format(n, ',')
    if set_result(results, 'block', n):
        return

    if config.true('_LONG'):
        (n, block_time) = _sample_blocks(first_block_number)
        if human:
            n = format(n, ',')

        if set_result(results, 'gas_limit', n):
            return
        if set_result(results, 'block_time', block_time):
            return

    o = price(id_generator=id_generator)
    r = conn.do(o)
    n = int(r, 16)
    if human:
        n = format(n, ',')
    if set_result(results, 'gas_price', n):
        return

    if config.get('_LOCAL'):
        o = syncing()
        r = conn.do(o)
        if set_result(results, 'syncing', r):
            return

    # Reaching this point with an entry set means the requested item was
    # never produced (e.g. gas_limit without --long).
    entry = config.get('_ENTRY')
    if entry is not None:
        raise RuntimeError('entry {} ({}) not processed, please review the flag settings'.format(entry, results_translation[entry]))

    for k, v in results.items():
        kt = results_translation[k]
        sys.stdout.write('{}: {}\n'.format(kt, v))


if __name__ == '__main__':
    main()