chainqueue

Blockchain transaction queue control
Log | Files | Refs | LICENSE

commit eca9dd95d669e0c34ee683afe2a2905ef6263c55
parent ca4f7c9b8ec6c951cdadb6ea4bc8b63ff3467ca7
Author: nolash <dev@holbrook.no>
Date:   Wed,  2 Jun 2021 12:59:04 +0200

WIP add transaction server

Diffstat:
Achainqueue/adapters/eth.py | 20++++++++++++++++++++
Achainqueue/fs/entry.py | 41+++++++++++++++++++++++++++++++++++++++++
Achainqueue/fs/otx.py | 14++++++++++++++
Achainqueue/runnable/server.py | 129+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Achainqueue/sql/backend.py | 14++++++++++++++
Mrequirements.txt | 2++
Ascripts/migrate.py | 62++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mtests/test_fs.py | 2+-
Atests/test_fs_entry.py | 39+++++++++++++++++++++++++++++++++++++++
9 files changed, 322 insertions(+), 1 deletion(-)

diff --git a/chainqueue/adapters/eth.py b/chainqueue/adapters/eth.py @@ -0,0 +1,20 @@ +# external imports +from chainlib.eth.tx import ( + unpack, + ) +from hexathon import add_0x + +# local imports + + +class EthAdapter: + + def __init__(self, backend): + self.backend = backend + + + def add(self, chain_spec, bytecode): + tx = unpack(bytecode, chain_spec) + session = self.backend.create_session() + self.backend.create(chain_spec, tx['nonce'], tx['from'], tx['hash'], add_0x(bytecode.hex()), session=session) + session.close() diff --git a/chainqueue/fs/entry.py b/chainqueue/fs/entry.py @@ -0,0 +1,41 @@ +# standard imports +import datetime + +# external imports +from hexathon import strip_0x + +# local imports +from chainqueue.enum import StatusEnum + + +class DefaultApplier: + + def add(self, tx_hash, signed_tx): + logg.debug('noop add {} {}'.format(tx_hash, signed_tx)) + + +class Entry: + + def __init__(self, nonce, tx_hash, signed_tx, applier=DefaultApplier()): + self.nonce = nonce + self.tx_hash = strip_0x(tx_hash) + self.signed_tx = strip_0x(signed_tx) + self.status = StatusEnum.PENDING + + self.applier.add(bytes.fromhex(tx_hash), bytes.fromhex(signed_tx)) + + +class MetaEntry: + + def __init__(self, entry, sender, recipient, source_token_address, destination_token_address, from_value, to_value, block_number=None, tx_index=None, date_created=datetime.datetime.utcnow()): + self.entry = entry + self.sender = sender + self.recipient = recipient + self.source_token_address = source_token_address + self.destination_token_address = destination_token_address + self.from_value = from_value + self.to_value = to_value + self.block_number = block_number + self.tx_index = tx_index + self.date_created = date_created + self.date_updated = self.date_created diff --git a/chainqueue/fs/otx.py b/chainqueue/fs/otx.py @@ -0,0 +1,14 @@ +# local imports +from chainqueue.enum import ( + StatusEnum, + StatusBits, + ) + + +class Otx: + + def __init__(self, nonce, tx_hash, signed_tx): + self.nonce = nonce + self.tx_hash = strip_0x(tx_hash) + self.signed_tx = strip_0x(signed_tx) + self.status = StatusEnum.PENDING diff --git a/chainqueue/runnable/server.py b/chainqueue/runnable/server.py @@ -0,0 +1,129 @@ +import syslog +import sys +import time +import socket +import signal +import os +import logging +import stat +import argparse +import uuid + +# external imports +import confini +from xdg.BaseDirectory import ( + load_first_config, + get_runtime_dir, + ) +from hexathon import strip_0x +from chainlib.chain import ChainSpec + +# local imports +from chainqueue.sql.backend import SQLBackend +from chainqueue.db import dsn_from_config +from chainqueue.adapters.eth import EthAdapter + +logging.basicConfig(level=logging.WARNING) +logg = logging.getLogger() + +default_config_dir = load_first_config('chainqueue') +config_dir = os.environ.get('CONFINI_DIR', default_config_dir) + +runtime_dir = os.path.join(get_runtime_dir(), 'chainqueue') + +argparser = argparse.ArgumentParser('chainqueue transaction submission and trigger server') +argparser.add_argument('-c', '--config', dest='c', type=str, default=config_dir, help='configuration directory') +argparser.add_argument('--runtime-dir', dest='runtime_dir', type=str, default=runtime_dir, help='runtime directory') +argparser.add_argument('--session-id', dest='session_id', type=str, default=str(uuid.uuid4()), help='session id to use for session') +argparser.add_argument('-v', action='store_true', help='be verbose') +argparser.add_argument('-vv', action='store_true', help='be very verbose') +args = argparser.parse_args(sys.argv[1:]) + +if args.vv: + logg.setLevel(logging.DEBUG) +elif args.v: + logg.setLevel(logging.INFO) + +config = confini.Config(args.c) +config.process() +args_override = { + 'SESSION_RUNTIME_DIR': getattr(args, 'runtime_dir'), + } +config.dict_override(args_override, 'cli args') +config.add(getattr(args, 'session_id'), '_SESSION_ID', True) + +if not config.get('SESSION_SOCKET_PATH'): + socket_path = os.path.join(config.get('SESSION_RUNTIME_DIR'), config.get('_SESSION_ID'), 'chainqueue.sock') + config.add(socket_path, 'SESSION_SOCKET_PATH', True) +logg.debug('config loaded:\n{}'.format(config)) + + +class SessionController: + + def __init__(self, config): + self.dead = False + os.makedirs(os.path.dirname(config.get('SESSION_SOCKET_PATH')), exist_ok=True) + try: + os.unlink(config.get('SESSION_SOCKET_PATH')) + except FileNotFoundError: + pass + + self.srv = socket.socket(family=socket.AF_UNIX, type=socket.SOCK_STREAM) + self.srv.bind(config.get('SESSION_SOCKET_PATH')) + self.srv.listen(2) + self.srv.settimeout(5.0) + + def shutdown(self, signo, frame): + if self.dead: + return + self.dead = True + if signo != None: + syslog.syslog('closing on {}'.format(signo)) + else: + syslog.syslog('explicit shutdown') + sockname = self.srv.getsockname() + self.srv.close() + try: + os.unlink(sockname) + except FileNotFoundError: + logg.warning('socket file {} already gone'.format(sockname)) + + + def get_connection(self): + return self.srv.accept() + + +ctrl = SessionController(config) + +signal.signal(signal.SIGINT, ctrl.shutdown) +signal.signal(signal.SIGTERM, ctrl.shutdown) + +dsn = dsn_from_config(config) +backend = SQLBackend(dsn) +adapter = EthAdapter(backend) +chain_spec = ChainSpec.from_chain_str('evm:mainnet:1') + +if __name__ == '__main__': + while True: + srvs = None + try: + (srvs, srvs_addr) = ctrl.get_connection() + except OSError as e: + try: + fi = os.stat(config.get('SESSION_SOCKET_PATH')) + except FileNotFoundError: + logg.error('socket is gone') + break + if not stat.S_ISSOCK(fi.st_mode): + logg.error('entity on socket path is not a socket') + break + if srvs == None: + logg.debug('ping') + continue + srvs.setblocking(False) + data_in = srvs.recv(1024) + data_in_str = data_in.decode('utf-8') + data = bytes.fromhex(strip_0x(data_in_str)) + adapter.add(chain_spec, data) + + ctrl.shutdown(None, None) diff --git a/chainqueue/sql/backend.py b/chainqueue/sql/backend.py @@ -0,0 +1,14 @@ +# local imports +from chainqueue.sql.tx import create +from chainqueue.db.models.base import SessionBase + + +class SQLBackend: + + def __init__(self, conn_spec, *args, **kwargs): + SessionBase.connect(conn_spec, pool_size=kwargs.get('poolsize', 0), debug=kwargs.get('debug', False)) + self.create = create + + + def create_session(self): + return SessionBase.create_session() diff --git a/requirements.txt b/requirements.txt @@ -2,3 +2,5 @@ pysha3==1.0.2 hexathon~=0.0.1a7 alembic==1.4.2 SQLAlchemy==1.3.20 +confini~=0.3.6rc3 +pyxdg~=0.27 diff --git a/scripts/migrate.py b/scripts/migrate.py @@ -0,0 +1,62 @@ +#!/usr/bin/python +import os +import argparse +import logging + +# external imports +import alembic +from alembic.config import Config as AlembicConfig +import confini +from xdg.BaseDirectory import ( + load_first_config, + ) + +# local imports +from chainqueue.db import dsn_from_config + +logging.basicConfig(level=logging.WARNING) +logg = logging.getLogger() + +# BUG: the dbdir doesn't work after script install +rootdir = os.path.dirname(os.path.dirname(os.path.realpath(__file__))) +dbdir = os.path.join(rootdir, 'chainqueue', 'db') +migrationsdir = os.path.join(dbdir, 'migrations') + +default_config_dir = load_first_config('chainqueue') +config_dir = os.environ.get('CONFINI_DIR', default_config_dir) + +argparser = argparse.ArgumentParser() +argparser.add_argument('-c', type=str, default=config_dir, help='config file') +argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration') +argparser.add_argument('--migrations-dir', dest='migrations_dir', default=migrationsdir, type=str, help='path to alembic migrations directory') +argparser.add_argument('-v', action='store_true', help='be verbose') +argparser.add_argument('-vv', action='store_true', help='be more verbose') +args = argparser.parse_args() + +if args.vv: + logging.getLogger().setLevel(logging.DEBUG) +elif args.v: + logging.getLogger().setLevel(logging.INFO) + +config = confini.Config(args.c, args.env_prefix) +config.process() +config.censor('PASSWORD', 'DATABASE') +config.censor('PASSWORD', 'SSL') +logg.debug('config:\n{}'.format(config)) + +migrations_dir = os.path.join(args.migrations_dir, config.get('DATABASE_ENGINE')) +if not os.path.isdir(migrations_dir): + logg.debug('migrations dir for engine {} not found, reverting to default'.format(config.get('DATABASE_ENGINE'))) + migrations_dir = os.path.join(args.migrations_dir, 'default') + +# connect to database +dsn = dsn_from_config(config) + + +logg.info('using migrations dir {}'.format(migrations_dir)) +logg.info('using db {}'.format(dsn)) +ac = AlembicConfig(os.path.join(migrations_dir, 'alembic.ini')) +ac.set_main_option('sqlalchemy.url', dsn) +ac.set_main_option('script_location', migrations_dir) + +alembic.command.upgrade(ac, 'head') diff --git a/tests/test_fs.py b/tests/test_fs.py @@ -14,7 +14,7 @@ logging.basicConfig(level=logging.DEBUG) logg = logging.getLogger() -class HexDirTest(unittest.TestCase): +class FsQueueTest(unittest.TestCase): def setUp(self): self.dir = tempfile.mkdtemp() diff --git a/tests/test_fs_entry.py b/tests/test_fs_entry.py @@ -0,0 +1,39 @@ +# standard imports +import unittest +import tempfile +import shutil +import logging +import os + +# local imports +from chainqueue.fs.queue import FsQueue +from chainqueue.fs.dir import HexDir +from chainqueue.fs.entry import Entry +from chainqueue.enum import StatusBits + +logging.basicConfig(level=logging.DEBUG) +logg = logging.getLogger() + + +class FsQueueEntryTest(unittest.TestCase): + + def setUp(self): + self.dir = tempfile.mkdtemp() + self.hexdir = HexDir(os.path.join(self.dir, 'q'), 32, 2, 8) + self.q = FsQueue(os.path.join(self.dir, 'spool'), backend=self.hexdir) + logg.debug('setup fsqueue root {}'.format(self.dir)) + + + def tearDown(self): + shutil.rmtree(self.dir) + logg.debug('cleaned fsqueue root {}'.format(self.dir)) + + + def test_entry(self): + tx_hash = os.urandom(32) + tx_content = os.urandom(128) + Entry(tx_hash, tx_content) + + +if __name__ == '__main__': + unittest.main()