queuer.py (4756B)
"""Entry point for the chaind eth queuer.

Loads configuration and settings at import time, sets up the dispatcher and
session controller, then (in ``main``) reads transactions from the session
controller and puts them on the filesystem-backed queue.
"""

# standard imports
import os
import logging
import signal

# external imports
import chainlib.eth.cli
from chainlib.eth.cli.arg import (
        Arg,
        ArgFlag,
        process_args,
        )
from chainlib.eth.cli.config import (
        Config,
        process_config,
        )
from chainqueue.cli.arg import (
        apply_arg as apply_arg_queue,
        apply_flag as apply_flag_queue,
        )
from chaind.cli.arg import (
        apply_arg,
        apply_flag,
        )
from chaind.session import SessionController
from chaind.setup import Environment
from chaind.error import (
        NothingToDoError,
        ClientGoneError,
        ClientBlockError,
        ClientInputError,
        )
from chainqueue import (
        Store,
        Status,
        )
from chainqueue.error import DuplicateTxError
from chainqueue.store.fs import (
        IndexStore,
        CounterStore,
        )
from chainqueue.cache import CacheTokenTx
from chainlib.encode import TxHexNormalizer
from chainlib.chain import ChainSpec
from chaind.adapters.fs import ChaindFsAdapter
from chaind.dispatch import DispatchProcessor
from chainqueue.data import config_dir as chainqueue_config_dir
from chaind.data import config_dir as chaind_config_dir
from chainlib.eth.cli.log import process_log
from chaind.cli.config import process_config as process_config_local

# local imports
from chaind.eth.cache import EthCacheTx
from chaind.eth.settings import ChaindSettings
from chaind.eth.dispatch import EthDispatcher
from chaind.eth.settings import process_settings
from chaind.settings import (
        process_queue,
        process_socket,
        process_dispatch,
        )

logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()

script_dir = os.path.dirname(os.path.realpath(__file__))
config_dir = os.path.join(script_dir, '..', 'data', 'config')

env = Environment(domain='eth', env=os.environ)

# Command line flags: start from the base flag set, then let the queue and
# chaind packages register their own flags and arguments on top.
arg_flags = ArgFlag()
arg_flags = apply_flag_queue(arg_flags)
arg_flags = apply_flag(arg_flags)

arg = Arg(arg_flags)
arg = apply_arg_queue(arg)
arg = apply_arg(arg)

flags = arg_flags.STD_READ | arg_flags.QUEUE | arg_flags.STATE | arg_flags.SESSION

argparser = chainlib.eth.cli.ArgumentParser()
argparser = process_args(argparser, arg, flags)
args = argparser.parse_args()

logg = process_log(args, logg)

# Configuration: schema directories from chainqueue and chaind, processed
# first by the chainlib handler and then by the chaind-local one.
config = Config()
config.add_schema_dir(chainqueue_config_dir)
config.add_schema_dir(chaind_config_dir)
config = process_config(config, arg, args, flags)
config = process_config_local(config, arg, args, flags)
config.add('eth', 'CHAIND_ENGINE', False)
config.add('sync', 'CHAIND_COMPONENT', False)
logg.debug('config loaded:\n{}'.format(config))

settings = ChaindSettings(include_sync=True)
settings = process_settings(settings, config)
settings = process_queue(settings, config)
settings = process_socket(settings, config)
settings = process_dispatch(settings, config)
logg.debug('settings loaded:\n{}'.format(settings))

tx_normalizer = TxHexNormalizer().tx_hash
token_cache_store = CacheTokenTx(settings.get('CHAIN_SPEC'), normalizer=tx_normalizer)

dispatcher = EthDispatcher(settings.get('CONN'))
processor = DispatchProcessor(settings.get('CHAIN_SPEC'), settings.dir_for('queue'), dispatcher)
ctrl = SessionController(settings, processor.process)

# Hand SIGINT and SIGTERM to the session controller's shutdown handler.
signal.signal(signal.SIGINT, ctrl.shutdown)
signal.signal(signal.SIGTERM, ctrl.shutdown)

logg.info('session id is ' + settings.get('SESSION_ID'))
logg.info('session socket path is ' + settings.get('SESSION_SOCKET_PATH'))


def main():
    """Run the queuer loop until the session controller reports ClientGoneError.

    Each iteration asks the session controller for the next client input:

    - ``ClientGoneError`` ends the loop.
    - ``ClientBlockError`` and ``ClientInputError`` skip to the next iteration.
    - ``NothingToDoError`` (or any result without a payload) triggers
      ``ctrl.process`` on the configured connection instead.
    - Otherwise the payload is hex-encoded and put on the queue adapter; on
      success it is enqueued, and the result code (0 for success, 1 for a
      duplicate transaction) is sent back through ``ctrl.respond_put``.
    """
    queue_adapter = ChaindFsAdapter(
            settings.get('CHAIN_SPEC'),
            settings.dir_for('queue'),
            EthCacheTx,
            dispatcher,
            store_sync=False,
            )

    while True:
        v = None
        client_socket = None
        try:
            (client_socket, v) = ctrl.get()
        except ClientGoneError:
            break
        except ClientBlockError:
            continue
        except ClientInputError:
            continue
        except NothingToDoError:
            # Fall through with v still None so the idle branch below runs.
            pass

        if v is None:
            # No client payload this round; let the controller do its own work.
            ctrl.process(settings.get('CONN'))
            continue

        result_data = None
        r = 0  # no error
        try:
            result_data = queue_adapter.put(v.hex())
        except DuplicateTxError as e:
            logg.error('tx already exists: {}'.format(e))
            r = 1
        except ValueError as e:
            logg.error('adapter rejected input {}: "{}"'.format(v.hex(), e))
            # NOTE(review): no respond_put is issued on this path, so the
            # client gets no reply for rejected input -- confirm this is intended.
            continue

        if r == 0:
            queue_adapter.enqueue(result_data)

        ctrl.respond_put(client_socket, r, extra_data=result_data)


if __name__ == '__main__':
    main()