chaind-eth

Queue server for ethereum
Info | Log | Files | Refs | README | LICENSE

send.py (5929B)


      1 # standard imports
      2 import os
      3 import logging
      4 import sys
      5 import datetime
      6 import enum
      7 import re
      8 import stat
      9 import socket
     10 
     11 # external imports
     12 import chainlib.eth.cli
     13 from chainlib.eth.cli.arg import (
     14         Arg,
     15         ArgFlag,
     16         process_args,
     17         )
     18 from chainlib.eth.cli.config import (
     19         Config,
     20         process_config,
     21         )
     22 from chaind.setup import Environment
     23 from chainlib.eth.gas import price
     24 from chainlib.chain import ChainSpec
     25 from hexathon import strip_0x
     26 from chainqueue.cli.arg import (
     27         apply_arg as apply_arg_queue,
     28         apply_flag as apply_flag_queue,
     29         )
     30 from chainqueue.data import config_dir as chainqueue_config_dir
     31 from chaind.data import config_dir as chaind_config_dir
     32 from chaind.cli.arg import (
     33         apply_arg,
     34         apply_flag,
     35         )
     36 from chainlib.eth.cli.log import process_log
     37 from chaind.settings import process_queue
     38 from chaind.settings import ChaindSettings
     39 from chaind.error import TxSourceError
     40 from chainlib.error import (
     41         InitializationError,
     42         SignerMissingException,
     43         )
     44 from chaind.cli.config import process_config as process_config_local
     45 
     46 # local imports
     47 from chaind.eth.token.process import Processor
     48 from chaind.eth.token.gas import GasTokenResolver
     49 from chaind.eth.cli.csv import CSVProcessor
     50 from chaind.eth.cli.output import (
     51         Outputter,
     52         OpMode,
     53         )
     54 from chaind.eth.settings import process_settings
     55 
     56 logg = logging.getLogger()
     57 
     58 
     59 def process_settings_local(settings, config):
     60 #    if settings.get('SIGNER') == None:
     61 #        raise SignerMissingException('signer missing')
     62     return settings
     63 
     64 
     65 env = Environment(domain='eth', env=os.environ)
     66 
     67 arg_flags = ArgFlag()
     68 arg_flags = apply_flag_queue(arg_flags)
     69 arg_flags = apply_flag(arg_flags)
     70 
     71 arg = Arg(arg_flags)
     72 arg = apply_arg_queue(arg)
     73 arg = apply_arg(arg)
     74 arg.set_long('s', 'send-rpc')
     75 
     76 flags = arg_flags.STD_WRITE | arg_flags.TOKEN | arg_flags.SOCKET_CLIENT | arg_flags.STATE | arg_flags.WALLET | arg_flags.SESSION
     77 
     78 argparser = chainlib.eth.cli.ArgumentParser()
     79 argparser = process_args(argparser, arg, flags)
     80 argparser.add_argument('source', help='Transaction source file')
     81 args = argparser.parse_args()
     82 
     83 logg = process_log(args, logg)
     84 
     85 config = Config()
     86 config.add_schema_dir(chainqueue_config_dir)
     87 config.add_schema_dir(chaind_config_dir)
     88 config = process_config(config, arg, args, flags)
     89 config = process_config_local(config, arg, args, flags)
     90 config.add(args.source, '_SOURCE', False)
     91 config.add('queue', 'CHAIND_COMPONENT', False)
     92 config.add('eth', 'CHAIND_ENGINE', False)
     93 logg.debug('config loaded:\n{}'.format(config))
     94 
     95 try:
     96     settings = ChaindSettings(include_sync=True)
     97     settings = process_settings(settings, config)
     98     settings = process_queue(settings, config)
     99     settings = process_settings_local(settings, config)
    100 except InitializationError as e:
    101     sys.stderr.write('Initialization error: ' + str(e) + '\n')
    102     sys.exit(1)
    103 logg.debug('settings loaded:\n{}'.format(settings))
    104 
    105 mode = OpMode.STDOUT
    106 
    107 re_unix = r'^ipc://(/.+)'
    108 m = re.match(re_unix, config.get('SESSION_SOCKET_PATH', ''))
    109 if m != None:
    110     config.add(m.group(1), 'SESSION_SOCKET_PATH', exists_ok=True)
    111     r = 0
    112     try:
    113         stat_info = os.stat(config.get('SESSION_SOCKET_PATH'))
    114         if not stat.S_ISSOCK(stat_info.st_mode):
    115             r = 1
    116     except FileNotFoundError:
    117         r = 1
    118 
    119     if r > 0:
    120         sys.stderr.write('{} is not a socket\n'.format(config.get('SESSION_SOCKET_PATH')))
    121         sys.exit(1)
    122     
    123     mode = OpMode.UNIX
    124 
    125 logg.info('using mode {}'.format(mode.value))
    126 
    127 if config.get('_SOURCE') == None:
    128     sys.stderr.write('source data missing\n')
    129     sys.exit(1)
    130 
    131 
    132 class SocketSender:
    133 
    134     def __init__(self, settings):
    135         self.path = settings.get('SESSION_SOCKET_PATH')
    136 
    137     def send(self, tx):
    138         s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    139         err = None
    140         try:
    141             s.connect(self.path)
    142         except FileNotFoundError as e:
    143             err = e
    144         if err != None:
    145             s.close()
    146             raise err
    147         s.sendall(tx.encode('utf-8'))
    148         r = s.recv(68)
    149         s.close()
    150         return r
    151 
    152 
    153 def main():
    154     conn = settings.get('CONN')
    155     token_resolver = None
    156     if settings.get('TOKEN_MODULE') != None:
    157         import importlib
    158         m = importlib.import_module(settings.get('TOKEN_MODULE'))
    159         m = m.TokenResolver
    160     else:
    161         from chaind.eth.token.gas import GasTokenResolver
    162         m = GasTokenResolver
    163     token_resolver = m(
    164             settings.get('CHAIN_SPEC'),
    165             settings.get('SENDER_ADDRESS'),
    166             settings.get('SIGNER'),
    167             settings.get('GAS_ORACLE'),
    168             settings.get('NONCE_ORACLE'),
    169             )
    170     
    171     logg.debug('source {}'.format(config.get('_SOURCE')))
    172     processor = Processor(token_resolver, config.get('_SOURCE')[0], use_checksum=not config.get('_UNSAFE'))
    173     processor.add_processor(CSVProcessor())
    174 
    175     sends = None
    176     try:
    177         sends = processor.load(conn)
    178     except TxSourceError as e:
    179         sys.stderr.write('processing error: {}. processors: {}\n'.format(str(e), str(processor)))
    180         sys.exit(1)
    181 
    182     sender = None
    183     if config.true('_SOCKET_SEND'):
    184         if settings.get('SESSION_SOCKET_PATH') != None:
    185             sender = SocketSender(settings)
    186 
    187     tx_iter = iter(processor)
    188     out = Outputter(mode)
    189     while True:
    190         tx = None
    191         try:
    192             tx_bytes = next(tx_iter)
    193         except StopIteration:
    194             break
    195         tx_hex = tx_bytes.hex()
    196         if sender != None:
    197             r = None
    198             try:
    199                 r = sender.send(tx_hex)
    200             except FileNotFoundError as e:
    201                 sys.stderr.write('send to socket {} failed: {}\n'.format(sender.path, e))
    202                 sys.exit(1)
    203             logg.info('sent {} result {}'.format(tx_hex, r))
    204         print(out.do(tx_hex))
    205 
    206 
    207 if __name__ == '__main__':
    208     main()