chaind

Base package for chain queue serviceBase package for chain queue service
Log | Files | Refs | LICENSE

commit 8c2905b8010a50eeb1c9b1bec431e092bc8dab65
parent b77aafc7bcd80fa49687f4d9f5721d6f0022a27a
Author: lash <dev@holbrook.no>
Date:   Sun, 10 Apr 2022 09:34:50 +0000

Move sessioncontroller from previous chaind-eth to session module

Diffstat:
Mchaind/error.py | 16++++++++++++++++
Achaind/session.py | 108+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mchaind/setup.py | 1-
3 files changed, 124 insertions(+), 1 deletion(-)

diff --git a/chaind/error.py b/chaind/error.py @@ -1,2 +1,18 @@ class TxSourceError(Exception): pass + + +class NothingToDoError(Exception): + pass + + +class ClientGoneError(Exception): + pass + + +class ClientBlockError(BlockingIOError): + pass + + +class ClientInputError(ValueError): + pass diff --git a/chaind/session.py b/chaind/session.py @@ -0,0 +1,108 @@ +# standard imports +import os +import socket +import logging +import stat + +# external imports +from hexathon import strip_0x + +# local imports +from chaind.error import ( + NothingToDoError, + ClientGoneError, + ClientBlockError, + ClientInputError, + ) + +logg = logging.getLogger(__name__) + + +class SessionController: + + def __init__(self, config, adapter, processor): + self.dead = False + os.makedirs(os.path.dirname(config.get('SESSION_SOCKET_PATH')), exist_ok=True) + try: + os.unlink(config.get('SESSION_SOCKET_PATH')) + except FileNotFoundError: + pass + self.socket_path = config.get('SESSION_SOCKET_PATH') + + self.srv = socket.socket(family=socket.AF_UNIX, type=socket.SOCK_STREAM) + self.srv.bind(config.get('SESSION_SOCKET_PATH')) + self.srv.listen(2) + self.srv.settimeout(float(config.get('SESSION_DISPATCH_DELAY'))) + self.processor = processor + self.chain_spec = config.get('CHAIN_SPEC') + self.adapter = adapter + + + def shutdown(self, signo, frame): + if self.dead: + return + self.dead = True + if signo != None: + logg.info('closing on {}'.format(signo)) + else: + logg.info('explicit shutdown') + sockname = self.srv.getsockname() + self.srv.close() + try: + os.unlink(sockname) + except FileNotFoundError: + logg.warning('socket file {} already gone'.format(sockname)) + + + def get_connection(self): + return self.srv.accept() + + + def process(self, conn): + r = self.processor(self.chain_spec, self.adapter, conn) + if r > 0: + self.srv.settimeout(0.1) + else: + self.srv.settimeout(4.0) + + + def get(self): + srvs = None + try: + logg.debug('getting connection') + (srvs, srvs_addr) = self.get_connection() + except OSError as e: + try: + fi = os.stat(self.socket_path) + except FileNotFoundError: + logg.error('socket is gone') + raise ClientGoneError() + if not stat.S_ISSOCK(fi.st_mode): + logg.error('entity on socket path is not a socket') + raise ClientGoneError() + if srvs == None: + logg.debug('timeout (remote socket is none)') + raise NothingToDoError() + + self.srv.settimeout(0.1) + srvs.settimeout(0.1) + data_in = None + try: + data_in = srvs.recv(1048576) + except BlockingIOError as e: + logg.debug('block io error: {}'.format(e)) + + if data_in == None: + raise ClientBlockError() + + data = None + try: + data_in_str = data_in.decode('utf-8') + data_hex = strip_0x(data_in_str.rstrip()) + data = bytes.fromhex(data_hex) + except ValueError: + logg.error('invalid input "{}"'.format(data_in_str)) + raise ClientInputError() + + logg.info('recv {} bytes'.format(len(data))) + return data diff --git a/chaind/setup.py b/chaind/setup.py @@ -12,7 +12,6 @@ from xdg.BaseDirectory import ( save_config_path, ) - class Environment: def __init__(self, domain=None, session=None, env={}):