commit 0c0d1fac8fc381d20282bce69800954e74e39b4f
parent fc69b2762bf8c9f4686f32340437d180bf8a4e70
Author: nolash <dev@holbrook.no>
Date: Thu, 26 Aug 2021 17:14:11 +0200
Implement error parser to rpc instantation
Diffstat:
5 files changed, 140 insertions(+), 41 deletions(-)
diff --git a/chaind_eth/runnable/retry.py b/chaind_eth/runnable/retry.py
@@ -0,0 +1,113 @@
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+# standard imports
+import os
+import logging
+import sys
+import datetime
+
+# external imports
+from hexathon import (
+ add_0x,
+ strip_0x,
+ )
+from chaind import Environment
+import chainlib.eth.cli
+from chainlib.chain import ChainSpec
+from chainqueue.db import dsn_from_config
+from chainqueue.sql.backend import SQLBackend
+from chainqueue.enum import StatusBits
+from chaind.sql.session import SessionIndex
+from chainqueue.adapters.eth import EthAdapter
+from chainlib.eth.gas import price
+from chainlib.eth.connection import EthHTTPConnection
+from crypto_dev_signer.eth.transaction import EIP155Transaction
+
+DEFAULT_GAS_FACTOR = 1.1
+
+
+logging.basicConfig(level=logging.WARNING)
+logg = logging.getLogger()
+
+script_dir = os.path.dirname(os.path.realpath(__file__))
+config_dir = os.path.join(script_dir, '..', 'data', 'config')
+
+arg_flags = chainlib.eth.cli.argflag_std_write
+argparser = chainlib.eth.cli.ArgumentParser(arg_flags)
+argparser.add_argument('--backend', type=str, default='sql', help='Backend to use (currently only "sql")')
+argparser.add_positional('session_id', required=False, type=str, help='Ethereum address of recipient')
+args = argparser.parse_args()
+extra_args = {
+ 'backend': None,
+ 'session_id': 'SESSION_ID',
+ }
+
+env = Environment(domain='eth', env=os.environ)
+
+config = chainlib.eth.cli.Config.from_args(args, arg_flags, extra_args=extra_args, base_config_dir=config_dir)
+
+if config.get('SESSION_DATA_DIR') == None:
+ config.add(env.data_dir, 'SESSION_DATA_DIR', exists_ok=True)
+
+chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC'))
+
+tx_getter = None
+session_method = None
+if config.get('_BACKEND') == 'sql':
+ from chainqueue.sql.query import get_tx_cache as tx_getter
+ from chainqueue.runnable.sql import setup_backend
+ from chainqueue.db.models.base import SessionBase
+ setup_backend(config, debug=config.true('DATABASE_DEBUG'))
+ session_method = SessionBase.create_session
+else:
+ raise NotImplementedError('backend {} not implemented'.format(config.get('_BACKEND')))
+
+if config.get('DATABASE_ENGINE') == 'sqlite':
+ config.add(os.path.join(config.get('SESSION_DATA_DIR'), config.get('DATABASE_NAME') + '.sqlite'), 'DATABASE_NAME', exists_ok=True)
+
+wallet = chainlib.eth.cli.Wallet()
+wallet.from_config(config)
+
+rpc = chainlib.eth.cli.Rpc(wallet=wallet)
+conn = rpc.connect_by_config(config)
+
+dsn = dsn_from_config(config)
+backend = SQLBackend(dsn, debug=config.true('DATABASE_DEBUG'), error_parser=rpc.error_parser)
+session_index_backend = SessionIndex(config.get('SESSION_ID'))
+adapter = EthAdapter(backend, session_index_backend=session_index_backend)
+
+
+def main():
+ before = datetime.datetime.utcnow() - adapter.pending_retry_threshold
+ txs = session_index_backend.get(chain_spec, adapter, status=StatusBits.IN_NETWORK, not_status=StatusBits.FINAL | StatusBits.OBSOLETE, before=before)
+
+ o = price()
+ r = conn.do(o, error_parser=rpc.error_parser)
+ gas_price = strip_0x(r)
+ try:
+ gas_price = int(gas_price, 16)
+ except ValueError:
+ gas_price = int(gas_price)
+ logg.info('got current gas price {}'.format(gas_price))
+
+ signer = rpc.get_signer()
+
+ db_session = adapter.create_session()
+ for tx_hash in txs:
+ tx_bytes = bytes.fromhex(strip_0x(txs[tx_hash]))
+ tx = adapter.translate(tx_bytes, chain_spec)
+ tx_gas_price = int(tx['gasPrice'])
+ if tx_gas_price < gas_price:
+ tx['gasPrice'] = gas_price
+ else:
+ tx['gasPrice'] = int(tx['gasPrice'] * DEFAULT_GAS_FACTOR)
+ tx_obj = EIP155Transaction(tx, tx['nonce'], chain_spec.chain_id())
+ new_tx_bytes = signer.sign_transaction_to_wire(tx_obj)
+ logg.debug('add tx {} with gas price changed from {} to {}: {}'.format(tx_hash, tx_gas_price, tx['gasPrice'], new_tx_bytes.hex()))
+ adapter.add(new_tx_bytes, chain_spec, session=db_session)
+
+ db_session.close()
+
+
+if __name__ == '__main__':
+ main()
diff --git a/chaind_eth/runnable/server.py b/chaind_eth/runnable/server.py
@@ -17,6 +17,7 @@ from chainlib.eth.connection import EthHTTPConnection
from chainqueue.sql.backend import SQLBackend
from chainlib.error import JSONRPCException
from chainqueue.db import dsn_from_config
+from chaind.sql.session import SessionIndex
# local imports
from chaind_eth.dispatch import Dispatcher
@@ -59,7 +60,7 @@ if not config.get('SESSION_SOCKET_PATH'):
if config.get('DATABASE_ENGINE') == 'sqlite':
#config.add(os.path.join(config.get('SESSION_DATA_DIR'), config.get('DATABASE_NAME') + '.sqlite'), 'DATABASE_NAME', exists_ok=True)
- config.add(os.path.join(config.get('SESSION_DATA_DIR'), config.get('DATABASE_NAME')), 'DATABASE_NAME', exists_ok=True)
+ config.add(os.path.join(config.get('SESSION_DATA_DIR'), config.get('DATABASE_NAME') + '.sqlite'), 'DATABASE_NAME', exists_ok=True)
config.censor('PASSWORD', 'DATABASE')
logg.debug('config loaded:\n{}'.format(config))
@@ -116,10 +117,14 @@ signal.signal(signal.SIGTERM, ctrl.shutdown)
chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC'))
+rpc = chainlib.eth.cli.Rpc()
+conn = rpc.connect_by_config(config)
+
+logg.debug('error {}'.format(rpc.error_parser))
dsn = dsn_from_config(config)
-backend = SQLBackend(dsn, debug=config.true('DATABASE_DEBUG'))
-adapter = EthAdapter(backend)
-rpc = EthHTTPConnection(url=config.get('RPC_HTTP_PROVIDER'), chain_spec=chain_spec)
+backend = SQLBackend(dsn, error_parser=rpc.error_parser, debug=config.true('DATABASE_DEBUG'))
+session_index_backend = SessionIndex(config.get('SESSION_ID'))
+adapter = EthAdapter(backend, session_index_backend=session_index_backend)
def process_outgoing(chain_spec, adapter, rpc):
@@ -148,7 +153,7 @@ def main():
break
if srvs == None:
logg.debug('timeout (remote socket is none)')
- r = process_outgoing(chain_spec, adapter, rpc)
+ r = process_outgoing(chain_spec, adapter, conn)
if r > 0:
ctrl.srv.settimeout(0.1)
else:
@@ -174,15 +179,21 @@ def main():
logg.debug('recv {} bytes'.format(len(data)))
session = backend.create_session()
+ tx_hash = None
try:
- r = adapter.add(data, chain_spec, session=session)
- try:
- r = srvs.send(r.to_bytes(4, byteorder='big'))
- logg.debug('{} bytes sent'.format(r))
- except BrokenPipeError:
- logg.debug('they just hung up. how rude.')
+ tx_hash = adapter.add(data, chain_spec, session=session)
except ValueError as e:
logg.error('invalid input: {}'.format(e))
+
+ r = 1
+ if tx_hash != None:
+ session.commit()
+ r = 0
+ try:
+ r = srvs.send(r.to_bytes(4, byteorder='big'))
+ logg.debug('{} bytes sent'.format(r))
+ except BrokenPipeError:
+ logg.debug('they just hung up. how rude.')
session.close()
srvs.close()
diff --git a/chainqueue/adapters/eth.py b/chainqueue/adapters/eth.py
@@ -1,6 +1,5 @@
# standard imports
import logging
-import datetime
# external imports
from chainlib.eth.constant import ZERO_ADDRESS
@@ -12,18 +11,15 @@ from hexathon import (
add_0x,
strip_0x,
)
-from chainqueue.enum import (
- StatusBits,
- errors as queue_errors,
- )
# local imports
-from chainqueue.adapters.base import Adapter
+from chainqueue.adapters.sessionindex import SessionIndexAdapter
logg = logging.getLogger(__name__)
-class EthAdapter(Adapter):
+class EthAdapter(SessionIndexAdapter):
+
def translate(self, bytecode, chain_spec):
logg.debug('bytecode {}'.format(bytecode))
@@ -41,27 +37,6 @@ class EthAdapter(Adapter):
return r
- def upcoming(self, chain_spec, session=None):
- txs = self.backend.get(chain_spec, self.translate, session=session, status=StatusBits.QUEUED, not_status=StatusBits.IN_NETWORK)
- before = datetime.datetime.utcnow() - self.error_retry_threshold
- errored_txs = self.backend.get(chain_spec, self.translate, session=session, status=StatusBits.LOCAL_ERROR, not_status=StatusBits.FINAL, before=before, requeue=True)
- for tx_hash in errored_txs.keys():
- txs[tx_hash] = errored_txs[tx_hash]
- return txs
-
-
- def add(self, bytecode, chain_spec, session=None):
- tx = self.translate(bytecode, chain_spec)
- r = self.backend.create(chain_spec, tx['nonce'], tx['from'], tx['hash'], add_0x(bytecode.hex()), session=session)
- if r:
- session.rollback()
- session.close()
- return r
- r = self.backend.cache(tx, session=session)
- session.commit()
- return r
-
-
# def cache(self, chain_spec):
# session = self.backend.create_session()
# r = self.backend.create(chain_spec, tx['nonce'], tx['from'], tx['hash'], add_0x(bytecode.hex()), session=session)
diff --git a/requirements.txt b/requirements.txt
@@ -1,3 +1,3 @@
-chaind<=0.0.2,>=0.0.2a3
+chaind<=0.0.3,>=0.0.3a1
hexathon~=0.0.1a8
chainlib-eth<=0.1.0,>=0.0.9a4
diff --git a/setup.cfg b/setup.cfg
@@ -1,6 +1,6 @@
[metadata]
name = chaind-eth
-version = 0.0.2a1
+version = 0.0.3a1
description = Queue server for ethereum
author = Louis Holbrook
author_email = dev@holbrook.no