eth-monitor

Monitor and cache ethereum transactions with match filters
git clone git://git.defalsify.org/eth-monitor.git
Log | Files | Refs | README | LICENSE

commit 06a4852fe56206a63d36bd687afcdc4362d70e27
parent cf59efa690f5417ca443e7c8a6ab886d3a8958b6
Author: lash <dev@holbrook.no>
Date:   Tue,  1 Mar 2022 12:10:35 +0000

Factor out cache rpc and store to eth-cache

Diffstat:
Meth_monitor/filters/gas/__init__.py | 3++-
Meth_monitor/index.py | 2+-
Deth_monitor/rpc.py | 81-------------------------------------------------------------------------------
Meth_monitor/runnable/import.py | 9++++++---
Meth_monitor/runnable/list.py | 4++--
Meth_monitor/runnable/sync.py | 4++--
Deth_monitor/store/file.py | 149-------------------------------------------------------------------------------
Deth_monitor/store/null.py | 39---------------------------------------
Mrequirements.txt | 1+
9 files changed, 14 insertions(+), 278 deletions(-)

diff --git a/eth_monitor/filters/gas/__init__.py b/eth_monitor/filters/gas/__init__.py @@ -17,6 +17,7 @@ def apply(c, result, chain_spec, conn, block, tx, db_session=None): value = value.rstrip('0') if value[len(value)-1] == '.': value += '0' - s = '{} {}\t{} -> {} = {}'.format(timestamp, tx.hash, tx.outputs[0], tx.inputs[0], value) + + s = '{} {} {}\t{} -> {} = {}'.format(timestamp, tx.hash, tx.status.name, tx.outputs[0], tx.inputs[0], value) result.set(s) return False diff --git a/eth_monitor/index.py b/eth_monitor/index.py @@ -43,7 +43,7 @@ class AddressIndex: k = '{}.{}'.format(block_number, tx_index) - txs[k] = tx + txs[float(k)] = tx ks = list(txs.keys()) ks.sort() diff --git a/eth_monitor/rpc.py b/eth_monitor/rpc.py @@ -1,81 +0,0 @@ -# standard imports -import json -from jsonrpc_std.parse import jsonrpc_from_dict -import logging - -# external imports -from hexathon import strip_0x - -logg = logging.getLogger(__name__) - -class CacheRPC: - - def __init__(self, rpc, store): - self.rpc = rpc - self.store = store - - - def do(self, o): - req = jsonrpc_from_dict(o) - r = None - if req['method'] == 'eth_getBlockByNumber': - block_number = req['params'][0] - v = int(strip_0x(block_number), 16) - try: - j = self.store.get_block_number(v) - r = json.loads(j) - logg.debug('using cached block {} -> {}'.format(v, r['hash'])) - except FileNotFoundError: - pass - elif req['method'] == 'eth_getBlockByHash': - block_hash = req['params'][0] - v = strip_0x(block_hash) - try: - j = self.store.get_block(v) - r = json.loads(j) - logg.debug('using cached block {}'.format(r['hash'])) - except FileNotFoundError as e: - logg.debug('not found {}'.format(e)) - pass - elif req['method'] == 'eth_getTransactionReceipt': - tx_hash = req['params'][0] - j = None - try: - tx_hash = strip_0x(tx_hash) - j = self.store.get_rcpt(tx_hash) - r = json.loads(j) - logg.debug('using cached rcpt {}'.format(tx_hash)) - except FileNotFoundError as e: - logg.debug('no file {}'.format(e)) - pass - -# elif req['method'] == 'eth_getTransactionByHash': -# raise ValueError(o) -# elif req['method'] == 'eth_getTransactionByBlockHashAndIndex': -# logg.debug('trying tx index {}'.format(o)) -# v = req['params'][0] -# j = None -# try: -# j = self.store.get_block(v) -# except FileNotFoundError: -# pass -# -# if j != None: -# o = json.loads(j) -# idx = int(req['params'][1], 16) -# v = r['transactions'][idx] -# j = None -# try: -# j = self.store.get_tx(v) -# except FileNotFoundError: -# pass -# -# if j != None: -# r = json.loads(j) -# logg.debug('using cached tx {} -> {}'.format(req['params'], r['hash'])) - - if r == None: - logg.debug('passthru {}'.format(o)) - r = self.rpc.do(o) - - return r diff --git a/eth_monitor/runnable/import.py b/eth_monitor/runnable/import.py @@ -9,11 +9,11 @@ import time from chainlib.encode import TxHexNormalizer from chainlib.eth.connection import EthHTTPConnection from chainlib.chain import ChainSpec +from eth_cache.store.file import FileStore # local imports from eth_monitor.filters.cache import Filter as CacheFilter from eth_monitor.filters import RuledFilter -from eth_monitor.store.file import FileStore from eth_monitor.rules import ( AddressRules, RuleSimple, @@ -33,6 +33,8 @@ argparser.add_argument('--store-block-data', dest='store_block_data', action='st argparser.add_argument('-i', '--chain-spec', dest='i', type=str, default='evm:ethereum:1', help='Chain specification string') argparser.add_argument('-f', '--address-file', dest='address_file', default=[], type=str, action='append', help='Add addresses from file') argparser.add_argument('-a', '--address', default=[], type=str, action='append', help='Add address') +argparser.add_argument('--socks-host', dest='socks_host', type=str, help='Conect through socks host') +argparser.add_argument('--socks-port', dest='socks_port', type=int, help='Conect through socks port') argparser.add_argument('--delay', type=float, default=0.2, help='Seconds to wait between each retrieval from importer') argparser.add_argument('-v', action='store_true', help='Be verbose') argparser.add_argument('-vv', action='store_true', help='Be more verbose') @@ -54,7 +56,7 @@ rpc = EthHTTPConnection(args.p) chain_spec = ChainSpec.from_chain_str(args.i) -def conn_socks_tor(host='127.0.0.1', port=9050): +def conn_socks(host, port): import socks import socket @@ -114,7 +116,8 @@ def setup_filter(chain_spec, cache_dir, include_tx_data, include_block_data, add def main(): - conn_socks_tor() + if args.socks_host != None: + conn_socks(args.socks_host, args.socks_port) addresses = collect_addresses(args.address, args.address_file) from eth_monitor.importers.etherscan import Importer as EtherscanImporter diff --git a/eth_monitor/runnable/list.py b/eth_monitor/runnable/list.py @@ -17,11 +17,11 @@ from chainlib.eth.tx import ( receipt, Tx, ) +from eth_cache.store.file import FileStore +from eth_cache.rpc import CacheRPC # local imports -from eth_monitor.store.file import FileStore from eth_monitor.index import AddressIndex -from eth_monitor.rpc import CacheRPC from eth_monitor.filters.out import OutFilter from eth_monitor.rules import AddressRules diff --git a/eth_monitor/runnable/sync.py b/eth_monitor/runnable/sync.py @@ -19,6 +19,8 @@ from chainsyncer.driver.head import HeadSyncer from chainsyncer.driver.history import HistorySyncer from chainsyncer.backend.file import FileBackend from chainsyncer.filter import NoopFilter +from eth_cache.rpc import CacheRPC +from eth_cache.store.file import FileStore # local imports from eth_monitor.chain import EthChainInterface @@ -29,8 +31,6 @@ from eth_monitor.rules import ( ) from eth_monitor.filters import RuledFilter from eth_monitor.filters.out import OutFilter -from eth_monitor.store.file import FileStore -from eth_monitor.rpc import CacheRPC logging.basicConfig(level=logging.WARNING) logg = logging.getLogger() diff --git a/eth_monitor/store/file.py b/eth_monitor/store/file.py @@ -1,149 +0,0 @@ -# standard imports -import os -import json -import logging - -# external imports -from hexathon import strip_0x -from chainlib.eth.tx import ( - Tx, - pack, - ) -from chainsyncer.backend.file import chain_dir_for -from leveldir.numeric import NumDir -from leveldir.hex import HexDir - -logg = logging.getLogger(__name__) - -base_dir = '/var/lib' - - -class FileStore: - - def put_tx(self, tx, include_data=False): - raw = pack(tx.src(), self.chain_spec) - tx_hash_dirnormal = strip_0x(tx.hash).upper() - tx_hash_bytes = bytes.fromhex(tx_hash_dirnormal) - self.tx_raw_dir.add(tx_hash_bytes, raw) - if self.address_rules != None: - for a in tx.outputs + tx.inputs: - if self.address_rules.apply_rules_addresses(a, a, tx.hash): - a_hex = strip_0x(a).upper() - a = bytes.fromhex(a_hex) - self.address_dir.add_dir(tx_hash_dirnormal, a, b'') - dirpath = self.address_dir.to_filepath(a_hex) - fp = os.path.join(dirpath, '.start') - num = tx.block.number - num_compare = 0 - try: - f = open(fp, 'rb') - r = f.read(8) - f.close() - num_compare = int.from_bytes(r, 'big') - except FileNotFoundError: - pass - - if num_compare == 0 or num < num_compare: - logg.debug('recoding new start block {} for {}'.format(num, a)) - num_bytes = num.to_bytes(8, 'big') - f = open(fp, 'wb') - f.write(num_bytes) - f.close() - - if include_data: - src = json.dumps(tx.src()).encode('utf-8') - self.tx_dir.add(bytes.fromhex(strip_0x(tx.hash)), src) - - rcpt_src = tx.rcpt_src() - logg.debug('rcpt {}'.format(rcpt_src)) - if rcpt_src != None: - rcpt_src = json.dumps(rcpt_src).encode('utf-8') - self.rcpt_dir.add(bytes.fromhex(strip_0x(tx.hash)), rcpt_src) - - - - def put_block(self, block, include_data=False): - hash_bytes = bytes.fromhex(strip_0x(block.hash)) - self.block_num_dir.add(block.number, hash_bytes) - num_bytes = block.number.to_bytes(8, 'big') - self.block_hash_dir.add(hash_bytes, num_bytes) - if include_data: - src = json.dumps(block.src()).encode('utf-8') - self.block_src_dir.add(hash_bytes, src) - - - def get_block_number(self, block_number): - fp = self.block_num_dir.to_filepath(block_number) - f = open(fp, 'rb') - r = f.read() - f.close() - return self.get_block(r.hex()) - - - def get_block(self, block_hash): - fp = self.block_src_dir.to_filepath(block_hash) - f = open(fp, 'rb') - r = f.read() - f.close() - return r - - - def get_tx(self, tx_hash): - fp = self.tx_dir.to_filepath(tx_hash) - f = open(fp, 'rb') - r = f.read() - f.close() - return r - - - def get_rcpt(self, tx_hash): - fp = self.rcpt_dir.to_filepath(tx_hash) - f = open(fp, 'rb') - r = f.read() - f.close() - return r - - - def get_address_tx(self, address): - fp = self.address_dir.to_filepath(address) - tx_hashes = [] - for tx_hash in os.listdir(fp): - if tx_hash[0] == '.': - continue - tx_hashes.append(tx_hash) - return tx_hashes - - - def __init__(self, chain_spec, cache_root=base_dir, address_rules=None): - self.cache_root = os.path.join( - cache_root, - 'eth_monitor', - chain_spec.engine(), - chain_spec.fork(), - str(chain_spec.chain_id()), - ) - self.cache_root = os.path.realpath(self.cache_root) - self.chain_dir = chain_dir_for(self.cache_root) - self.cache_dir = os.path.join(self.chain_dir, 'cache') - self.block_src_path = os.path.join(self.cache_dir, 'block', 'src') - self.block_src_dir = HexDir(self.block_src_path, 32, levels=2) - self.block_num_path = os.path.join(self.cache_dir, 'block', 'num') - self.block_num_dir = NumDir(self.block_num_path, [100000, 1000]) - self.block_hash_path = os.path.join(self.cache_dir, 'block', 'hash') - self.block_hash_dir = HexDir(self.block_hash_path, 32, levels=2) - self.tx_path = os.path.join(self.cache_dir, 'tx', 'src') - self.tx_raw_path = os.path.join(self.cache_dir, 'tx', 'raw') - self.tx_dir = HexDir(self.tx_path, 32, levels=2) - self.tx_raw_dir = HexDir(self.tx_raw_path, 32, levels=2) - self.rcpt_path = os.path.join(self.cache_dir, 'rcpt', 'src') - self.rcpt_raw_path = os.path.join(self.cache_dir, 'rcpt', 'raw') - self.rcpt_dir = HexDir(self.rcpt_path, 32, levels=2) - self.rcpt_raw_dir = HexDir(self.rcpt_raw_path, 32, levels=2) - self.address_path = os.path.join(self.cache_dir, 'address') - self.address_dir = HexDir(self.address_path, 20, levels=2) - self.chain_spec = chain_spec - self.address_rules = address_rules - - - def __str__(self): - return 'FileStore: root {}'.format(self.cache_root) diff --git a/eth_monitor/store/null.py b/eth_monitor/store/null.py @@ -1,39 +0,0 @@ -# standard imports -import os -import logging -import json - -logg = logging.getLogger(__name__) - - -class NullStore: - - def put_tx(self, tx, include_data=False): - pass - - - def put_block(self, block, include_data=False): - pass - - - def get_block_number(self, v): - raise FileNotFoundError(v) - - - def get_block(self, v): - raise FileNotFoundError(v) - - - def get_tx(self, v): - raise FileNotFoundError(v) - - - def get_rcpt(self, v): - raise FileNotFoundError(v) - - - def __init__(self): - self.chain_dir = '/dev/null' - - def __str__(self): - return "Nullstore" diff --git a/requirements.txt b/requirements.txt @@ -3,3 +3,4 @@ chainlib~=0.0.23 chainsyncer~=0.1.0 eth-erc20~=0.1.11 leveldir~=0.3.0 +eth-cache~=0.1.0