commit b893aaa7b1e231591c10de617751b757b11ff7bc
parent ab1977b802e418e01f38b5588ee54897c7121719
Author: lash <dev@holbrook.no>
Date: Wed, 27 Apr 2022 06:36:01 +0000
Correct paths between syncer and queuer
Diffstat:
5 files changed, 175 insertions(+), 160 deletions(-)
diff --git a/chaind/eth/runnable/queuer.py b/chaind/eth/runnable/queuer.py
@@ -0,0 +1,134 @@
+# standard imports
+import os
+import logging
+import signal
+
+# external imports
+import chainlib.eth.cli
+import chaind.cli
+from chaind.session import SessionController
+from chaind.setup import Environment
+from chaind.error import (
+ NothingToDoError,
+ ClientGoneError,
+ ClientBlockError,
+ ClientInputError,
+ )
+from chainqueue import (
+ Store,
+ Status,
+ )
+from chainqueue.error import DuplicateTxError
+from chainqueue.store.fs import (
+ IndexStore,
+ CounterStore,
+ )
+from chainqueue.cache import CacheTokenTx
+from chainlib.encode import TxHexNormalizer
+from chainlib.chain import ChainSpec
+from chaind.adapters.fs import ChaindFsAdapter
+
+# local imports
+from chaind.eth.dispatch import EthDispatcher
+from chaind.eth.cache import EthCacheTx
+from chaind.eth.settings import ChaindEthSettings
+
+logging.basicConfig(level=logging.WARNING)
+logg = logging.getLogger()
+
+script_dir = os.path.dirname(os.path.realpath(__file__))
+config_dir = os.path.join(script_dir, '..', 'data', 'config')
+
+env = Environment(domain='eth', env=os.environ)
+
+arg_flags = chainlib.eth.cli.argflag_std_read
+argparser = chainlib.eth.cli.ArgumentParser(arg_flags)
+
+local_arg_flags = chaind.cli.argflag_local_base | chaind.cli.ChaindFlag.DISPATCH | chaind.cli.ChaindFlag.SOCKET
+chaind.cli.process_flags(argparser, local_arg_flags)
+
+args = argparser.parse_args()
+
+base_config_dir = [chaind.cli.config_dir]
+config = chainlib.eth.cli.Config.from_args(args, arg_flags, base_config_dir=base_config_dir)
+config = chaind.cli.process_config(config, args, local_arg_flags)
+config.add('eth', 'CHAIND_ENGINE', False)
+config.add('queue', 'CHAIND_COMPONENT', False)
+logg.debug('config loaded:\n{}'.format(config))
+
+settings = ChaindEthSettings(include_queue=True)
+settings.process(config)
+
+logg.debug('settings:\n{}'.format(settings))
+
+
+def process_outgoing(chain_spec, adapter, rpc, limit=100):
+ upcoming = adapter.upcoming()
+ logg.info('process {} {} {}'.format(chain_spec, adapter, rpc))
+ logg.info('upcoming {}'.format(upcoming))
+ i = 0
+ for tx_hash in upcoming:
+ if adapter.dispatch(tx_hash):
+ i += 1
+ return i
+
+chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC'))
+
+rpc = chainlib.eth.cli.Rpc()
+conn = rpc.connect_by_config(config)
+
+tx_normalizer = TxHexNormalizer().tx_hash
+token_cache_store = CacheTokenTx(chain_spec, normalizer=tx_normalizer)
+dispatcher = EthDispatcher(conn)
+queue_adapter = ChaindFsAdapter(
+ settings.get('CHAIN_SPEC'),
+ settings.get('SESSION_DATA_DIR'),
+ EthCacheTx,
+ dispatcher,
+ )
+
+ctrl = SessionController(settings, queue_adapter, process_outgoing)
+signal.signal(signal.SIGINT, ctrl.shutdown)
+signal.signal(signal.SIGTERM, ctrl.shutdown)
+
+logg.info('session id is ' + settings.get('SESSION_ID'))
+logg.info('session socket path is ' + settings.get('SESSION_SOCKET_PATH'))
+
+def main():
+ while True:
+ v = None
+ client_socket = None
+ try:
+ (client_socket, v) = ctrl.get()
+ except ClientGoneError:
+ break
+ except ClientBlockError:
+ continue
+ except ClientInputError:
+ continue
+ except NothingToDoError:
+ pass
+
+ if v == None:
+ ctrl.process(conn)
+ continue
+
+ result_data = None
+ r = 0 # no error
+ try:
+ result_data = queue_adapter.put(v.hex())
+ except DuplicateTxError as e:
+ logg.error('tx already exists: {}'.format(e))
+ r = 1
+ except ValueError as e:
+ logg.error('adapter rejected input {}: "{}"'.format(v.hex(), e))
+ continue
+
+ if r == 0:
+ queue_adapter.enqueue(result_data)
+
+ ctrl.respond_put(client_socket, r, extra_data=result_data)
+
+
+if __name__ == '__main__':
+ main()
diff --git a/chaind/eth/runnable/send.py b/chaind/eth/runnable/send.py
@@ -10,6 +10,7 @@ import socket
# external imports
import chainlib.eth.cli
+import chaind.cli
from chaind.setup import Environment
from chainlib.eth.gas import price
from chainlib.chain import ChainSpec
@@ -24,32 +25,38 @@ from chaind.eth.cli.output import (
Outputter,
OpMode,
)
+from chaind.eth.settings import ChaindEthSettings
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
-script_dir = os.path.dirname(os.path.realpath(__file__))
-config_dir = os.path.join(script_dir, '..', 'data', 'config')
-
-
arg_flags = chainlib.eth.cli.argflag_std_write
-argparser = chainlib.eth.cli.ArgumentParser(arg_flags)
-argparser.add_argument('--socket', dest='socket', type=str, help='Socket to send transactions to')
-argparser.add_argument('--token-module', dest='token_module', type=str, help='Python module path to resolve tokens from identifiers')
+argparser = chainlib.eth.cli.ArgumentParser(arg_flags, arg_long={'-s': '--send-rpc'})
argparser.add_positional('source', required=False, type=str, help='Transaction source file')
+
+local_arg_flags = chaind.cli.argflag_local_socket_client
+chaind.cli.process_flags(argparser, local_arg_flags)
+
args = argparser.parse_args()
-extra_args = {
- 'socket': None,
- 'source': None,
- }
env = Environment(domain='eth', env=os.environ)
-config = chainlib.eth.cli.Config.from_args(args, arg_flags, extra_args=extra_args, base_config_dir=config_dir)
-config.add(args.token_module, 'TOKEN_MODULE', True)
+
+base_config_dir = [chaind.cli.config_dir]
+config = chainlib.eth.cli.Config.from_args(args, arg_flags, base_config_dir=base_config_dir)
+config = chaind.cli.process_config(config, args, local_arg_flags)
+config.add(args.source, '_SOURCE', False)
+config.add('eth', 'CHAIND_ENGINE', False)
+config.add('queue', 'CHAIND_COMPONENT', False)
+logg.debug('config loaded:\n{}'.format(config))
wallet = chainlib.eth.cli.Wallet()
wallet.from_config(config)
+settings = ChaindEthSettings(include_queue=True)
+settings.process(config)
+
+logg.debug('settings:\n{}'.format(settings))
+
rpc = chainlib.eth.cli.Rpc(wallet=wallet)
conn = rpc.connect_by_config(config)
@@ -58,19 +65,19 @@ chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC'))
mode = OpMode.STDOUT
re_unix = r'^ipc://(/.+)'
-m = re.match(re_unix, config.get('_SOCKET', ''))
+m = re.match(re_unix, config.get('SESSION_SOCKET_PATH', ''))
if m != None:
- config.add(m.group(1), '_SOCKET', exists_ok=True)
+ config.add(m.group(1), 'SESSION_SOCKET_PATH', exists_ok=True)
r = 0
try:
- stat_info = os.stat(config.get('_SOCKET'))
+ stat_info = os.stat(config.get('SESSION_SOCKET_PATH'))
if not stat.S_ISSOCK(stat_info.st_mode):
r = 1
except FileNotFoundError:
r = 1
if r > 0:
- sys.stderr.write('{} is not a socket\n'.format(config.get('_SOCKET')))
+ sys.stderr.write('{} is not a socket\n'.format(config.get('SESSION_SOCKET_PATH')))
sys.exit(1)
mode = OpMode.UNIX
@@ -84,8 +91,8 @@ if config.get('_SOURCE') == None:
class SocketSender:
- def __init__(self, config):
- self.path = config.get('_SOCKET')
+ def __init__(self, settings):
+ self.path = settings.get('SESSION_SOCKET_PATH')
def send(self, tx):
s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
@@ -118,9 +125,9 @@ def main():
sys.exit(1)
sender = None
- if config.get('_RPC_SEND'):
- if config.get('_SOCKET') != None:
- sender = SocketSender(config)
+ if config.true('_SOCKET_SEND'):
+ if settings.get('SESSION_SOCKET_PATH') != None:
+ sender = SocketSender(settings)
tx_iter = iter(processor)
out = Outputter(mode)
diff --git a/chaind/eth/runnable/syncer.py b/chaind/eth/runnable/syncer.py
@@ -13,8 +13,12 @@ from chainlib.eth.block import block_latest
from hexathon import strip_0x
from chainsyncer.store.fs import SyncFsStore
from chainsyncer.driver.chain_interface import ChainInterfaceDriver
+from chainsyncer.error import SyncDone
+
+# local imports
from chaind.eth.settings import ChaindEthSettings
+
# local imports
from chaind.eth.cache import EthCacheTx
@@ -45,6 +49,7 @@ config = chainlib.cli.Config.from_args(args, arg_flags, base_config_dir=base_con
config = chainsyncer.cli.process_config(config, args, sync_flags)
config = chaind.cli.process_config(config, args, local_arg_flags)
config.add('eth', 'CHAIND_ENGINE', False)
+config.add('sync', 'CHAIND_COMPONENT', False)
logg.debug('config loaded:\n{}'.format(config))
settings = ChaindEthSettings(include_sync=True)
@@ -55,18 +60,21 @@ logg.debug('settings:\n{}'.format(settings))
def main():
queue_adapter = ChaindFsAdapter(
settings.get('CHAIN_SPEC'),
- settings.get('SESSION_DATA_DIR'),
+ settings.dir_for('queue'),
EthCacheTx,
None,
)
fltr = StateFilter(queue_adapter)
- sync_store = SyncFsStore(settings.get('SESSION_RUNTIME_DIR'), session_id=settings.get('SESSION_ID'))
+ sync_store = SyncFsStore(settings.get('SESSION_DATA_DIR'), session_id=settings.get('SESSION_ID'))
sync_store.register(fltr)
logg.debug('session block offset {}'.format(settings.get('SYNCER_OFFSET')))
drv = ChainInterfaceDriver(sync_store, settings.get('SYNCER_INTERFACE'), offset=settings.get('SYNCER_OFFSET'), target=settings.get('SYNCER_LIMIT'))
- drv.run(settings.get('RPC'))
+ try:
+ drv.run(settings.get('RPC'))
+ except SyncDone as e:
+ logg.info('sync done: {}'.format(e))
if __name__ == '__main__':
diff --git a/chaind/eth/runnable/tasker.py b/chaind/eth/runnable/tasker.py
@@ -1,134 +0,0 @@
-# standard imports
-import os
-import logging
-import signal
-
-# external imports
-import chainlib.eth.cli
-import chaind.cli
-from chaind.session import SessionController
-from chaind.setup import Environment
-from chaind.error import (
- NothingToDoError,
- ClientGoneError,
- ClientBlockError,
- ClientInputError,
- )
-from chainqueue import (
- Store,
- Status,
- )
-from chainqueue.error import DuplicateTxError
-from chainqueue.store.fs import (
- IndexStore,
- CounterStore,
- )
-from chainqueue.cache import CacheTokenTx
-from chainlib.encode import TxHexNormalizer
-from chainlib.chain import ChainSpec
-from chaind.adapters.fs import ChaindFsAdapter
-
-# local imports
-from chaind.eth.dispatch import EthDispatcher
-from chaind.eth.cache import EthCacheTx
-from chaind.eth.settings import ChaindEthSettings
-
-logging.basicConfig(level=logging.WARNING)
-logg = logging.getLogger()
-
-script_dir = os.path.dirname(os.path.realpath(__file__))
-config_dir = os.path.join(script_dir, '..', 'data', 'config')
-
-env = Environment(domain='eth', env=os.environ)
-
-arg_flags = chainlib.eth.cli.argflag_std_read
-argparser = chainlib.eth.cli.ArgumentParser(arg_flags)
-
-local_arg_flags = chaind.cli.argflag_local_base | chaind.cli.ChaindFlag.DISPATCH | chaind.cli.ChaindFlag.SOCKET
-chaind.cli.process_flags(argparser, local_arg_flags)
-
-args = argparser.parse_args()
-
-base_config_dir = [chaind.cli.config_dir]
-config = chainlib.cli.Config.from_args(args, arg_flags, base_config_dir=base_config_dir)
-config = chaind.cli.process_config(config, args, local_arg_flags)
-config.add('eth', 'CHAIND_ENGINE', False)
-
-logg.debug('config loaded:\n{}'.format(config))
-
-settings = ChaindEthSettings(include_queue=True)
-settings.process(config)
-
-logg.debug('settings:\n{}'.format(settings))
-
-
-def process_outgoing(chain_spec, adapter, rpc, limit=100):
- upcoming = adapter.upcoming()
- logg.info('process {} {} {}'.format(chain_spec, adapter, rpc))
- logg.info('upcoming {}'.format(upcoming))
- i = 0
- for tx_hash in upcoming:
- if adapter.dispatch(tx_hash):
- i += 1
- return i
-
-chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC'))
-
-rpc = chainlib.eth.cli.Rpc()
-conn = rpc.connect_by_config(config)
-
-tx_normalizer = TxHexNormalizer().tx_hash
-token_cache_store = CacheTokenTx(chain_spec, normalizer=tx_normalizer)
-dispatcher = EthDispatcher(conn)
-queue_adapter = ChaindFsAdapter(
- settings.get('CHAIN_SPEC'),
- settings.get('SESSION_DATA_DIR'),
- EthCacheTx,
- dispatcher,
- )
-
-ctrl = SessionController(settings, queue_adapter, process_outgoing)
-signal.signal(signal.SIGINT, ctrl.shutdown)
-signal.signal(signal.SIGTERM, ctrl.shutdown)
-
-logg.info('session id is ' + settings.get('SESSION_ID'))
-logg.info('session socket path is ' + settings.get('SESSION_SOCKET_PATH'))
-
-def main():
- while True:
- v = None
- client_socket = None
- try:
- (client_socket, v) = ctrl.get()
- except ClientGoneError:
- break
- except ClientBlockError:
- continue
- except ClientInputError:
- continue
- except NothingToDoError:
- pass
-
- if v == None:
- ctrl.process(conn)
- continue
-
- result_data = None
- r = 0 # no error
- try:
- result_data = queue_adapter.put(v.hex())
- except DuplicateTxError as e:
- logg.error('tx already exists: {}'.format(e))
- r = 1
- except ValueError as e:
- logg.error('adapter rejected input {}: "{}"'.format(v.hex(), e))
- continue
-
- if r == 0:
- queue_adapter.enqueue(result_data)
-
- ctrl.respond_put(client_socket, r, extra_data=result_data)
-
-
-if __name__ == '__main__':
- main()
diff --git a/setup.cfg b/setup.cfg
@@ -34,7 +34,7 @@ packages =
[options.entry_points]
console_scripts =
- chaind-eth-tasker = chaind.eth.runnable.tasker:main
+ chaind-eth-queuer = chaind.eth.runnable.queuer:main
chaind-eth-syncer = chaind.eth.runnable.syncer:main
chaind-eth-send = chaind.eth.runnable.send:main
#chaind-eth-resend = chaind_eth.runnable.resend:main