chaind-eth

Queue server for ethereum
Info | Log | Files | Refs | README | LICENSE

retry.py (4005B)


      1 # SPDX-License-Identifier: GPL-3.0-or-later
      2 
      3 # standard imports
      4 import os
      5 import logging
      6 import sys
      7 import datetime
      8 
      9 # external imports
     10 from hexathon import (
     11         add_0x,
     12         strip_0x,
     13         )
     14 from chaind import Environment
     15 import chainlib.eth.cli
     16 from chainlib.chain import ChainSpec
     17 from chainqueue.db import dsn_from_config
     18 from chainqueue.sql.backend import SQLBackend
     19 from chainqueue.enum import StatusBits
     20 from chaind.sql.session import SessionIndex
     21 from chainqueue.adapters.eth import EthAdapter
     22 from chainlib.eth.gas import price
     23 from chainlib.eth.connection import EthHTTPConnection
     24 from crypto_dev_signer.eth.transaction import EIP155Transaction
     25 
     26 DEFAULT_GAS_FACTOR = 1.1
     27 
     28 
     29 logging.basicConfig(level=logging.WARNING)
     30 logg = logging.getLogger()
     31 
     32 script_dir = os.path.dirname(os.path.realpath(__file__)) 
     33 config_dir = os.path.join(script_dir, '..', 'data', 'config')
     34 
     35 arg_flags = chainlib.eth.cli.argflag_std_write
     36 argparser = chainlib.eth.cli.ArgumentParser(arg_flags)
     37 argparser.add_argument('--backend', type=str, default='sql', help='Backend to use (currently only "sql")')
     38 argparser.add_positional('session_id', required=False, type=str, help='Ethereum address of recipient')
     39 args = argparser.parse_args()
     40 extra_args = {
     41     'backend': None,
     42     'session_id': 'SESSION_ID',
     43         }
     44 
     45 env = Environment(domain='eth', env=os.environ)
     46 
     47 config = chainlib.eth.cli.Config.from_args(args, arg_flags, extra_args=extra_args, base_config_dir=config_dir)
     48 
     49 if config.get('SESSION_DATA_DIR') == None:
     50     config.add(env.data_dir, 'SESSION_DATA_DIR', exists_ok=True)
     51 
     52 chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC'))
     53 
     54 tx_getter = None
     55 session_method = None
     56 if config.get('_BACKEND') == 'sql':
     57     from chainqueue.sql.query import get_tx_cache as tx_getter
     58     from chainqueue.runnable.sql import setup_backend
     59     from chainqueue.db.models.base import SessionBase
     60     setup_backend(config, debug=config.true('DATABASE_DEBUG'))
     61     session_method = SessionBase.create_session
     62 else:
     63     raise NotImplementedError('backend {} not implemented'.format(config.get('_BACKEND')))
     64 
     65 if config.get('DATABASE_ENGINE') == 'sqlite':
     66     config.add(os.path.join(config.get('SESSION_DATA_DIR'), config.get('DATABASE_NAME') + '.sqlite'), 'DATABASE_NAME', exists_ok=True)
     67  
     68 wallet = chainlib.eth.cli.Wallet()
     69 wallet.from_config(config)
     70 
     71 rpc = chainlib.eth.cli.Rpc(wallet=wallet)
     72 conn = rpc.connect_by_config(config)
     73 
     74 dsn = dsn_from_config(config)
     75 backend = SQLBackend(dsn, debug=config.true('DATABASE_DEBUG'), error_parser=rpc.error_parser)
     76 session_index_backend = SessionIndex(config.get('SESSION_ID'))
     77 adapter = EthAdapter(backend, session_index_backend=session_index_backend)
     78 
     79 
     80 def main():
     81     before = datetime.datetime.utcnow() - adapter.pending_retry_threshold
     82     txs = session_index_backend.get(chain_spec, adapter, status=StatusBits.IN_NETWORK, not_status=StatusBits.FINAL | StatusBits.OBSOLETE, before=before)
     83 
     84     o = price()
     85     r = conn.do(o, error_parser=rpc.error_parser)
     86     gas_price = strip_0x(r)
     87     try:
     88         gas_price = int(gas_price, 16)
     89     except ValueError:
     90         gas_price = int(gas_price)
     91     logg.info('got current gas price {}'.format(gas_price))
     92 
     93     signer = rpc.get_signer()
     94 
     95     db_session = adapter.create_session()
     96     for tx_hash in txs:
     97         tx_bytes = bytes.fromhex(strip_0x(txs[tx_hash]))
     98         tx = adapter.translate(tx_bytes, chain_spec)
     99         tx_gas_price = int(tx['gasPrice'])
    100         if tx_gas_price < gas_price:
    101             tx['gasPrice'] = gas_price
    102         else:
    103             tx['gasPrice'] = int(tx['gasPrice'] * DEFAULT_GAS_FACTOR)
    104         tx_obj = EIP155Transaction(tx, tx['nonce'], chain_spec.chain_id())
    105         new_tx_bytes = signer.sign_transaction_to_wire(tx_obj)
    106         logg.debug('add tx {} with gas price changed from {} to {}: {}'.format(tx_hash, tx_gas_price, tx['gasPrice'], new_tx_bytes.hex()))
    107         adapter.add(new_tx_bytes, chain_spec, session=db_session)
    108 
    109     db_session.close()
    110 
    111 
    112 if __name__ == '__main__':
    113     main()