send.py (5929B)
"""Command line tool that builds transactions from a source file and emits them.

Each transaction produced by the processor is printed through an ``Outputter``
and, when socket sending is enabled and a session socket path is set, is also
sent to that unix socket.
"""

# standard imports
import os
import logging
import sys
import datetime
import enum
import re
import stat
import socket

# external imports
import chainlib.eth.cli
from chainlib.eth.cli.arg import (
        Arg,
        ArgFlag,
        process_args,
        )
from chainlib.eth.cli.config import (
        Config,
        process_config,
        )
from chaind.setup import Environment
from chainlib.eth.gas import price
from chainlib.chain import ChainSpec
from hexathon import strip_0x
from chainqueue.cli.arg import (
        apply_arg as apply_arg_queue,
        apply_flag as apply_flag_queue,
        )
from chainqueue.data import config_dir as chainqueue_config_dir
from chaind.data import config_dir as chaind_config_dir
from chaind.cli.arg import (
        apply_arg,
        apply_flag,
        )
from chainlib.eth.cli.log import process_log
from chaind.settings import process_queue
from chaind.settings import ChaindSettings
from chaind.error import TxSourceError
from chainlib.error import (
        InitializationError,
        SignerMissingException,
        )
from chaind.cli.config import process_config as process_config_local

# local imports
from chaind.eth.token.process import Processor
from chaind.eth.token.gas import GasTokenResolver
from chaind.eth.cli.csv import CSVProcessor
from chaind.eth.cli.output import (
        Outputter,
        OpMode,
        )
from chaind.eth.settings import process_settings

logg = logging.getLogger()


def process_settings_local(settings, config):
    """Apply script-specific settings processing.

    Currently a pass-through: the settings object is returned unchanged.

    :param settings: Settings object to process
    :param config: Configuration object (currently unused)
    :returns: The same settings object
    """
    # Signer presence check is intentionally disabled:
    # if settings.get('SIGNER') == None:
    #     raise SignerMissingException('signer missing')
    return settings


env = Environment(domain='eth', env=os.environ)

arg_flags = ArgFlag()
arg_flags = apply_flag_queue(arg_flags)
arg_flags = apply_flag(arg_flags)

arg = Arg(arg_flags)
arg = apply_arg_queue(arg)
arg = apply_arg(arg)
arg.set_long('s', 'send-rpc')

flags = (
        arg_flags.STD_WRITE
        | arg_flags.TOKEN
        | arg_flags.SOCKET_CLIENT
        | arg_flags.STATE
        | arg_flags.WALLET
        | arg_flags.SESSION
        )

argparser = chainlib.eth.cli.ArgumentParser()
argparser = process_args(argparser, arg, flags)
argparser.add_argument('source', help='Transaction source file')
args = argparser.parse_args()

logg = process_log(args, logg)

config = Config()
config.add_schema_dir(chainqueue_config_dir)
config.add_schema_dir(chaind_config_dir)
config = process_config(config, arg, args, flags)
config = process_config_local(config, arg, args, flags)
config.add(args.source, '_SOURCE', False)
config.add('queue', 'CHAIND_COMPONENT', False)
config.add('eth', 'CHAIND_ENGINE', False)
logg.debug('config loaded:\n{}'.format(config))

try:
    settings = ChaindSettings(include_sync=True)
    settings = process_settings(settings, config)
    settings = process_queue(settings, config)
    settings = process_settings_local(settings, config)
except InitializationError as e:
    sys.stderr.write('Initialization error: ' + str(e) + '\n')
    sys.exit(1)
logg.debug('settings loaded:\n{}'.format(settings))

# Output mode defaults to stdout; it is switched to unix socket mode only when
# the session socket path is given as an ipc:// url pointing to a real socket.
mode = OpMode.STDOUT

