server.py (7521B)
1 # standard imports 2 import sys 3 import time 4 import socket 5 import signal 6 import os 7 import logging 8 import stat 9 import argparse 10 11 # external imports 12 import chainlib.eth.cli 13 from chaind import Environment 14 from hexathon import strip_0x 15 from chainlib.chain import ChainSpec 16 from chainlib.eth.connection import EthHTTPConnection 17 from chainqueue.sql.backend import SQLBackend 18 from chainlib.error import JSONRPCException 19 from chainqueue.db import dsn_from_config 20 from chaind.sql.session import SessionIndex 21 22 # local imports 23 from chaind_eth.dispatch import Dispatcher 24 from chainqueue.adapters.eth import EthAdapter 25 26 logging.basicConfig(level=logging.WARNING) 27 logg = logging.getLogger() 28 29 script_dir = os.path.dirname(os.path.realpath(__file__)) 30 config_dir = os.path.join(script_dir, '..', 'data', 'config') 31 32 env = Environment(domain='eth', env=os.environ) 33 34 arg_flags = chainlib.eth.cli.argflag_std_read 35 argparser = chainlib.eth.cli.ArgumentParser(arg_flags) 36 argparser.add_argument('--data-dir', type=str, help='data directory') 37 argparser.add_argument('--runtime-dir', type=str, help='runtime directory') 38 argparser.add_argument('--session-id', dest='session_id', type=str, help='session identifier') 39 argparser.add_argument('--dispatch-delay', dest='dispatch_delay', type=float, help='socket timeout before processing queue') 40 args = argparser.parse_args() 41 extra_args = { 42 'runtime_dir': 'SESSION_RUNTIME_DIR', 43 'data_dir': 'SESSION_DATA_DIR', 44 'session_id': 'SESSION_ID', 45 'dispatch_delay': 'SESSION_DISPATCH_DELAY', 46 } 47 #config = chainlib.eth.cli.Config.from_args(args, arg_flags, default_config_dir=config_dir, extend_base_config_dir=config_dir) 48 config = chainlib.eth.cli.Config.from_args(args, arg_flags, extra_args=extra_args, base_config_dir=config_dir) 49 50 logg.debug('session id {} {}'.format(type(config.get('SESSION_ID')), config.get('SESSION_ID'))) 51 if config.get('SESSION_ID') == None: 52 config.add(env.session, 'SESSION_ID', exists_ok=True) 53 if config.get('SESSION_RUNTIME_DIR') == None: 54 config.add(env.runtime_dir, 'SESSION_RUNTIME_DIR', exists_ok=True) 55 if config.get('SESSION_DATA_DIR') == None: 56 config.add(env.data_dir, 'SESSION_DATA_DIR', exists_ok=True) 57 if not config.get('SESSION_SOCKET_PATH'): 58 socket_path = os.path.join(config.get('SESSION_RUNTIME_DIR'), config.get('SESSION_ID'), 'chaind.sock') 59 config.add(socket_path, 'SESSION_SOCKET_PATH', True) 60 61 if config.get('DATABASE_ENGINE') == 'sqlite': 62 #config.add(os.path.join(config.get('SESSION_DATA_DIR'), config.get('DATABASE_NAME') + '.sqlite'), 'DATABASE_NAME', exists_ok=True) 63 config.add(os.path.join(config.get('SESSION_DATA_DIR'), config.get('DATABASE_NAME') + '.sqlite'), 'DATABASE_NAME', exists_ok=True) 64 65 config.censor('PASSWORD', 'DATABASE') 66 logg.debug('config loaded:\n{}'.format(config)) 67 68 69 # verify setup 70 try: 71 os.stat(config.get('DATABASE_NAME')) 72 except FileNotFoundError: 73 sys.stderr.write('database file {} not found. please run database migration script first\n'.format(config.get('DATABASE_NAME'))) 74 sys.exit(1) 75 76 77 class SessionController: 78 79 def __init__(self, config): 80 self.dead = False 81 os.makedirs(os.path.dirname(config.get('SESSION_SOCKET_PATH')), exist_ok=True) 82 try: 83 os.unlink(config.get('SESSION_SOCKET_PATH')) 84 except FileNotFoundError: 85 pass 86 87 self.srv = socket.socket(family=socket.AF_UNIX, type=socket.SOCK_STREAM) 88 self.srv.bind(config.get('SESSION_SOCKET_PATH')) 89 self.srv.listen(2) 90 self.srv.settimeout(float(config.get('SESSION_DISPATCH_DELAY'))) 91 92 93 def shutdown(self, signo, frame): 94 if self.dead: 95 return 96 self.dead = True 97 if signo != None: 98 logg.info('closing on {}'.format(signo)) 99 else: 100 logg.info('explicit shutdown') 101 sockname = self.srv.getsockname() 102 self.srv.close() 103 try: 104 os.unlink(sockname) 105 except FileNotFoundError: 106 logg.warning('socket file {} already gone'.format(sockname)) 107 108 109 def get_connection(self): 110 return self.srv.accept() 111 112 113 ctrl = SessionController(config) 114 115 signal.signal(signal.SIGINT, ctrl.shutdown) 116 signal.signal(signal.SIGTERM, ctrl.shutdown) 117 118 chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC')) 119 120 rpc = chainlib.eth.cli.Rpc() 121 conn = rpc.connect_by_config(config) 122 123 dsn = dsn_from_config(config) 124 backend = SQLBackend(dsn, error_parser=rpc.error_parser, debug=config.true('DATABASE_DEBUG')) 125 session_index_backend = SessionIndex(config.get('SESSION_ID')) 126 adapter = EthAdapter(backend, session_index_backend=session_index_backend) 127 128 129 def process_outgoing(chain_spec, adapter, rpc, limit=100): 130 dispatcher = Dispatcher(chain_spec, adapter, limit=limit) 131 session = adapter.create_session() 132 r = dispatcher.process(rpc, session) 133 session.close() 134 return r 135 136 137 def main(): 138 while True: 139 srvs = None 140 try: 141 logg.debug('getting connection') 142 (srvs, srvs_addr) = ctrl.get_connection() 143 except OSError as e: 144 try: 145 fi = os.stat(config.get('SESSION_SOCKET_PATH')) 146 except FileNotFoundError: 147 logg.error('socket is gone') 148 break 149 if not stat.S_ISSOCK(fi.st_mode): 150 logg.error('entity on socket path is not a socket') 151 break 152 if srvs == None: 153 logg.debug('timeout (remote socket is none)') 154 r = process_outgoing(chain_spec, adapter, conn) 155 if r > 0: 156 ctrl.srv.settimeout(0.1) 157 else: 158 ctrl.srv.settimeout(4.0) 159 continue 160 ctrl.srv.settimeout(0.1) 161 srvs.settimeout(0.1) 162 data_in = None 163 try: 164 data_in = srvs.recv(1048576) 165 except BlockingIOError as e: 166 logg.debug('block io error: {}'.format(e)) 167 continue 168 169 data = None 170 try: 171 data_in_str = data_in.decode('utf-8') 172 data_hex = strip_0x(data_in_str.rstrip()) 173 data = bytes.fromhex(data_hex) 174 except ValueError: 175 logg.error('invalid input "{}"'.format(data_in_str)) 176 continue 177 178 logg.debug('recv {} bytes'.format(len(data))) 179 session = backend.create_session() 180 tx_hash = None 181 signed_tx = None 182 try: 183 tx_hash = adapter.add(data_hex, chain_spec, session=session) 184 except ValueError as e: 185 try: 186 signed_tx = adapter.get(data_hex, chain_spec, session=session) 187 except ValueError as e: 188 logg.error('invalid input: {}'.format(e)) 189 190 if tx_hash != None: 191 session.commit() 192 try: 193 r = int(0).to_bytes(4, byteorder='big') 194 r += strip_0x(tx_hash).encode('utf-8') 195 srvs.send(r) 196 logg.debug('{} bytes sent'.format(r)) 197 except BrokenPipeError: 198 logg.debug('they just hung up. how rude.') 199 elif signed_tx != None: 200 r = int(0).to_bytes(4, byteorder='big') 201 r += strip_0x(signed_tx).encode('utf-8') 202 try: 203 r = srvs.send(r) 204 except BrokenPipeError: 205 logg.debug('they just hung up. how useless.') 206 else: 207 r = srvs.send(int(1).to_bytes(4, byteorder='big')) 208 209 session.close() 210 srvs.close() 211 212 ctrl.shutdown(None, None) 213 214 if __name__ == '__main__': 215 main()