commit c33535379780a3e28d20a17973da5848aab98aa8
parent d5afd6fde454e474698bbd79e35459c1feb4d067
Author: nolash <dev@holbrook.no>
Date: Wed, 8 Sep 2021 17:17:49 +0200
Add resend script
Diffstat:
8 files changed, 255 insertions(+), 117 deletions(-)
diff --git a/chaind_eth/cli/csv.py b/chaind_eth/cli/csv.py
@@ -16,6 +16,8 @@ class CSVProcessor:
import csv # only import if needed
fr = csv.reader(f)
+ f.close()
+
for r in fr:
contents.append(r)
l = len(contents)
diff --git a/chaind_eth/cli/output.py b/chaind_eth/cli/output.py
@@ -0,0 +1,34 @@
+# standard imports
+import logging
+import socket
+import enum
+
+logg = logging.getLogger(__name__)
+
+
+class OpMode(enum.Enum):
+ STDOUT = 'standard_output'
+ UNIX = 'unix_socket'
+
+class Outputter:
+
+ def __init__(self, mode):
+ self.out = getattr(self, 'do_' + mode.value)
+
+
+ def do(self, hx, *args, **kwargs):
+ return self.out(hx, *args, **kwargs)
+
+
+ def do_standard_output(self, hx, *args, **kwargs):
+ return hx
+
+
+ def do_unix_socket(self, hx, *args, **kwargs):
+ s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ s.connect(kwargs['socket'])
+ s.send(hx.encode('utf-8'))
+ r = s.recv(64+4)
+ logg.debug('r {}'.format(r))
+ s.close()
+ return r[4:].decode('utf-8')
diff --git a/chaind_eth/cli/process.py b/chaind_eth/cli/process.py
@@ -45,10 +45,10 @@ class Processor:
self.content = processor.load(self.source)
if self.content != None:
if process:
- #try:
- self.process()
- #except Exception as e:
- # raise TxSourceError('invalid source contents: {}'.format(str(e)))
+ try:
+ self.process()
+ except Exception as e:
+ raise TxSourceError('invalid source contents: {}'.format(str(e)))
return self.content
raise TxSourceError('unparseable source')
diff --git a/chaind_eth/cli/retry.py b/chaind_eth/cli/retry.py
@@ -0,0 +1,81 @@
+# standard imports
+import logging
+
+# external imports
+from chainlib.eth.gas import price
+from chainlib.eth.tx import unpack
+from chaind.error import TxSourceError
+from crypto_dev_signer.eth.transaction import EIP155Transaction
+from chainlib.eth.gas import Gas
+from hexathon import (
+ add_0x,
+ strip_0x,
+ )
+
+# local imports
+from chaind_eth.cli.tx import TxProcessor
+
+logg = logging.getLogger(__name__)
+
+DEFAULT_GAS_FACTOR = 1.1
+
+
+class Retrier:
+
+ def __init__(self, sender, signer, source, chain_spec, gas_oracle, gas_factor=DEFAULT_GAS_FACTOR):
+ self.sender = sender
+ self.signer = signer
+ self.source = source
+ self.raw_content = []
+ self.content = []
+ self.cursor = 0
+ self.gas_oracle = gas_oracle
+ self.gas_factor = gas_factor
+ self.chain_spec = chain_spec
+ self.chain_id = chain_spec.chain_id()
+ self.processor = [TxProcessor()]
+
+
+ def load(self, process=True):
+ for processor in self.processor:
+ self.raw_content = processor.load(self.source)
+ if self.raw_content != None:
+ if process:
+ #try:
+ self.process()
+ #except Exception as e:
+ # raise TxSourceError('invalid source contents: {}'.format(str(e)))
+ return self.content
+ raise TxSourceError('unparseable source')
+
+
+ def process(self):
+ gas_data = self.gas_oracle.get_gas()
+ gas_price = gas_data[0]
+ for tx in self.raw_content:
+ tx_bytes = bytes.fromhex(strip_0x(tx))
+ tx = unpack(tx_bytes, self.chain_spec)
+ tx_gas_price_old = int(tx['gasPrice'])
+ if tx_gas_price_old < gas_price:
+ tx['gasPrice'] = gas_price
+ else:
+ tx['gasPrice'] = int(tx_gas_price_old * self.gas_factor)
+ if tx_gas_price_old == tx['gasPrice']:
+ tx['gasPrice'] += 1
+ tx_obj = EIP155Transaction(tx, tx['nonce'], self.chain_id)
+ new_tx_bytes = self.signer.sign_transaction_to_wire(tx_obj)
+ logg.debug('add tx {} with gas price changed from {} to {}: {}'.format(tx['hash'], tx_gas_price_old, tx['gasPrice'], new_tx_bytes.hex()))
+ self.content.append(new_tx_bytes)
+
+
+ def __iter__(self):
+ self.cursor = 0
+ return self
+
+
+ def __next__(self):
+ if self.cursor == len(self.content):
+ raise StopIteration()
+ tx = self.content[self.cursor]
+ self.cursor += 1
+ return tx
diff --git a/chaind_eth/cli/tx.py b/chaind_eth/cli/tx.py
@@ -0,0 +1,23 @@
+# standard imports
+import logging
+
+logg = logging.getLogger(__name__)
+
+class TxProcessor:
+
+ def load(self, s):
+ contents = []
+ f = None
+ try:
+ f = open(s, 'r')
+ except FileNotFoundError:
+ return None
+
+ contents = f.readlines()
+ f.close()
+ for i in range(len(contents)):
+ contents[i] = contents[i].rstrip()
+ return contents
+
+ def __str__(self):
+ return 'tx processor'
diff --git a/chaind_eth/runnable/resend.py b/chaind_eth/runnable/resend.py
@@ -0,0 +1,110 @@
+# standard imports
+import os
+import logging
+import sys
+import datetime
+import enum
+import re
+import stat
+
+# external imports
+import chainlib.eth.cli
+from chaind import Environment
+from chainlib.eth.gas import price
+from chainlib.chain import ChainSpec
+from hexathon import strip_0x
+from eth_token_index.index import TokenUniqueSymbolIndex
+
+# local imports
+from chaind_eth.cli.retry import Retrier
+from chaind.error import TxSourceError
+from chaind_eth.cli.output import (
+ Outputter,
+ OpMode,
+ )
+
+logging.basicConfig(level=logging.WARNING)
+logg = logging.getLogger()
+
+script_dir = os.path.dirname(os.path.realpath(__file__))
+config_dir = os.path.join(script_dir, '..', 'data', 'config')
+
+
+arg_flags = chainlib.eth.cli.argflag_std_write
+argparser = chainlib.eth.cli.ArgumentParser(arg_flags)
+argparser.add_argument('--socket', dest='socket', type=str, help='Socket to send transactions to')
+argparser.add_argument('--token-index', dest='token_index', type=str, help='Token resolver index')
+argparser.add_positional('source', required=False, type=str, help='Transaction source file')
+args = argparser.parse_args()
+
+extra_args = {
+ 'socket': None,
+ 'source': None,
+ 'token_index': None,
+ }
+
+env = Environment(domain='eth', env=os.environ)
+config = chainlib.eth.cli.Config.from_args(args, arg_flags, extra_args=extra_args, base_config_dir=config_dir)
+
+wallet = chainlib.eth.cli.Wallet()
+wallet.from_config(config)
+
+rpc = chainlib.eth.cli.Rpc(wallet=wallet)
+conn = rpc.connect_by_config(config)
+
+chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC'))
+
+
+mode = OpMode.STDOUT
+re_unix = r'^ipc://(/.+)'
+m = re.match(re_unix, config.get('_SOCKET', ''))
+if m != None:
+ config.add(m.group(1), '_SOCKET', exists_ok=True)
+ r = 0
+ try:
+ stat_info = os.stat(config.get('_SOCKET'))
+ if not stat.S_ISSOCK(stat_info.st_mode):
+ r = 1
+ except FileNotFoundError:
+ r = 1
+
+ if r > 0:
+ sys.stderr.write('{} is not a socket\n'.format(config.get('_SOCKET')))
+ sys.exit(1)
+
+ mode = OpMode.UNIX
+
+logg.info('using mode {}'.format(mode.value))
+
+if config.get('_SOURCE') == None:
+ sys.stderr.write('source data missing')
+ sys.exit(1)
+
+
+def main():
+ signer = rpc.get_signer()
+
+ # TODO: make resolvers pluggable
+ processor = Retrier(wallet.get_signer_address(), wallet.get_signer(), config.get('_SOURCE'), chain_spec, rpc.get_gas_oracle())
+
+ sends = None
+ try:
+ sends = processor.load()
+ except TxSourceError as e:
+ sys.stderr.write('processing error: {}. processors: {}\n'.format(str(e), str(processor)))
+ sys.exit(1)
+
+ tx_iter = iter(processor)
+ out = Outputter(mode)
+ while True:
+ tx = None
+ try:
+ tx_bytes = next(tx_iter)
+ except StopIteration:
+ break
+ tx_hex = tx_bytes.hex()
+ print(out.do(tx_hex, socket=config.get('_SOCKET')))
+
+
+if __name__ == '__main__':
+ main()
diff --git a/chaind_eth/runnable/retry.py b/chaind_eth/runnable/retry.py
@@ -1,113 +0,0 @@
-# SPDX-License-Identifier: GPL-3.0-or-later
-
-# standard imports
-import os
-import logging
-import sys
-import datetime
-
-# external imports
-from hexathon import (
- add_0x,
- strip_0x,
- )
-from chaind import Environment
-import chainlib.eth.cli
-from chainlib.chain import ChainSpec
-from chainqueue.db import dsn_from_config
-from chainqueue.sql.backend import SQLBackend
-from chainqueue.enum import StatusBits
-from chaind.sql.session import SessionIndex
-from chainqueue.adapters.eth import EthAdapter
-from chainlib.eth.gas import price
-from chainlib.eth.connection import EthHTTPConnection
-from crypto_dev_signer.eth.transaction import EIP155Transaction
-
-DEFAULT_GAS_FACTOR = 1.1
-
-
-logging.basicConfig(level=logging.WARNING)
-logg = logging.getLogger()
-
-script_dir = os.path.dirname(os.path.realpath(__file__))
-config_dir = os.path.join(script_dir, '..', 'data', 'config')
-
-arg_flags = chainlib.eth.cli.argflag_std_write
-argparser = chainlib.eth.cli.ArgumentParser(arg_flags)
-argparser.add_argument('--backend', type=str, default='sql', help='Backend to use (currently only "sql")')
-argparser.add_positional('session_id', required=False, type=str, help='Session id to connect to')
-args = argparser.parse_args()
-extra_args = {
- 'backend': None,
- 'session_id': 'SESSION_ID',
- }
-
-env = Environment(domain='eth', env=os.environ)
-
-config = chainlib.eth.cli.Config.from_args(args, arg_flags, extra_args=extra_args, base_config_dir=config_dir)
-
-if config.get('SESSION_DATA_DIR') == None:
- config.add(env.data_dir, 'SESSION_DATA_DIR', exists_ok=True)
-
-chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC'))
-
-tx_getter = None
-session_method = None
-if config.get('_BACKEND') == 'sql':
- from chainqueue.sql.query import get_tx_cache as tx_getter
- from chainqueue.runnable.sql import setup_backend
- from chainqueue.db.models.base import SessionBase
- setup_backend(config, debug=config.true('DATABASE_DEBUG'))
- session_method = SessionBase.create_session
-else:
- raise NotImplementedError('backend {} not implemented'.format(config.get('_BACKEND')))
-
-if config.get('DATABASE_ENGINE') == 'sqlite':
- config.add(os.path.join(config.get('SESSION_DATA_DIR'), config.get('DATABASE_NAME') + '.sqlite'), 'DATABASE_NAME', exists_ok=True)
-
-wallet = chainlib.eth.cli.Wallet()
-wallet.from_config(config)
-
-rpc = chainlib.eth.cli.Rpc(wallet=wallet)
-conn = rpc.connect_by_config(config)
-
-dsn = dsn_from_config(config)
-backend = SQLBackend(dsn, debug=config.true('DATABASE_DEBUG'), error_parser=rpc.error_parser)
-session_index_backend = SessionIndex(config.get('SESSION_ID'))
-adapter = EthAdapter(backend, session_index_backend=session_index_backend)
-
-
-def main():
- before = datetime.datetime.utcnow() - adapter.pending_retry_threshold
- txs = session_index_backend.get(chain_spec, adapter, status=StatusBits.IN_NETWORK, not_status=StatusBits.FINAL | StatusBits.OBSOLETE, before=before)
-
- o = price()
- r = conn.do(o, error_parser=rpc.error_parser)
- gas_price = strip_0x(r)
- try:
- gas_price = int(gas_price, 16)
- except ValueError:
- gas_price = int(gas_price)
- logg.info('got current gas price {}'.format(gas_price))
-
- signer = rpc.get_signer()
-
- db_session = adapter.create_session()
- for tx_hash in txs:
- tx_bytes = bytes.fromhex(strip_0x(txs[tx_hash]))
- tx = adapter.translate(tx_bytes, chain_spec)
- tx_gas_price = int(tx['gasPrice'])
- if tx_gas_price < gas_price:
- tx['gasPrice'] = gas_price
- else:
- tx['gasPrice'] = int(tx['gasPrice'] * DEFAULT_GAS_FACTOR)
- tx_obj = EIP155Transaction(tx, tx['nonce'], chain_spec.chain_id())
- new_tx_bytes = signer.sign_transaction_to_wire(tx_obj)
- logg.debug('add tx {} with gas price changed from {} to {}: {}'.format(tx_hash, tx_gas_price, tx['gasPrice'], new_tx_bytes.hex()))
- adapter.add(new_tx_bytes, chain_spec, session=db_session)
-
- db_session.close()
-
-
-if __name__ == '__main__':
- main()
diff --git a/setup.cfg b/setup.cfg
@@ -38,3 +38,4 @@ console_scripts =
chaind-eth-server = chaind_eth.runnable.server:main
chaind-eth-syncer = chaind_eth.runnable.syncer:main
chaind-eth-send = chaind_eth.runnable.send:main
+ chaind-eth-resend = chaind_eth.runnable.resend:main