commit aabc632d8d8b741a9e28b9226feb66b51ec3bb72
parent 49fe2ac0fd59182c966489907b7ad85edc1b54ac
Author: lash <dev@holbrook.no>
Date: Sun, 27 Feb 2022 13:52:05 +0000
Enable read block and rcpt from cache
Diffstat:
9 files changed, 108 insertions(+), 14 deletions(-)
diff --git a/eth_monitor/filters/cache.py b/eth_monitor/filters/cache.py
@@ -11,3 +11,12 @@ class Filter(RuledFilter):
def ruled_filter(self, conn, block, tx, db_session=None):
self.store.put_tx(tx, include_data=self.include_tx_data)
+
+
+ def filter(self, conn, block, tx, db_session=None):
+ r = super(Filter, self).filter(conn, block, tx, db_session=db_session)
+ if r == False:
+ return True
+
+ self.ruled_filter(conn, block, tx, db_session=db_session)
+ return True
diff --git a/eth_monitor/filters/out.py b/eth_monitor/filters/out.py
@@ -14,6 +14,24 @@ def apply_interface(c, s, chain_str, conn, block, tx, db_session=None):
pass
+class OutResult:
+
+ def __init__(self):
+ self.content = ''
+
+
+ def set(self, v):
+ self.content = v
+
+
+ def get(self):
+ return self.content
+
+
+ def __str__(self):
+ return self.content
+
+
class OutFilter(RuledFilter):
def __init__(self, chain_spec, writer=sys.stdout, renderers=[], rules_filter=None):
@@ -22,6 +40,7 @@ class OutFilter(RuledFilter):
self.renderers = renderers
self.c = 0
self.chain_spec = chain_spec
+ self.result = OutResult()
def filter(self, conn, block, tx, db_session=None):
@@ -29,14 +48,14 @@ class OutFilter(RuledFilter):
if r == False:
return True
- s = None
-
for renderer in self.renderers:
- s = renderer.apply(self.c, s, self.chain_spec, conn, block, tx)
- if s != None:
+ r = renderer.apply(self.c, self.result, self.chain_spec, conn, block, tx)
+ if not r:
break
- if s == None:
+ s = str(self.result)
+
+ if s == '':
data = tx.payload
if len(data) > 8:
data = data[:8] + '...'
diff --git a/eth_monitor/importers/etherscan.py b/eth_monitor/importers/etherscan.py
@@ -14,7 +14,7 @@ from chainlib.eth.tx import (
)
-class EtherscanImporter:
+class Importer:
def __init__(self, rpc, api_key, filters=[], block_callback=None):
self.api_key = api_key
diff --git a/eth_monitor/runnable/import.py b/eth_monitor/runnable/import.py
@@ -107,7 +107,7 @@ def main():
conn_socks_tor()
addresses = collect_addresses(args.address, args.address_file)
- from eth_monitor.importers.etherscan import EtherscanImporter
+ from eth_monitor.importers.etherscan import Importer as EtherscanImporter
address_rules = setup_address_rules(args.address)
diff --git a/eth_monitor/runnable/sync.py b/eth_monitor/runnable/sync.py
@@ -30,6 +30,7 @@ from eth_monitor.rules import (
from eth_monitor.filters import RuledFilter
from eth_monitor.filters.out import OutFilter
from eth_monitor.store.file import FileStore
+from eth_monitor.rpc import CacheRPC
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
@@ -69,6 +70,7 @@ argparser.add_argument('--address-file', type=str, dest='excludes_file', help='L
argparser.add_argument('--renderer', type=str, action='append', default=[], help='Python modules to dynamically load for rendering of transaction output')
argparser.add_argument('--filter', type=str, action='append', help='Add python module filter path')
argparser.add_argument('--cache-dir', dest='cache_dir', type=str, help='Directory to store tx data')
+argparser.add_argument('--fresh', action='store_true', help='Do not read block and tx data from cache, even if available')
argparser.add_argument('--single', action='store_true', help='Execute a single sync, regardless of previous states')
argparser.add_argument('-v', action='store_true', help='Be verbose')
argparser.add_argument('-vv', action='store_true', help='Be more verbose')
@@ -85,8 +87,6 @@ else:
logging.getLogger('chainsyncer.backend.file').setLevel(logging.WARNING)
logging.getLogger('chainsyncer.backend.sql').setLevel(logging.WARNING)
logging.getLogger('chainsyncer.filter').setLevel(logging.WARNING)
- logging.getLogger('leveldir.hex').setLevel(level=logging.DEBUG)
- logging.getLogger('leveldir.numeric').setLevel(level=logging.DEBUG)
if args.vv:
logg.setLevel(logging.DEBUG)
@@ -241,6 +241,8 @@ def setup_filter(chain_spec, cache_dir, include_tx_data, include_block_data):
logg.info('using chain spec {} and store {}'.format(chain_spec, store))
RuledFilter.init(store, include_tx_data=include_tx_data, include_block_data=include_block_data)
+ return store
+
def setup_cache_filter(rules_filter=None):
return CacheFilter(rules_filter=rules_filter)
@@ -308,7 +310,7 @@ def main():
args,
)
- setup_filter(
+ store = setup_filter(
chain_spec,
args.cache_dir,
bool(args.store_tx_data),
@@ -357,13 +359,14 @@ def main():
out_filter = OutFilter(chain_spec, rules_filter=address_rules, renderers=renderers_mods)
filters.append(out_filter)
+ cache_rpc = CacheRPC(rpc, store)
i = 0
for syncer in syncers:
logg.info('running syncer index {} {}'.format(i, str(syncer)))
for f in filters:
syncer.add_filter(f)
- r = syncer.loop(int(config.get('SYNCER_LOOP_INTERVAL')), rpc)
+ r = syncer.loop(int(config.get('SYNCER_LOOP_INTERVAL')), cache_rpc)
sys.stderr.write("sync {} done at block {}\n".format(syncer, r))
i += 1
diff --git a/eth_monitor/store/file.py b/eth_monitor/store/file.py
@@ -54,6 +54,13 @@ class FileStore:
src = json.dumps(tx.src()).encode('utf-8')
self.tx_dir.add(bytes.fromhex(strip_0x(tx.hash)), src)
+ rcpt_src = tx.rcpt_src()
+ logg.debug('rcpt {}'.format(rcpt_src))
+ if rcpt_src != None:
+ rcpt_src = json.dumps(rcpt_src).encode('utf-8')
+ self.rcpt_dir.add(bytes.fromhex(strip_0x(tx.hash)), rcpt_src)
+
+
def put_block(self, block, include_data=False):
hash_bytes = bytes.fromhex(strip_0x(block.hash))
@@ -65,6 +72,37 @@ class FileStore:
self.block_src_dir.add(hash_bytes, src)
+ def get_block_number(self, block_number):
+ fp = self.block_num_dir.to_filepath(block_number)
+ f = open(fp, 'rb')
+ r = f.read()
+ f.close()
+ return self.get_block(r.hex())
+
+
+ def get_block(self, block_hash):
+ fp = self.block_src_dir.to_filepath(block_hash)
+ f = open(fp, 'rb')
+ r = f.read()
+ f.close()
+ return r
+
+
+ def get_tx(self, tx_hash):
+ fp = self.tx_dir.to_filepath(tx_hash)
+ f = open(fp, 'rb')
+ r = f.read()
+ f.close()
+ return r
+
+
+ def get_rcpt(self, tx_hash):
+ fp = self.rcpt_dir.to_filepath(tx_hash)
+ f = open(fp, 'rb')
+ r = f.read()
+ f.close()
+ return r
+
def __init__(self, chain_spec, cache_root=base_dir, address_rules=None):
self.cache_root = os.path.join(
cache_root,
@@ -86,6 +124,10 @@ class FileStore:
self.tx_raw_path = os.path.join(self.cache_dir, 'tx', 'raw')
self.tx_dir = HexDir(self.tx_path, 32, levels=2)
self.tx_raw_dir = HexDir(self.tx_raw_path, 32, levels=2)
+ self.rcpt_path = os.path.join(self.cache_dir, 'rcpt', 'src')
+ self.rcpt_raw_path = os.path.join(self.cache_dir, 'rcpt', 'raw')
+ self.rcpt_dir = HexDir(self.rcpt_path, 32, levels=2)
+ self.rcpt_raw_dir = HexDir(self.rcpt_raw_path, 32, levels=2)
self.address_path = os.path.join(self.cache_dir, 'address')
self.address_dir = HexDir(self.address_path, 20, levels=2)
self.chain_spec = chain_spec
diff --git a/eth_monitor/store/null.py b/eth_monitor/store/null.py
@@ -16,6 +16,22 @@ class NullStore:
pass
+ def get_block_number(self, v):
+ raise FileNotFoundError(v)
+
+
+ def get_block(self, v):
+ raise FileNotFoundError(v)
+
+
+ def get_tx(self, v):
+ raise FileNotFoundError(v)
+
+
+ def get_rcpt(self, v):
+ raise FileNotFoundError(v)
+
+
def __init__(self):
self.chain_dir = '/dev/null'
diff --git a/man/eth-monitor.custom.groff b/man/eth-monitor.custom.groff
@@ -2,7 +2,8 @@
By default, addresses to match against transactions need to be explicitly specified. This behavior can be reversed with the \fB--include-default\fP option. Addresses to match are defined using the \fB--input\fP, \fB--output\fP and \fB--exec\fP options. Addresses specified multiple times will be deduplicated.
.P
Inclusion rules may also be loaded from file by specifying the \fB--includes-file\fP and \fB--excludes-file\fP options. Each file must specify the outputs, inputs and exec addresses as comma separated lists respectively, separated by tabs.
-
+.P
+In the current state of this tool, address matching will affect all parts of the processing; cache, code execution and rendering.
.SH SYNCING
When a sync is initiated, the state of this sync is persisted. This way, previous syncs that did not complete for some reason will be resumed where they left off.
@@ -13,13 +14,17 @@ Syncs can be forced to (re)run for ranges regardless of previous state by using
.SH CACHE
-.P
When syncing, the hash of a block and transaction matching the address criteria will be stored in the cache. The hashes can be used for future data lookups.
.P
If \fB--store-block-data\fP and/or \fB--store-tx-data\fP is set, a copy of the block and/or transaction data will also be stored, respectively.
.SH RENDERING
+Rendering in the context of \fBeth-monitor\fP refers to a formatted output stream that occurs independently of caching and code execution.
+.P
+Filters for rendering may be specified by specifying python modules to the \fB--renderer\fP option. This option may be specified multiple times.
+.P
+Rendering filters will be executed in order, and the first filter to return \fIFalse\fP
.SH DEFINING FILTERS
diff --git a/requirements.txt b/requirements.txt
@@ -1,4 +1,4 @@
-chainlib-eth~=0.0.27
+chainlib-eth~=0.0.28
chainlib~=0.0.23
chainsyncer~=0.1.0
eth-erc20~=0.1.10