chaind-eth

Queue server for ethereum
Info | Log | Files | Refs | README | LICENSE

server.py (7521B)


      1 # standard imports
      2 import sys
      3 import time
      4 import socket
      5 import signal
      6 import os
      7 import logging
      8 import stat
      9 import argparse
     10 
     11 # external imports
     12 import chainlib.eth.cli
     13 from chaind import Environment
     14 from hexathon import strip_0x
     15 from chainlib.chain import ChainSpec
     16 from chainlib.eth.connection import EthHTTPConnection
     17 from chainqueue.sql.backend import SQLBackend
     18 from chainlib.error import JSONRPCException
     19 from chainqueue.db import dsn_from_config
     20 from chaind.sql.session import SessionIndex
     21 
     22 # local imports
     23 from chaind_eth.dispatch import Dispatcher
     24 from chainqueue.adapters.eth import EthAdapter
     25 
     26 logging.basicConfig(level=logging.WARNING)
     27 logg = logging.getLogger()
     28 
     29 script_dir = os.path.dirname(os.path.realpath(__file__))
     30 config_dir = os.path.join(script_dir, '..', 'data', 'config')
     31 
     32 env = Environment(domain='eth', env=os.environ)
     33 
     34 arg_flags = chainlib.eth.cli.argflag_std_read
     35 argparser = chainlib.eth.cli.ArgumentParser(arg_flags)
     36 argparser.add_argument('--data-dir', type=str, help='data directory')
     37 argparser.add_argument('--runtime-dir', type=str, help='runtime directory')
     38 argparser.add_argument('--session-id', dest='session_id', type=str, help='session identifier')
     39 argparser.add_argument('--dispatch-delay', dest='dispatch_delay', type=float, help='socket timeout before processing queue')
     40 args = argparser.parse_args()
     41 extra_args = {
     42     'runtime_dir': 'SESSION_RUNTIME_DIR',
     43     'data_dir': 'SESSION_DATA_DIR',
     44     'session_id': 'SESSION_ID', 
     45     'dispatch_delay': 'SESSION_DISPATCH_DELAY',
     46         }
     47 #config = chainlib.eth.cli.Config.from_args(args, arg_flags, default_config_dir=config_dir, extend_base_config_dir=config_dir)
     48 config = chainlib.eth.cli.Config.from_args(args, arg_flags, extra_args=extra_args, base_config_dir=config_dir)
     49 
     50 logg.debug('session id {} {}'.format(type(config.get('SESSION_ID')), config.get('SESSION_ID')))
     51 if config.get('SESSION_ID') == None:
     52     config.add(env.session, 'SESSION_ID', exists_ok=True)
     53 if config.get('SESSION_RUNTIME_DIR') == None:
     54     config.add(env.runtime_dir, 'SESSION_RUNTIME_DIR', exists_ok=True)
     55 if config.get('SESSION_DATA_DIR') == None:
     56     config.add(env.data_dir, 'SESSION_DATA_DIR', exists_ok=True)
     57 if not config.get('SESSION_SOCKET_PATH'):
     58     socket_path = os.path.join(config.get('SESSION_RUNTIME_DIR'), config.get('SESSION_ID'), 'chaind.sock')
     59     config.add(socket_path, 'SESSION_SOCKET_PATH', True)
     60 
     61 if config.get('DATABASE_ENGINE') == 'sqlite':
     62     #config.add(os.path.join(config.get('SESSION_DATA_DIR'), config.get('DATABASE_NAME') + '.sqlite'), 'DATABASE_NAME', exists_ok=True)
     63     config.add(os.path.join(config.get('SESSION_DATA_DIR'), config.get('DATABASE_NAME') + '.sqlite'), 'DATABASE_NAME', exists_ok=True)
     64     
     65 config.censor('PASSWORD', 'DATABASE')
     66 logg.debug('config loaded:\n{}'.format(config))
     67 
     68 
     69 # verify setup
     70 try:
     71     os.stat(config.get('DATABASE_NAME'))
     72 except FileNotFoundError:
     73     sys.stderr.write('database file {} not found. please run database migration script first\n'.format(config.get('DATABASE_NAME')))
     74     sys.exit(1)
     75 
     76 
     77 class SessionController:
     78 
     79     def __init__(self, config):
     80         self.dead = False
     81         os.makedirs(os.path.dirname(config.get('SESSION_SOCKET_PATH')), exist_ok=True)
     82         try:
     83             os.unlink(config.get('SESSION_SOCKET_PATH'))
     84         except FileNotFoundError:
     85             pass
     86 
     87         self.srv = socket.socket(family=socket.AF_UNIX, type=socket.SOCK_STREAM)
     88         self.srv.bind(config.get('SESSION_SOCKET_PATH'))
     89         self.srv.listen(2)
     90         self.srv.settimeout(float(config.get('SESSION_DISPATCH_DELAY')))
     91 
     92 
     93     def shutdown(self, signo, frame):
     94         if self.dead:
     95             return
     96         self.dead = True
     97         if signo != None:
     98             logg.info('closing on {}'.format(signo))
     99         else:
    100             logg.info('explicit shutdown')
    101         sockname = self.srv.getsockname()
    102         self.srv.close()
    103         try:
    104             os.unlink(sockname)
    105         except FileNotFoundError:
    106             logg.warning('socket file {} already gone'.format(sockname))
    107 
    108 
    109     def get_connection(self):
    110         return self.srv.accept()
    111 
    112 
    113 ctrl = SessionController(config)
    114 
    115 signal.signal(signal.SIGINT, ctrl.shutdown)
    116 signal.signal(signal.SIGTERM, ctrl.shutdown)
    117 
    118 chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC'))
    119 
    120 rpc = chainlib.eth.cli.Rpc()
    121 conn = rpc.connect_by_config(config)
    122 
    123 dsn = dsn_from_config(config)
    124 backend = SQLBackend(dsn, error_parser=rpc.error_parser, debug=config.true('DATABASE_DEBUG'))
    125 session_index_backend = SessionIndex(config.get('SESSION_ID'))
    126 adapter = EthAdapter(backend, session_index_backend=session_index_backend)
    127 
    128 
    129 def process_outgoing(chain_spec, adapter, rpc, limit=100):
    130     dispatcher = Dispatcher(chain_spec, adapter, limit=limit)
    131     session = adapter.create_session()
    132     r = dispatcher.process(rpc, session)
    133     session.close()
    134     return r
    135 
    136 
    137 def main():
    138     while True:
    139         srvs = None
    140         try:
    141             logg.debug('getting connection')
    142             (srvs, srvs_addr) = ctrl.get_connection()
    143         except OSError as e:
    144             try:
    145                 fi = os.stat(config.get('SESSION_SOCKET_PATH'))
    146             except FileNotFoundError:
    147                 logg.error('socket is gone')
    148                 break
    149             if not stat.S_ISSOCK(fi.st_mode):
    150                 logg.error('entity on socket path is not a socket')
    151                 break
    152             if srvs == None:
    153                 logg.debug('timeout (remote socket is none)')
    154                 r = process_outgoing(chain_spec, adapter, conn)
    155                 if r > 0:
    156                     ctrl.srv.settimeout(0.1)
    157                 else:
    158                     ctrl.srv.settimeout(4.0)
    159                 continue
    160         ctrl.srv.settimeout(0.1)
    161         srvs.settimeout(0.1)
    162         data_in = None
    163         try:
    164             data_in = srvs.recv(1048576)
    165         except BlockingIOError as e:
    166             logg.debug('block io error: {}'.format(e))
    167             continue
    168 
    169         data = None
    170         try:
    171             data_in_str = data_in.decode('utf-8')
    172             data_hex = strip_0x(data_in_str.rstrip())
    173             data = bytes.fromhex(data_hex)
    174         except ValueError:
    175             logg.error('invalid input "{}"'.format(data_in_str))
    176             continue
    177 
    178         logg.debug('recv {} bytes'.format(len(data)))
    179         session = backend.create_session()
    180         tx_hash = None
    181         signed_tx = None
    182         try:
    183             tx_hash = adapter.add(data_hex, chain_spec, session=session)
    184         except ValueError as e:
    185             try:
    186                 signed_tx = adapter.get(data_hex, chain_spec, session=session) 
    187             except ValueError as e:
    188                 logg.error('invalid input: {}'.format(e))
    189 
    190         if tx_hash != None:
    191             session.commit()
    192             try:
    193                 r = int(0).to_bytes(4, byteorder='big')
    194                 r += strip_0x(tx_hash).encode('utf-8')
    195                 srvs.send(r)
    196                 logg.debug('{} bytes sent'.format(r))
    197             except BrokenPipeError:
    198                 logg.debug('they just hung up. how rude.')
    199         elif signed_tx != None:
    200             r = int(0).to_bytes(4, byteorder='big')
    201             r += strip_0x(signed_tx).encode('utf-8')
    202             try:
    203                 r = srvs.send(r)
    204             except BrokenPipeError:
    205                 logg.debug('they just hung up. how useless.')
    206         else:
    207             r = srvs.send(int(1).to_bytes(4, byteorder='big'))
    208 
    209         session.close()
    210         srvs.close()
    211 
    212     ctrl.shutdown(None, None)
    213 
    214 if __name__ == '__main__':
    215     main()