commit 2a7fd70f4e6374e3fd171f0f0ebc7257504e2e09
parent dab50dadd1a2c5ae1d972c8b39a8b44524c07e90
Author: lash <>
Date: Sun, 10 Apr 2022 19:03:23 +0000
Add send cli tool, make token resolver pluggable
24 files changed, 331 insertions(+), 1111 deletions(-)
diff --git a/chaind_eth/cli/ b/chaind/eth/cli/
diff --git a/chaind_eth/cli/ b/chaind/eth/cli/
diff --git a/chaind/eth/runnable/ b/chaind/eth/runnable/
@@ -0,0 +1,119 @@
+# standard imports
+import os
+import logging
+import sys
+import datetime
+import enum
+import re
+import stat
+import socket
+# external imports
+import chainlib.eth.cli
+from chaind.setup import Environment
+from chainlib.eth.gas import price
+from chainlib.chain import ChainSpec
+from hexathon import strip_0x
+# local imports
+from chaind.error import TxSourceError
+from chaind.eth.token.process import Processor
+from chaind.eth.token.gas import GasTokenResolver
+from chaind.eth.cli.csv import CSVProcessor
+from chaind.eth.cli.output import (
+ Outputter,
+ OpMode,
+ )
+logg = logging.getLogger()
+script_dir = os.path.dirname(os.path.realpath(__file__))
+config_dir = os.path.join(script_dir, '..', 'data', 'config')
+arg_flags = chainlib.eth.cli.argflag_std_write
+argparser = chainlib.eth.cli.ArgumentParser(arg_flags)
+argparser.add_argument('--socket', dest='socket', type=str, help='Socket to send transactions to')
+argparser.add_argument('--token-module', dest='token_module', type=str, help='Python module path to resolve tokens from identifiers')
+argparser.add_positional('source', required=False, type=str, help='Transaction source file')
+args = argparser.parse_args()
+extra_args = {
+ 'socket': None,
+ 'source': None,
+ }
+env = Environment(domain='eth', env=os.environ)
+config = chainlib.eth.cli.Config.from_args(args, arg_flags, extra_args=extra_args, base_config_dir=config_dir)
+config.add(args.token_module, 'TOKEN_MODULE', True)
+wallet = chainlib.eth.cli.Wallet()
+rpc = chainlib.eth.cli.Rpc(wallet=wallet)
+conn = rpc.connect_by_config(config)
+chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC'))
+mode = OpMode.STDOUT
+re_unix = r'^ipc://(/.+)'
+m = re.match(re_unix, config.get('_SOCKET', ''))
+if m != None:
+ config.add(, '_SOCKET', exists_ok=True)
+ r = 0
+ try:
+ stat_info = os.stat(config.get('_SOCKET'))
+ if not stat.S_ISSOCK(stat_info.st_mode):
+ r = 1
+ except FileNotFoundError:
+ r = 1
+ if r > 0:
+ sys.stderr.write('{} is not a socket\n'.format(config.get('_SOCKET')))
+ sys.exit(1)
+ mode = OpMode.UNIX
+'using mode {}'.format(mode.value))
+if config.get('_SOURCE') == None:
+ sys.stderr.write('source data missing\n')
+ sys.exit(1)
+def main():
+ token_resolver = None
+ if config.get('TOKEN_MODULE') != None:
+ import importlib
+ m = importlib.import_module(config.get('TOKEN_MODULE'))
+ m = m.TokenResolver
+ else:
+ from chaind.eth.token.gas import GasTokenResolver
+ m = GasTokenResolver
+ token_resolver = m(chain_spec, rpc.get_sender_address(), rpc.get_signer(), rpc.get_gas_oracle(), rpc.get_nonce_oracle())
+ processor = Processor(token_resolver, config.get('_SOURCE'))
+ processor.add_processor(CSVProcessor())
+ sends = None
+ try:
+ sends = processor.load()
+ except TxSourceError as e:
+ sys.stderr.write('processing error: {}. processors: {}\n'.format(str(e), str(processor)))
+ sys.exit(1)
+ tx_iter = iter(processor)
+ out = Outputter(mode)
+ while True:
+ tx = None
+ try:
+ tx_bytes = next(tx_iter)
+ except StopIteration:
+ break
+ tx_hex = tx_bytes.hex()
+ print(
+if __name__ == '__main__':
+ main()
diff --git a/chaind/eth/token/ b/chaind/eth/token/
@@ -0,0 +1 @@
+from .base import *
diff --git a/chaind/eth/token/ b/chaind/eth/token/
@@ -0,0 +1,52 @@
+# standard imports
+import logging
+# external imports
+from funga.eth.transaction import EIP155Transaction
+from hexathon import strip_0x
+logg = logging.getLogger(__name__)
+class BaseTokenResolver:
+ def __init__(self, chain_spec, sender, signer, gas_oracle, nonce_oracle):
+ self.chain_spec = chain_spec
+ self.chain_id = chain_spec.chain_id()
+ self.signer = signer
+ self.sender = sender
+ self.gas_oracle = gas_oracle
+ self.nonce_oracle = nonce_oracle
+ self.factory = None
+ self.gas_limit_start = None
+ self.gas_price_start = None
+ def reset(self):
+ gas_data = self.gas_oracle.get_gas()
+ self.gas_price_start = gas_data[0]
+ self.gas_limit_start = gas_data[1]
+ def get_values(self, gas_value, value, executable_address=None):
+ if executable_address == None:
+ return (value, 0)
+ try:
+ value = int(value)
+ except ValueError:
+ value = int(strip_0x(value), 16)
+ try:
+ gas_value = int(gas_value)
+ except ValueError:
+ gas_value = int(strip_0x(gas_value), 16)
+ nonce = self.nonce_oracle.next_nonce()
+ return (gas_value, value, nonce,)
+ def sign(self, tx):
+ tx_o = EIP155Transaction(tx, tx['nonce'], self.chain_id)
+ tx_bytes = self.signer.sign_transaction_to_wire(tx_o)
+ return tx_bytes
diff --git a/chaind/eth/token/ b/chaind/eth/token/
@@ -0,0 +1,26 @@
+# external imports
+from eth_erc20 import ERC20
+from chainlib.eth.tx import TxFormat
+# local imports
+from chaind.eth.token import BaseTokenResolver
+class TokenResolver(BaseTokenResolver):
+ def __init__(self, chain_spec, sender, signer, gas_oracle, nonce_oracle):
+ super(TokenResolver, self).__init__(chain_spec, sender, signer, gas_oracle, nonce_oracle)
+ self.factory = ERC20(self.chain_spec, signer=self.signer, gas_oracle=self.gas_oracle, nonce_oracle=self.nonce_oracle)
+ def create(self, recipient, gas_value, data=None, token_value=0, executable_address=None, passphrase=None):
+ if executable_address == None:
+ raise ValueError('executable address required')
+ (gas_value, token_value, nonce) = self.get_values(gas_value, token_value, executable_address=executable_address)
+ tx = self.factory.transfer(executable_address, self.sender, recipient, token_value, tx_format=TxFormat.DICT)
+ tx['value'] = gas_value
+ return tx
diff --git a/chaind/eth/token/ b/chaind/eth/token/
@@ -0,0 +1,30 @@
+# external imports
+from chainlib.eth.gas import Gas
+from hexathon import strip_0x
+# local imports
+from chaind.eth.token import BaseTokenResolver
+class GasTokenResolver(BaseTokenResolver):
+ def __init__(self, chain_spec, sender, signer, gas_oracle, nonce_oracle):
+ super(GasTokenResolver, self).__init__(chain_spec, sender, signer, gas_oracle, nonce_oracle)
+ self.factory = Gas(self.chain_spec, signer=self.signer, gas_oracle=self.gas_oracle, nonce_oracle=self.nonce_oracle)
+ def create(self, recipient, gas_value, data=None, token_value=0, executable_address=None, passphrase=None):
+ (gas_value, token_value, nonce) = self.get_values(gas_value, token_value, executable_address=executable_address)
+ tx = {
+ 'from': self.sender,
+ 'to': recipient,
+ 'value': gas_value,
+ 'data': data,
+ 'nonce': nonce,
+ 'gasPrice': self.gas_price_start,
+ 'gas': self.gas_limit_start,
+ }
+ return tx
diff --git a/chaind/eth/token/ b/chaind/eth/token/
@@ -0,0 +1,98 @@
+# standard imports
+import logging
+# external imports
+from chaind.error import TxSourceError
+from chainlib.eth.address import is_checksum_address
+from chainlib.eth.tx import unpack
+from chainlib.eth.gas import Gas
+from hexathon import (
+ add_0x,
+ strip_0x,
+ )
+from funga.eth.transaction import EIP155Transaction
+#from eth_erc20 import ERC20
+logg = logging.getLogger(__name__)
+class Processor:
+ def __init__(self, resolver, source):
+ self.resolver = resolver
+ self.source = source
+ self.processor = []
+ def add_processor(self, processor):
+ self.processor.append(processor)
+ def load(self, process=True):
+ for processor in self.processor:
+ self.content = processor.load(self.source)
+ if self.content != None:
+ if process:
+ try:
+ self.process()
+ except Exception as e:
+ raise TxSourceError('invalid source contents: {}'.format(str(e)))
+ return self.content
+ raise TxSourceError('unparseable source')
+ # 0: recipient
+ # 1: amount
+ # 2: token identifier (optional, when not specified network gas token will be used)
+ # 3: gas amount (optional)
+ def process(self):
+ txs = []
+ for i, r in enumerate(self.content):
+ logg.debug('processing {}'.format(r))
+ if not is_checksum_address(r[0]):
+ raise ValueError('invalid checksum address {} in record {}'.format(r[0], i))
+ self.content[i][0] = add_0x(r[0])
+ try:
+ self.content[i][1] = int(r[1])
+ except ValueError:
+ self.content[i][1] = int(strip_0x(r[1]), 16)
+ native_token_value = 0
+ if len(self.content[i]) == 3:
+ self.content[i].append(native_token_value)
+ def __iter__(self):
+ self.resolver.reset()
+ self.cursor = 0
+ return self
+ def __next__(self):
+ if self.cursor == len(self.content):
+ raise StopIteration()
+ r = self.content[self.cursor]
+ value = r[1]
+ gas_value = 0
+ try:
+ gas_value = r[3]
+ except IndexError:
+ pass
+ logg.debug('gasvalue {}'.format(gas_value))
+ data = '0x'
+ tx = self.resolver.create(r[0], gas_value, data=data, token_value=value, executable_address=r[2])
+ v = self.resolver.sign(tx)
+ self.cursor += 1
+ return v
+ def __str__(self):
+ names = []
+ for s in self.processor:
+ names.append(str(s))
+ return ','.join(names)
diff --git a/chaind_eth/ b/chaind_eth/
@@ -1,19 +0,0 @@
-# external imports
-from chainlib.interface import ChainInterface
-from chainlib.eth.block import (
- block_by_number,
- Block,
- )
-from chainlib.eth.tx import (
- receipt,
- Tx,
- )
-class EthChainInterface(ChainInterface):
- def __init__(self):
- self._block_by_number = block_by_number
- self._block_from_src = Block.from_src
- self._tx_receipt = receipt
- self._src_normalize = Tx.src_normalize
diff --git a/chaind_eth/cli/ b/chaind_eth/cli/
@@ -1,162 +0,0 @@
-# standard imports
-import logging
-# external imports
-from chaind.error import TxSourceError
-from chainlib.eth.address import is_checksum_address
-from chainlib.eth.tx import unpack
-from chainlib.eth.gas import Gas
-from hexathon import (
- add_0x,
- strip_0x,
- )
-from crypto_dev_signer.eth.transaction import EIP155Transaction
-from eth_erc20 import ERC20
-logg = logging.getLogger(__name__)
-class Processor:
- def __init__(self, sender, signer, source, chain_spec, gas_oracle, nonce_oracle, resolver=None):
- self.sender = sender
- self.signer = signer
- self.source = source
- self.processor = []
- self.content = []
- self.token = []
- self.token_resolver = resolver
- self.cursor = 0
- self.gas_oracle = gas_oracle
- self.nonce_oracle = nonce_oracle
- self.nonce_start = None
- self.gas_limit_start = None
- self.gas_price_start = None
- self.chain_spec = chain_spec
- self.chain_id = chain_spec.chain_id()
- def add_processor(self, processor):
- self.processor.append(processor)
- def load(self, process=True):
- for processor in self.processor:
- self.content = processor.load(self.source)
- if self.content != None:
- if process:
- try:
- self.process()
- except Exception as e:
- raise TxSourceError('invalid source contents: {}'.format(str(e)))
- return self.content
- raise TxSourceError('unparseable source')
- # 0: recipient
- # 1: amount
- # 2: token identifier (optional, when not specified network gas token will be used)
- # 3: gas amount (optional)
- def process(self):
- txs = []
- for i, r in enumerate(self.content):
- logg.debug('processing {}'.format(r))
- if not is_checksum_address(r[0]):
- raise ValueError('invalid checksum address {} in record {}'.format(r[0], i))
- self.content[i][0] = add_0x(r[0])
- try:
- self.content[i][1] = int(r[1])
- except ValueError:
- self.content[i][1] = int(strip_0x(r[1]), 16)
- native_token_value = 0
- if self.token_resolver == None:
- self.token.append(None)
- else:
- #self.content[i][2] = self.token_resolver.lookup(k)
- token = self.token_resolver.lookup(r[2])
- self.token.append(token)
- if len(self.content[i]) == 3:
- self.content[i].append(native_token_value)
- def __iter__(self):
- gas_data = self.gas_oracle.get_gas()
- self.gas_price_start = gas_data[0]
- self.gas_limit_start = gas_data[1]
- self.cursor = 0
- return self
- def __next__(self):
- if self.cursor == len(self.content):
- raise StopIteration()
- nonce = self.nonce_oracle.next_nonce()
- token_factory = None
- r = self.content[self.cursor]
- token = self.token[self.cursor]
- if token == None:
- token_factory = Gas(self.chain_spec, signer=self.signer, gas_oracle=self.gas_oracle, nonce_oracle=self.nonce_oracle)
- else:
- token_factory = ERC20(self.chain_spec, signer=self.signer, gas_oracle=self.gas_oracle, nonce_oracle=self.nonce_oracle)
- value = 0
- gas_value = 0
- data = '0x'
- debug_destination = (r[2], token)
- if debug_destination[1] == None:
- debug_destination = (None, 'network gas token')
- if isinstance(token_factory, ERC20):
- (tx_hash_hex, o) = token_factory.transfer(token, self.sender, r[0], r[1])
- logg.debug('tx {}'.format(o))
- # TODO: allow chainlib to return data args only (TxFormat)
- tx = unpack(bytes.fromhex(strip_0x(o['params'][0])), self.chain_spec)
- data = tx['data']
- try:
- value = int(r[1])
- except ValueError:
- value = int(strip_0x(r[1]), 16)
- try:
- gas_value = int(r[3])
- except:
- gas_value = int(strip_0x(r[3]), 16)
- else:
- try:
- value = int(r[1])
- except ValueError:
- value = int(strip_0x(r[1]), 16)
- gas_value = value
- logg.debug('token factory {} resolved sender {} recipient {} gas value {} token value {} token {}'.format(
- str(token_factory),
- self.sender,
- r[0],
- gas_value,
- value,
- debug_destination,
- )
- )
- tx = {
- 'from': self.sender,
- 'to': r[0],
- 'value': gas_value,
- 'data': data,
- 'nonce': nonce,
- 'gasPrice': self.gas_price_start,
- 'gas': self.gas_limit_start,
- }
- tx_o = EIP155Transaction(tx, nonce, self.chain_id)
- tx_bytes = self.signer.sign_transaction_to_wire(tx_o)
- self.cursor += 1
- return tx_bytes
- def __str__(self):
- names = []
- for s in self.processor:
- names.append(str(s))
- return ','.join(names)
diff --git a/chaind_eth/cli/ b/chaind_eth/cli/
@@ -1,86 +0,0 @@
-# standard imports
-import logging
-# external imports
-from chainlib.eth.constant import ZERO_ADDRESS
-from chainlib.eth.address import is_checksum_address
-from hexathon import strip_0x
-from eth_token_index.index import TokenUniqueSymbolIndex
-logg = logging.getLogger(__name__)
-class LookNoop:
- def __init__(self, check=True):
- self.check = check
- def get(self, k, rpc=None):
- if not self.check:
- address_bytes = bytes.fromhex(strip_0x(k))
- if len(address_bytes) != 20:
- raise ValueError('{} is not a valid address'.format(k))
- else:
- try:
- if not is_checksum_address(k):
- raise ValueError('not valid checksum address {}'.format(k))
- except ValueError:
- raise ValueError('not valid checksum address {}'.format(k))
- return strip_0x(k)
- def __str__(self):
- return 'checksum address shortcircuit'
-class TokenIndexLookup(TokenUniqueSymbolIndex):
- def __init__(self, chain_spec, signer, gas_oracle, nonce_oracle, address, sender_address=ZERO_ADDRESS):
- super(TokenIndexLookup, self).__init__(chain_spec, signer=signer, gas_oracle=gas_oracle, nonce_oracle=nonce_oracle)
- self.local_address = address
- self.sender_address = sender_address
- def get(self, k, rpc=None):
- o = self.address_of(self.local_address, k, sender_address=self.sender_address)
- r =
- address = self.parse_address_of(r)
- if address != ZERO_ADDRESS:
- return address
- raise FileNotFoundError(address)
- def __str__(self):
- return 'token symbol index'
-class DefaultResolver:
- def __init__(self, chain_spec, rpc, sender_address=ZERO_ADDRESS):
- self.chain_spec = chain_spec
- self.rpc = rpc
- self.lookups = []
- self.lookup_pointers = []
- self.cursor = 0
- self.sender_address = sender_address
- def add_lookup(self, lookup, reverse):
- self.lookups.append(lookup)
- self.lookup_pointers.append(reverse)
- def lookup(self, k):
- if k == '' or k == None:
- return None
- for lookup in self.lookups:
- try:
- address = lookup.get(k, rpc=self.rpc)
- logg.debug('resolved token {} to {} with lookup {}'.format(k, address, lookup))
- return address
- except Exception as e:
- logg.debug('lookup {} failed for {}: {}'.format(lookup, k, e))
- raise FileNotFoundError(k)
diff --git a/chaind_eth/cli/ b/chaind_eth/cli/
@@ -1,81 +0,0 @@
-# standard imports
-import logging
-# external imports
-from chainlib.eth.gas import price
-from chainlib.eth.tx import unpack
-from chaind.error import TxSourceError
-from crypto_dev_signer.eth.transaction import EIP155Transaction
-from chainlib.eth.gas import Gas
-from hexathon import (
- add_0x,
- strip_0x,
- )
-# local imports
-from chaind_eth.cli.tx import TxProcessor
-logg = logging.getLogger(__name__)
-class Retrier:
- def __init__(self, sender, signer, source, chain_spec, gas_oracle, gas_factor=DEFAULT_GAS_FACTOR):
- self.sender = sender
- self.signer = signer
- self.source = source
- self.raw_content = []
- self.content = []
- self.cursor = 0
- self.gas_oracle = gas_oracle
- self.gas_factor = gas_factor
- self.chain_spec = chain_spec
- self.chain_id = chain_spec.chain_id()
- self.processor = [TxProcessor()]
- def load(self, process=True):
- for processor in self.processor:
- self.raw_content = processor.load(self.source)
- if self.raw_content != None:
- if process:
- #try:
- self.process()
- #except Exception as e:
- # raise TxSourceError('invalid source contents: {}'.format(str(e)))
- return self.content
- raise TxSourceError('unparseable source')
- def process(self):
- gas_data = self.gas_oracle.get_gas()
- gas_price = gas_data[0]
- for tx in self.raw_content:
- tx_bytes = bytes.fromhex(strip_0x(tx))
- tx = unpack(tx_bytes, self.chain_spec)
- tx_gas_price_old = int(tx['gasPrice'])
- if tx_gas_price_old < gas_price:
- tx['gasPrice'] = gas_price
- else:
- tx['gasPrice'] = int(tx_gas_price_old * self.gas_factor)
- if tx_gas_price_old == tx['gasPrice']:
- tx['gasPrice'] += 1
- tx_obj = EIP155Transaction(tx, tx['nonce'], self.chain_id)
- new_tx_bytes = self.signer.sign_transaction_to_wire(tx_obj)
- logg.debug('add tx {} with gas price changed from {} to {}: {}'.format(tx['hash'], tx_gas_price_old, tx['gasPrice'], new_tx_bytes.hex()))
- self.content.append(new_tx_bytes)
- def __iter__(self):
- self.cursor = 0
- return self
- def __next__(self):
- if self.cursor == len(self.content):
- raise StopIteration()
- tx = self.content[self.cursor]
- self.cursor += 1
- return tx
diff --git a/chaind_eth/cli/ b/chaind_eth/cli/
@@ -1,23 +0,0 @@
-# standard imports
-import logging
-logg = logging.getLogger(__name__)
-class TxProcessor:
- def load(self, s):
- contents = []
- f = None
- try:
- f = open(s, 'r')
- except FileNotFoundError:
- return None
- contents = f.readlines()
- f.close()
- for i in range(len(contents)):
- contents[i] = contents[i].rstrip()
- return contents
- def __str__(self):
- return 'tx processor'
diff --git a/chaind_eth/data/config/config.ini b/chaind_eth/data/config/config.ini
@@ -1,16 +0,0 @@
-socket_path =
-runtime_dir =
-id =
-data_dir =
-dispatch_delay = 4.0
-engine =
-name = chaind
-driver =
-user =
-password =
-host =
-port =
-debug = 0
diff --git a/chaind_eth/data/config/syncer/config.ini b/chaind_eth/data/config/syncer/config.ini
@@ -1,4 +0,0 @@
-history_start = 0
-skip_history = 0
-loop_interval = 1
diff --git a/chaind_eth/ b/chaind_eth/
@@ -1,72 +0,0 @@
-# standard imports
-import logging
-# external imports
-from chainlib.eth.address import to_checksum_address
-from chainlib.eth.tx import unpack
-from chainlib.error import JSONRPCException
-from chainqueue.enum import StatusBits
-from chainqueue.sql.query import count_tx
-from hexathon import strip_0x
-from chainqueue.encode import TxNormalizer
-#logg = logging.getLogger(__name__)
-logg = logging.getLogger()
-class Dispatcher:
- status_inflight_mask = StatusBits.IN_NETWORK | StatusBits.FINAL
- status_inflight_mask_match = StatusBits.IN_NETWORK
- def __init__(self, chain_spec, adapter, limit=100):
- self.address_counts = {}
- self.chain_spec = chain_spec
- self.adapter = adapter
- self.limit = limit
- self.tx_normalizer = TxNormalizer()
- def __init_count(self, address, session):
- c = self.address_counts.get(address)
- if c == None:
- c = self.limit - count_tx(self.chain_spec, address, self.status_inflight_mask, self.status_inflight_mask_match, session=session)
- if c < 0:
- c = 0
- self.address_counts[address] = c
- return c
- def get_count(self, address, session):
- address = self.tx_normalizer.wallet_address(address)
- return self.__init_count(address, session)
- def inc_count(self, address, session):
- address = self.tx_normalizer.wallet_address(address)
- self.__init_count(address, session)
- self.address_counts[address] -= 1
- def process(self, rpc, session):
- c = 0
- txs = self.adapter.upcoming(self.chain_spec, session=session)
- for k in txs.keys():
- signed_tx_bytes = bytes.fromhex(strip_0x(txs[k]))
- tx_obj = unpack(signed_tx_bytes, self.chain_spec)
- sender = to_checksum_address(tx_obj['from'])
- address_count = self.get_count(sender, session)
- if address_count == 0:
- logg.debug('too many inflight txs for {}, skipping {}'.format(sender, k))
- continue
- logg.debug('processing tx {} {}'.format(k, txs[k]))
- r = 0
- try:
- r = self.adapter.dispatch(self.chain_spec, rpc, k, txs[k], session)
- except JSONRPCException as e:
- logg.error('dispatch failed for {}: {}'.format(k, e))
- continue
- if r == 0:
- self.inc_count(sender, session)
- c += 1
- return c
diff --git a/chaind_eth/ b/chaind_eth/
@@ -1,27 +0,0 @@
-# standard imports
-import logging
-# external imports
-from chainlib.status import Status
-from chainqueue.sql.query import get_tx
-from chainqueue.error import NotLocalTxError
-from chainqueue.sql.state import set_final
-logg = logging.getLogger(__name__)
-class StateFilter:
- def __init__(self, chain_spec):
- self.chain_spec = chain_spec
- def filter(self, conn, block, tx, session=None):
- otx = None
- try:
- otx = get_tx(self.chain_spec, tx.hash, session=session)
- except NotLocalTxError:
- return False
-'finalizing local tx {} with status {}'.format(tx.hash, tx.status))
- status = tx.status != Status.SUCCESS
- set_final(self.chain_spec, tx.hash, block=block.number, tx_index=tx.index, fail=status, session=session)
diff --git a/chaind_eth/runnable/ b/chaind_eth/runnable/
@@ -1,108 +0,0 @@
-# standard imports
-import os
-import logging
-import sys
-import datetime
-import enum
-import re
-import stat
-# external imports
-import chainlib.eth.cli
-from chaind import Environment
-from chainlib.eth.gas import price
-from chainlib.chain import ChainSpec
-from hexathon import strip_0x
-from eth_token_index.index import TokenUniqueSymbolIndex
-# local imports
-from chaind_eth.cli.retry import Retrier
-from chaind.error import TxSourceError
-from chaind_eth.cli.output import (
- Outputter,
- OpMode,
- )
-logg = logging.getLogger()
-script_dir = os.path.dirname(os.path.realpath(__file__))
-config_dir = os.path.join(script_dir, '..', 'data', 'config')
-arg_flags = chainlib.eth.cli.argflag_std_write
-argparser = chainlib.eth.cli.ArgumentParser(arg_flags)
-argparser.add_argument('--socket', dest='socket', type=str, help='Socket to send transactions to')
-argparser.add_positional('source', required=False, type=str, help='Transaction source file')
-args = argparser.parse_args()
-extra_args = {
- 'socket': None,
- 'source': None,
- }
-env = Environment(domain='eth', env=os.environ)
-config = chainlib.eth.cli.Config.from_args(args, arg_flags, extra_args=extra_args, base_config_dir=config_dir)
-wallet = chainlib.eth.cli.Wallet()
-rpc = chainlib.eth.cli.Rpc(wallet=wallet)
-conn = rpc.connect_by_config(config)
-chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC'))
-mode = OpMode.STDOUT
-re_unix = r'^ipc://(/.+)'
-m = re.match(re_unix, config.get('_SOCKET', ''))
-if m != None:
- config.add(, '_SOCKET', exists_ok=True)
- r = 0
- try:
- stat_info = os.stat(config.get('_SOCKET'))
- if not stat.S_ISSOCK(stat_info.st_mode):
- r = 1
- except FileNotFoundError:
- r = 1
- if r > 0:
- sys.stderr.write('{} is not a socket\n'.format(config.get('_SOCKET')))
- sys.exit(1)
- mode = OpMode.UNIX
-'using mode {}'.format(mode.value))
-if config.get('_SOURCE') == None:
- sys.stderr.write('source data missing\n')
- sys.exit(1)
-def main():
- signer = rpc.get_signer()
- # TODO: make resolvers pluggable
- processor = Retrier(wallet.get_signer_address(), wallet.get_signer(), config.get('_SOURCE'), chain_spec, rpc.get_gas_oracle())
- sends = None
- try:
- sends = processor.load()
- except TxSourceError as e:
- sys.stderr.write('processing error: {}. processors: {}\n'.format(str(e), str(processor)))
- sys.exit(1)
- tx_iter = iter(processor)
- out = Outputter(mode)
- while True:
- tx = None
- try:
- tx_bytes = next(tx_iter)
- except StopIteration:
- break
- tx_hex = tx_bytes.hex()
- print(, socket=config.get('_SOCKET')))
-if __name__ == '__main__':
- main()
diff --git a/chaind_eth/runnable/ b/chaind_eth/runnable/
@@ -1,152 +0,0 @@
-# standard imports
-import os
-import logging
-import sys
-import datetime
-import enum
-import re
-import stat
-import socket
-# external imports
-import chainlib.eth.cli
-from chaind import Environment
-from chainlib.eth.gas import price
-from chainlib.chain import ChainSpec
-from hexathon import strip_0x
-# local imports
-from chaind_eth.cli.process import Processor
-from chaind_eth.cli.csv import CSVProcessor
-from chaind.error import TxSourceError
-from chaind_eth.cli.resolver import (
- DefaultResolver,
- LookNoop,
- TokenIndexLookup,
- )
-logg = logging.getLogger()
-script_dir = os.path.dirname(os.path.realpath(__file__))
-config_dir = os.path.join(script_dir, '..', 'data', 'config')
-arg_flags = chainlib.eth.cli.argflag_std_write
-argparser = chainlib.eth.cli.ArgumentParser(arg_flags)
-argparser.add_argument('--socket', dest='socket', type=str, help='Socket to send transactions to')
-argparser.add_argument('--token-index', dest='token_index', type=str, help='Token resolver index')
-argparser.add_positional('source', required=False, type=str, help='Transaction source file')
-args = argparser.parse_args()
-extra_args = {
- 'socket': None,
- 'source': None,
- 'token_index': None,
- }
-env = Environment(domain='eth', env=os.environ)
-config = chainlib.eth.cli.Config.from_args(args, arg_flags, extra_args=extra_args, base_config_dir=config_dir)
-wallet = chainlib.eth.cli.Wallet()
-rpc = chainlib.eth.cli.Rpc(wallet=wallet)
-conn = rpc.connect_by_config(config)
-chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC'))
-class OpMode(enum.Enum):
- STDOUT = 'standard_output'
- UNIX = 'unix_socket'
-mode = OpMode.STDOUT
-re_unix = r'^ipc://(/.+)'
-m = re.match(re_unix, config.get('_SOCKET', ''))
-if m != None:
- config.add(, '_SOCKET', exists_ok=True)
- r = 0
- try:
- stat_info = os.stat(config.get('_SOCKET'))
- if not stat.S_ISSOCK(stat_info.st_mode):
- r = 1
- except FileNotFoundError:
- r = 1
- if r > 0:
- sys.stderr.write('{} is not a socket\n'.format(config.get('_SOCKET')))
- sys.exit(1)
- mode = OpMode.UNIX
-'using mode {}'.format(mode.value))
-if config.get('_SOURCE') == None:
- sys.stderr.write('source data missing\n')
- sys.exit(1)
-class Outputter:
- def __init__(self, mode):
- self.out = getattr(self, 'do_' + mode.value)
- def do(self, hx):
- return self.out(hx)
- def do_standard_output(self, hx):
- #sys.stdout.write(hx + '\n')
- return hx
- def do_unix_socket(self, hx):
- s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
- s.connect(config.get('_SOCKET'))
- s.send(hx.encode('utf-8'))
- r = s.recv(64+4)
- logg.debug('r {}'.format(r))
- s.close()
- return r[4:].decode('utf-8')
-def main():
- signer = rpc.get_signer()
- # TODO: make resolvers pluggable
- token_resolver = DefaultResolver(chain_spec, conn, sender_address=rpc.get_sender_address())
- noop_lookup = LookNoop(check=not config.true('_UNSAFE'))
- token_resolver.add_lookup(noop_lookup, 'noop')
- if config.get('_TOKEN_INDEX') != None:
- token_index_lookup = TokenIndexLookup(chain_spec, signer, rpc.get_gas_oracle(), rpc.get_nonce_oracle(), config.get('_TOKEN_INDEX'))
- token_resolver.add_lookup(token_index_lookup, reverse=config.get('_TOKEN_INDEX'))
- processor = Processor(wallet.get_signer_address(), wallet.get_signer(), config.get('_SOURCE'), chain_spec, rpc.get_gas_oracle(), rpc.get_nonce_oracle(), resolver=token_resolver)
- processor.add_processor(CSVProcessor())
- sends = None
- try:
- sends = processor.load()
- except TxSourceError as e:
- sys.stderr.write('processing error: {}. processors: {}\n'.format(str(e), str(processor)))
- sys.exit(1)
- tx_iter = iter(processor)
- out = Outputter(mode)
- while True:
- tx = None
- try:
- tx_bytes = next(tx_iter)
- except StopIteration:
- break
- tx_hex = tx_bytes.hex()
- print(
-if __name__ == '__main__':
- main()
diff --git a/chaind_eth/runnable/ b/chaind_eth/runnable/
@@ -1,217 +0,0 @@
-# standard imports
-import sys
-import time
-import socket
-import signal
-import os
-import logging
-import stat
-import argparse
-# external imports
-import chainlib.eth.cli
-from chaind import Environment
-from hexathon import strip_0x
-from chainlib.chain import ChainSpec
-from chainlib.eth.connection import EthHTTPConnection
-from chainqueue.sql.backend import SQLBackend
-from chainlib.error import JSONRPCException
-from chainqueue.db import dsn_from_config
-from chaind.sql.session import SessionIndex
-# local imports
-from chaind_eth.dispatch import Dispatcher
-from chainqueue.adapters.eth import EthAdapter
-logg = logging.getLogger()
-script_dir = os.path.dirname(os.path.realpath(__file__))
-config_dir = os.path.join(script_dir, '..', 'data', 'config')
-env = Environment(domain='eth', env=os.environ)
-arg_flags = chainlib.eth.cli.argflag_std_read
-argparser = chainlib.eth.cli.ArgumentParser(arg_flags)
-argparser.add_argument('--data-dir', type=str, help='data directory')
-argparser.add_argument('--runtime-dir', type=str, help='runtime directory')
-argparser.add_argument('--session-id', dest='session_id', type=str, help='session identifier')
-argparser.add_argument('--dispatch-delay', dest='dispatch_delay', type=float, help='socket timeout before processing queue')
-args = argparser.parse_args()
-extra_args = {
- 'runtime_dir': 'SESSION_RUNTIME_DIR',
- 'data_dir': 'SESSION_DATA_DIR',
- 'session_id': 'SESSION_ID',
- 'dispatch_delay': 'SESSION_DISPATCH_DELAY',
- }
-#config = chainlib.eth.cli.Config.from_args(args, arg_flags, default_config_dir=config_dir, extend_base_config_dir=config_dir)
-config = chainlib.eth.cli.Config.from_args(args, arg_flags, extra_args=extra_args, base_config_dir=config_dir)
-logg.debug('session id {} {}'.format(type(config.get('SESSION_ID')), config.get('SESSION_ID')))
-if config.get('SESSION_ID') == None:
- config.add(env.session, 'SESSION_ID', exists_ok=True)
-if config.get('SESSION_RUNTIME_DIR') == None:
- config.add(env.runtime_dir, 'SESSION_RUNTIME_DIR', exists_ok=True)
-if config.get('SESSION_DATA_DIR') == None:
- config.add(env.data_dir, 'SESSION_DATA_DIR', exists_ok=True)
-if not config.get('SESSION_SOCKET_PATH'):
- socket_path = os.path.join(config.get('SESSION_RUNTIME_DIR'), config.get('SESSION_ID'), 'chaind.sock')
- config.add(socket_path, 'SESSION_SOCKET_PATH', True)
-if config.get('DATABASE_ENGINE') == 'sqlite':
- #config.add(os.path.join(config.get('SESSION_DATA_DIR'), config.get('DATABASE_NAME') + '.sqlite'), 'DATABASE_NAME', exists_ok=True)
- config.add(os.path.join(config.get('SESSION_DATA_DIR'), config.get('DATABASE_NAME') + '.sqlite'), 'DATABASE_NAME', exists_ok=True)
-config.censor('PASSWORD', 'DATABASE')
-logg.debug('config loaded:\n{}'.format(config))
-# verify setup
- os.stat(config.get('DATABASE_NAME'))
-except FileNotFoundError:
- sys.stderr.write('database file {} not found. please run database migration script first\n'.format(config.get('DATABASE_NAME')))
- sys.exit(1)
-class SessionController:
- def __init__(self, config):
- self.dead = False
- os.makedirs(os.path.dirname(config.get('SESSION_SOCKET_PATH')), exist_ok=True)
- try:
- os.unlink(config.get('SESSION_SOCKET_PATH'))
- except FileNotFoundError:
- pass
- self.srv = socket.socket(family=socket.AF_UNIX, type=socket.SOCK_STREAM)
- self.srv.bind(config.get('SESSION_SOCKET_PATH'))
- self.srv.listen(2)
- self.srv.settimeout(float(config.get('SESSION_DISPATCH_DELAY')))
- def shutdown(self, signo, frame):
- if self.dead:
- return
- self.dead = True
- if signo != None:
-'closing on {}'.format(signo))
- else:
-'explicit shutdown')
- sockname = self.srv.getsockname()
- self.srv.close()
- try:
- os.unlink(sockname)
- except FileNotFoundError:
- logg.warning('socket file {} already gone'.format(sockname))
- def get_connection(self):
- return self.srv.accept()
-ctrl = SessionController(config)
-signal.signal(signal.SIGINT, ctrl.shutdown)
-signal.signal(signal.SIGTERM, ctrl.shutdown)
-chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC'))
-rpc = chainlib.eth.cli.Rpc()
-conn = rpc.connect_by_config(config)
-logg.debug('error {}'.format(rpc.error_parser))
-dsn = dsn_from_config(config)
-backend = SQLBackend(dsn, error_parser=rpc.error_parser, debug=config.true('DATABASE_DEBUG'))
-session_index_backend = SessionIndex(config.get('SESSION_ID'))
-adapter = EthAdapter(backend, session_index_backend=session_index_backend)
-def process_outgoing(chain_spec, adapter, rpc, limit=100):
- dispatcher = Dispatcher(chain_spec, adapter, limit=limit)
- session = adapter.create_session()
- r = dispatcher.process(rpc, session)
- session.close()
- return r
-def main():
- havesends = 0
- while True:
- srvs = None
- try:
- logg.debug('getting connection')
- (srvs, srvs_addr) = ctrl.get_connection()
- except OSError as e:
- try:
- fi = os.stat(config.get('SESSION_SOCKET_PATH'))
- except FileNotFoundError:
- logg.error('socket is gone')
- break
- if not stat.S_ISSOCK(fi.st_mode):
- logg.error('entity on socket path is not a socket')
- break
- if srvs == None:
- logg.debug('timeout (remote socket is none)')
- r = process_outgoing(chain_spec, adapter, conn)
- if r > 0:
- ctrl.srv.settimeout(0.1)
- else:
- ctrl.srv.settimeout(4.0)
- continue
- ctrl.srv.settimeout(0.1)
- srvs.settimeout(0.1)
- data_in = None
- try:
- data_in = srvs.recv(1048576)
- except BlockingIOError as e:
- logg.debug('block io error: {}'.format(e))
- continue
- data = None
- try:
- data_in_str = data_in.decode('utf-8')
- data_hex = strip_0x(data_in_str.rstrip())
- data = bytes.fromhex(data_hex)
- except ValueError:
- logg.error('invalid input "{}"'.format(data_in_str))
- continue
- logg.debug('recv {} bytes'.format(len(data)))
- session = backend.create_session()
- tx_hash = None
- signed_tx = None
- try:
- tx_hash = adapter.add(data_hex, chain_spec, session=session)
- except ValueError as e:
- try:
- signed_tx = adapter.get(data_hex, chain_spec, session=session)
- except ValueError as e:
- logg.error('invalid input: {}'.format(e))
- if tx_hash != None:
- session.commit()
- try:
- r = int(0).to_bytes(4, byteorder='big')
- r += strip_0x(tx_hash).encode('utf-8')
- srvs.send(r)
- logg.debug('{} bytes sent'.format(r))
- except BrokenPipeError:
- logg.debug('they just hung up. how rude.')
- elif signed_tx != None:
- r = int(0).to_bytes(4, byteorder='big')
- r += strip_0x(signed_tx).encode('utf-8')
- try:
- r = srvs.send(r)
- except BrokenPipeError:
- logg.debug('they just hung up. how useless.')
- else:
- r = srvs.send(int(1).to_bytes(4, byteorder='big'))
- session.close()
- srvs.close()
- ctrl.shutdown(None, None)
-if __name__ == '__main__':
- main()
diff --git a/chaind_eth/runnable/ b/chaind_eth/runnable/
@@ -1,141 +0,0 @@
-# standard imports
-import sys
-import time
-import socket
-import signal
-import os
-import logging
-import stat
-import argparse
-import uuid
-# external imports
-import chainlib.eth.cli
-from chaind import Environment
-import confini
-from hexathon import strip_0x
-from chainlib.chain import ChainSpec
-from chainlib.eth.connection import EthHTTPConnection
-from chainlib.eth.block import block_latest
-from chainsyncer.driver.head import HeadSyncer
-from chainsyncer.driver.history import HistorySyncer
-from chainsyncer.db import dsn_from_config
-from chainsyncer.db.models.base import SessionBase
-from chainsyncer.backend.sql import SQLBackend
-from chainsyncer.error import SyncDone
-# local imports
-from chaind_eth.filter import StateFilter
-from chaind_eth.chain import EthChainInterface
-logg = logging.getLogger()
-script_dir = os.path.dirname(os.path.realpath(__file__))
-config_dir = os.path.join(script_dir, '..', 'data', 'config')
-env = Environment(domain='eth', env=os.environ)
-arg_flags = chainlib.eth.cli.argflag_std_read
-argparser = chainlib.eth.cli.ArgumentParser(arg_flags)
-argparser.add_argument('--data-dir', type=str, help='data directory')
-argparser.add_argument('--runtime-dir', type=str, help='runtime directory')
-argparser.add_argument('--session-id', dest='session_id', type=str, help='session identifier')
-argparser.add_argument('--offset', default=0, type=int, help='block height to sync history from')
-args = argparser.parse_args()
-extra_args = {
- 'runtime_dir': 'SESSION_RUNTIME_DIR',
- 'data_dir': 'SESSION_DATA_DIR',
- 'session_id': 'SESSION_ID',
- }
-#config = chainlib.eth.cli.Config.from_args(args, arg_flags, default_config_dir=config_dir, extend_base_config_dir=config_dir)
-config = chainlib.eth.cli.Config.from_args(args, arg_flags, extra_args=extra_args, base_config_dir=[config_dir, os.path.join(config_dir, 'syncer')])
-logg.debug('session id {} {}'.format(type(config.get('SESSION_ID')), config.get('SESSION_ID')))
-if config.get('SESSION_ID') == None:
- config.add(env.session, 'SESSION_ID', exists_ok=True)
-if config.get('SESSION_RUNTIME_DIR') == None:
- config.add(env.runtime_dir, 'SESSION_RUNTIME_DIR', exists_ok=True)
-if config.get('SESSION_DATA_DIR') == None:
- config.add(env.data_dir, 'SESSION_DATA_DIR', exists_ok=True)
-if not config.get('SESSION_SOCKET_PATH'):
- socket_path = os.path.join(config.get('SESSION_RUNTIME_DIR'), config.get('SESSION_ID'), 'chaind.sock')
- config.add(socket_path, 'SESSION_SOCKET_PATH', True)
-if config.get('DATABASE_ENGINE') == 'sqlite':
- config.add(os.path.join(config.get('SESSION_DATA_DIR'), config.get('DATABASE_NAME') + '.sqlite'), 'DATABASE_NAME', exists_ok=True)
-config.censor('PASSWORD', 'DATABASE')
-logg.debug('config loaded:\n{}'.format(config))
-chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC'))
-dsn = dsn_from_config(config)
-logg.debug('dns {}'.format(dsn))
-SQLBackend.setup(dsn, debug=config.true('DATABASE_DEBUG'))
-rpc = EthHTTPConnection(url=config.get('RPC_PROVIDER'), chain_spec=chain_spec)
-def register_filter_tags(filters, session):
- for f in filters:
- tag = f.tag()
- try:
- add_tag(session, tag[0], domain=tag[1])
- session.commit()
-'added tag name "{}" domain "{}"'.format(tag[0], tag[1]))
- except sqlalchemy.exc.IntegrityError:
- session.rollback()
- logg.debug('already have tag name "{}" domain "{}"'.format(tag[0], tag[1]))
-def main():
- o = block_latest()
- r =
- block_offset = int(strip_0x(r), 16) + 1
- syncers = []
- syncer_backends = SQLBackend.resume(chain_spec, block_offset)
- if len(syncer_backends) == 0:
- initial_block_start = config.get('SYNCER_HISTORY_START', 0)
- if isinstance(initial_block_start, str):
- initial_block_start = int(initial_block_start)
- initial_block_offset = block_offset
- if config.true('SYNCER_SKIP_HISTORY'):
- initial_block_start = block_offset
- initial_block_offset += 1
- syncer_backends.append(SQLBackend.initial(chain_spec, initial_block_offset, start_block_height=initial_block_start))
-'found no backends to resume, adding initial sync from history start {} end {}'.format(initial_block_start, initial_block_offset))
- else:
- for syncer_backend in syncer_backends:
-'resuming sync session {}'.format(syncer_backend))
- chain_interface = EthChainInterface()
- for syncer_backend in syncer_backends:
- syncers.append(HistorySyncer(syncer_backend, chain_interface))
- syncer_backend =, block_offset+1)
- syncers.append(HeadSyncer(syncer_backend, chain_interface))
- state_filter = StateFilter(chain_spec)
- filters = [
- state_filter,
- ]
- i = 0
- for syncer in syncers:
- logg.debug('running syncer index {}'.format(i))
- for f in filters:
- syncer.add_filter(f)
- r = syncer.loop(int(config.get('SYNCER_LOOP_INTERVAL')), rpc)
- sys.stderr.write("sync {} done at block {}\n".format(syncer, r))
- i += 1
- sys.exit(0)
-if __name__ == '__main__':
- main()
diff --git a/erc20_requirements.txt b/erc20_requirements.txt
@@ -0,0 +1 @@
diff --git a/setup.cfg b/setup.cfg
@@ -35,7 +35,7 @@ packages =
console_scripts =
- chaind-eth-server = chaind_eth.runnable.server:main
-# chaind-eth-syncer = chaind_eth.runnable.syncer:main
+ chaind-eth-tasker = chaind_eth.runnable.tasker:main
+ chaind-eth-syncer = chaind_eth.runnable.syncer:main
chaind-eth-send = chaind_eth.runnable.send:main
- chaind-eth-resend = chaind_eth.runnable.resend:main
+ #chaind-eth-resend = chaind_eth.runnable.resend:main
diff --git a/ b/
@@ -32,5 +32,6 @@ setup(
'postgres': postgres_requirements,
'sqlite': sqlite_requirements,
+ 'erc20': erc20_requirements,