re_unix = r'^ipc://(/.+)'
m = re.match(re_unix, config.get('SESSION_SOCKET_PATH', ''))
if m is not None:
    # Replace the ipc:// url with the bare filesystem path it contains.
    config.add(m.group(1), 'SESSION_SOCKET_PATH', exists_ok=True)
    r = 0
    try:
        stat_info = os.stat(config.get('SESSION_SOCKET_PATH'))
        if not stat.S_ISSOCK(stat_info.st_mode):
            r = 1
    except FileNotFoundError:
        r = 1

    if r > 0:
        sys.stderr.write('{} is not a socket\n'.format(config.get('SESSION_SOCKET_PATH')))
        sys.exit(1)

    mode = OpMode.UNIX

logg.info('using mode {}'.format(mode.value))

if config.get('_SOURCE') is None:
    sys.stderr.write('source data missing\n')
    sys.exit(1)


class SocketSender:
    """Sends hex-encoded transactions to a unix stream socket.

    :param settings: Settings object; ``SESSION_SOCKET_PATH`` is read from it
        and used as the socket path to connect to.
    """

    def __init__(self, settings):
        self.path = settings.get('SESSION_SOCKET_PATH')


    def send(self, tx):
        """Send a single transaction over a fresh connection and read the reply.

        A new connection is opened for every call. The socket is always closed
        before returning or raising, including when sending or receiving fails.

        :param tx: Transaction as a string; it is utf-8 encoded before sending
        :raises FileNotFoundError: If the socket path does not exist
        :raises OSError: On any other connection, send or receive failure
        :returns: Up to 68 bytes read from the socket as the reply
        """
        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
            s.connect(self.path)
            s.sendall(tx.encode('utf-8'))
            return s.recv(68)


def main():
    """Load transactions from the configured source, then output and optionally send them.

    Reads the module-level ``settings``, ``config`` and ``mode`` objects that
    are set up when the script is loaded. Exits the process with status 1 if
    the source cannot be processed or if sending to the socket fails.
    """
    conn = settings.get('CONN')

    # Use the token resolver of a custom module if one is configured,
    # otherwise fall back to the gas token resolver.
    if settings.get('TOKEN_MODULE') is not None:
        import importlib
        token_module = importlib.import_module(settings.get('TOKEN_MODULE'))
        resolver_cls = token_module.TokenResolver
    else:
        resolver_cls = GasTokenResolver
    token_resolver = resolver_cls(
            settings.get('CHAIN_SPEC'),
            settings.get('SENDER_ADDRESS'),
            settings.get('SIGNER'),
            settings.get('GAS_ORACLE'),
            settings.get('NONCE_ORACLE'),
            )

    logg.debug('source {}'.format(config.get('_SOURCE')))
    # NOTE(review): only element 0 of _SOURCE is passed on; if _SOURCE is a
    # plain string this is its first character -- confirm against the config
    # processing done elsewhere.
    processor = Processor(token_resolver, config.get('_SOURCE')[0], use_checksum=not config.get('_UNSAFE'))
    processor.add_processor(CSVProcessor())

    try:
        processor.load(conn)
    except TxSourceError as e:
        sys.stderr.write('processing error: {}. processors: {}\n'.format(str(e), str(processor)))
        sys.exit(1)

    # Sending is only enabled when explicitly requested and a socket path is set.
    sender = None
    if config.true('_SOCKET_SEND'):
        if settings.get('SESSION_SOCKET_PATH') is not None:
            sender = SocketSender(settings)

    out = Outputter(mode)
    for tx_bytes in processor:
        tx_hex = tx_bytes.hex()
        if sender is not None:
            try:
                r = sender.send(tx_hex)
            except OSError as e:
                # OSError covers FileNotFoundError as well as refused or
                # broken connections, so all of them exit with a message
                # instead of a traceback.
                sys.stderr.write('send to socket {} failed: {}\n'.format(sender.path, e))
                sys.exit(1)
            logg.info('sent {} result {}'.format(tx_hex, r))
        print(out.do(tx_hex))


if __name__ == '__main__':
    main()