chaind-eth

Queue server for ethereum
Info | Log | Files | Refs | README | LICENSE

queuer.py (4756B)


      1 # standard imports
      2 import os
      3 import logging
      4 import signal
      5 
      6 # external imports
      7 import chainlib.eth.cli
      8 from chainlib.eth.cli.arg import (
      9         Arg,
     10         ArgFlag,
     11         process_args,
     12         )
     13 from chainlib.eth.cli.config import (
     14         Config,
     15         process_config,
     16         )
     17 from chainqueue.cli.arg import (
     18         apply_arg as apply_arg_queue,
     19         apply_flag as apply_flag_queue,
     20         )
     21 from chaind.cli.arg import (
     22         apply_arg,
     23         apply_flag,
     24         )
     25 from chaind.session import SessionController
     26 from chaind.setup import Environment
     27 from chaind.error import (
     28         NothingToDoError,
     29         ClientGoneError,
     30         ClientBlockError,
     31         ClientInputError,
     32         )
     33 from chainqueue import (
     34         Store,
     35         Status,
     36         )
     37 from chainqueue.error import DuplicateTxError
     38 from chainqueue.store.fs import (
     39         IndexStore,
     40         CounterStore,
     41         )
     42 from chainqueue.cache import CacheTokenTx
     43 from chainlib.encode import TxHexNormalizer
     44 from chainlib.chain import ChainSpec
     45 from chaind.adapters.fs import ChaindFsAdapter
     46 from chaind.dispatch import DispatchProcessor
     47 from chainqueue.data import config_dir as chainqueue_config_dir
     48 from chaind.data import config_dir as chaind_config_dir
     49 from chainlib.eth.cli.log import process_log
     50 from chaind.cli.config import process_config as process_config_local
     51 
     52 # local imports
     53 from chaind.eth.cache import EthCacheTx
     54 from chaind.eth.settings import ChaindSettings
     55 from chaind.eth.dispatch import EthDispatcher
     56 from chaind.eth.settings import process_settings
     57 from chaind.settings import (
     58         process_queue,
     59         process_socket,
     60         process_dispatch,
     61         )
     62 
     63 logging.basicConfig(level=logging.WARNING)
     64 logg = logging.getLogger()
     65 
     66 script_dir = os.path.dirname(os.path.realpath(__file__))
     67 config_dir = os.path.join(script_dir, '..', 'data', 'config')
     68 
     69 env = Environment(domain='eth', env=os.environ)
     70 
     71 arg_flags = ArgFlag()
     72 arg_flags = apply_flag_queue(arg_flags)
     73 arg_flags = apply_flag(arg_flags)
     74 
     75 arg = Arg(arg_flags)
     76 arg = apply_arg_queue(arg)
     77 arg = apply_arg(arg)
     78 
     79 flags = arg_flags.STD_READ | arg_flags.QUEUE | arg_flags.STATE | arg_flags.SESSION
     80 
     81 argparser = chainlib.eth.cli.ArgumentParser()
     82 argparser = process_args(argparser, arg, flags)
     83 args = argparser.parse_args()
     84 
     85 logg = process_log(args, logg)
     86 
     87 config = Config()
     88 config.add_schema_dir(chainqueue_config_dir)
     89 config.add_schema_dir(chaind_config_dir)
     90 config = process_config(config, arg, args, flags)
     91 config = process_config_local(config, arg, args, flags)
     92 config.add('eth', 'CHAIND_ENGINE', False)
     93 config.add('sync', 'CHAIND_COMPONENT', False)
     94 logg.debug('config loaded:\n{}'.format(config))
     95 
     96 settings = ChaindSettings(include_sync=True)
     97 settings = process_settings(settings, config)
     98 settings = process_queue(settings, config)
     99 settings = process_socket(settings, config)
    100 settings = process_dispatch(settings, config)
    101 logg.debug('settings loaded:\n{}'.format(settings))
    102 
    103 tx_normalizer = TxHexNormalizer().tx_hash
    104 token_cache_store = CacheTokenTx(settings.get('CHAIN_SPEC'), normalizer=tx_normalizer)
    105 
    106 dispatcher = EthDispatcher(settings.get('CONN'))
    107 processor = DispatchProcessor(settings.get('CHAIN_SPEC'), settings.dir_for('queue'), dispatcher)
    108 ctrl = SessionController(settings, processor.process)
    109 
    110 signal.signal(signal.SIGINT, ctrl.shutdown)
    111 signal.signal(signal.SIGTERM, ctrl.shutdown)
    112 
    113 logg.info('session id is ' + settings.get('SESSION_ID'))
    114 logg.info('session socket path is ' + settings.get('SESSION_SOCKET_PATH'))
    115 
    116 
    117 def main():
    118     global dispatcher, settings
    119 
    120     queue_adapter = ChaindFsAdapter(
    121         settings.get('CHAIN_SPEC'),
    122         settings.dir_for('queue'),
    123         EthCacheTx,
    124         dispatcher,
    125         store_sync=False,
    126         )
    127 
    128     while True:
    129         v = None
    130         client_socket = None
    131         try:
    132             (client_socket, v) = ctrl.get()
    133         except ClientGoneError:
    134             break
    135         except ClientBlockError:
    136             continue
    137         except ClientInputError:
    138             continue
    139         except NothingToDoError:
    140             pass
    141 
    142         if v == None:
    143             ctrl.process(settings.get('CONN'))
    144             #queue_adapter = create_adapter(settings, dispatcher)
    145             continue
    146 
    147         result_data = None
    148         r = 0 # no error
    149         try:
    150             result_data = queue_adapter.put(v.hex())
    151         except DuplicateTxError as e:
    152             logg.error('tx already exists: {}'.format(e))
    153             r = 1
    154         except ValueError as e:
    155             logg.error('adapter rejected input {}: "{}"'.format(v.hex(), e))
    156             continue
    157 
    158         if r == 0:
    159             queue_adapter.enqueue(result_data)
    160 
    161         ctrl.respond_put(client_socket, r, extra_data=result_data)
    162         
    163 
    164 if __name__ == '__main__':
    165     main()