commit 2999695bd4792719f64ca0f7f1b97dad6550450b
parent a7218e96013d85f4e3f21f17bd25fa4bf704fd2b
Author: lash <dev@holbrook.no>
Date: Wed, 30 Mar 2022 08:11:46 +0000
Implement sync runnable on shep chainsyncer
Diffstat:
5 files changed, 53 insertions(+), 91 deletions(-)
diff --git a/eth_monitor/filters/base.py b/eth_monitor/filters/base.py
@@ -5,11 +5,12 @@ import json
# external imports
from hexathon import strip_0x
+from chainsyncer.filter import SyncFilter
logg = logging.getLogger(__name__)
-class RuledFilter:
+class RuledFilter(SyncFilter):
def __init__(self, rules_filter=None):
if self.store.chain_dir == None:
@@ -34,5 +35,5 @@ class RuledFilter:
if self.rules_filter != None:
if not self.rules_filter.apply_rules(tx):
logg.debug('rule match failed for tx {}'.format(tx.hash))
- return False
- return True
+ return True
+ return False
diff --git a/eth_monitor/filters/cache.py b/eth_monitor/filters/cache.py
@@ -15,8 +15,8 @@ class Filter(RuledFilter):
def filter(self, conn, block, tx, db_session=None):
r = super(Filter, self).filter(conn, block, tx, db_session=db_session)
- if r == False:
+ if r == True:
return True
self.ruled_filter(conn, block, tx, db_session=db_session)
- return True
+ return False
diff --git a/eth_monitor/filters/out.py b/eth_monitor/filters/out.py
@@ -45,7 +45,7 @@ class OutFilter(RuledFilter):
def filter(self, conn, block, tx, db_session=None):
r = super(OutFilter, self).filter(conn, block, tx, db_session=db_session)
- if r == False:
+ if r == True:
return True
for renderer in self.renderers:
@@ -65,3 +65,4 @@ class OutFilter(RuledFilter):
self.w.write(s + '\n')
self.c += 1
+ return False
diff --git a/eth_monitor/runnable/sync.py b/eth_monitor/runnable/sync.py
@@ -6,6 +6,7 @@ import confini
import logging
import os
import importlib
+import uuid
# external imports
from chainlib.chain import ChainSpec
@@ -15,10 +16,10 @@ from hexathon import (
strip_0x,
add_0x,
)
-from chainsyncer.driver.head import HeadSyncer
-from chainsyncer.driver.history import HistorySyncer
-from chainsyncer.backend.file import FileBackend
-from chainsyncer.filter import NoopFilter
+from chainsyncer.store.fs import SyncFsStore
+from chainsyncer.driver.chain_interface import ChainInterfaceDriver
+from chainsyncer.error import SyncDone
+
from eth_cache.rpc import CacheRPC
from eth_cache.store.file import FileStore
@@ -71,8 +72,10 @@ argparser.add_argument('--address-file', type=str, dest='excludes_file', help='L
argparser.add_argument('--renderer', type=str, action='append', default=[], help='Python modules to dynamically load for rendering of transaction output')
argparser.add_argument('--filter', type=str, action='append', help='Add python module filter path')
argparser.add_argument('--cache-dir', dest='cache_dir', type=str, help='Directory to store tx data')
+argparser.add_argument('--state-dir', dest='state_dir', default=exec_dir, type=str, help='Directory to store sync state')
argparser.add_argument('--fresh', action='store_true', help='Do not read block and tx data from cache, even if available')
argparser.add_argument('--single', action='store_true', help='Execute a single sync, regardless of previous states')
+argparser.add_argument('--session-id', dest='session_id', type=str, help='Use state from specified session id')
argparser.add_argument('-v', action='store_true', help='Be verbose')
argparser.add_argument('-vv', action='store_true', help='Be more verbose')
argparser.add_argument('-vvv', action='store_true', help='Be incredibly verbose')
@@ -106,14 +109,21 @@ config.add(args.skip_history, '_NO_HISTORY', True)
config.add(args.single, '_SINGLE', True)
config.add(args.head, '_HEAD', True)
config.add(args.keep_alive, '_KEEP_ALIVE', True)
+config.add(os.path.realpath(args.state_dir), '_STATE_DIR', True)
+config.add(args.cache_dir, '_CACHE_DIR', True)
+config.add(args.session_id, '_SESSION_ID', True)
override(config, 'renderer', env=os.environ, args=args)
override(config, 'filter', env=os.environ, args=args)
+
+if config.get('_SESSION_ID') == None:
+ if not config.get('_SINGLE'):
+ #config.add('_SESSION_ID', str(uuid.uuid4()), True)
+ #else:
+ config.add('default', '_SESSION_ID', True)
logg.debug('loaded config:\n{}'.format(config))
chain_spec = ChainSpec.from_chain_str(args.i)
-state_dir = os.path.join(exec_dir, 'state')
-
rpc_id_generator = None
if args.seq:
rpc_id_generator = IntSequenceGenerator()
@@ -238,76 +248,41 @@ def setup_cache_filter(rules_filter=None):
return CacheFilter(rules_filter=rules_filter)
-def setup_backend_resume(chain_spec, block_offset, block_limit, state_dir, callback, chain_interface, sync_offset=0, skip_history=False, keep_alive=False):
- syncers = []
- syncer_backends = FileBackend.resume(chain_spec, block_offset, base_dir=state_dir)
- if len(syncer_backends) == 0:
- initial_block_start = block_offset - 1
- if config.get('_SYNC_OFFSET') != None:
- initial_block_start = config.get('_SYNC_OFFSET')
- initial_block_offset = block_offset
- if skip_history:
- initial_block_start = block_offset
- initial_block_offset += 1
- syncer_backends.append(FileBackend.initial(chain_spec, initial_block_offset, start_block_height=initial_block_start, base_dir=state_dir))
- logg.info('found no backends to resume, adding initial sync from history start {} end {}'.format(initial_block_start, initial_block_offset))
- else:
- for syncer_backend in syncer_backends:
- logg.info('resuming sync session {}'.format(syncer_backend))
-
- for syncer_backend in syncer_backends:
- syncers.append(HistorySyncer(syncer_backend, chain_interface, block_callback=callback)) #RuledFilter.block_callback))
-
- syncer_backend = FileBackend.live(chain_spec, block_offset+1, base_dir=state_dir)
-
- if len(syncers) == 0 or keep_alive:
- head_syncer_backend = FileBackend.live(chain_spec, block_offset, base_dir=state_dir)
- syncers.append(HeadSyncer(head_syncer_backend, chain_interface, block_callback=callback))
+def pre_callback():
+ logg.debug('starting sync loop iteration')
- return syncers
+def post_callback():
+ logg.debug('ending sync loop iteration')
-def setup_backend_single(chain_spec, block_offset, block_limit, state_dir, callback, chain_interface, sync_offset=0, skip_history=False, keep_alive=False):
- logg.info('block limit {} offset {} syncoffset {}'.format(block_limit, block_offset, sync_offset))
- syncer_backend = FileBackend.initial(chain_spec, block_limit, start_block_height=sync_offset, base_dir=state_dir)
- syncers = []
- syncer = HistorySyncer(syncer_backend, chain_interface, block_callback=callback)
- syncers.append(syncer)
- if keep_alive:
- logg.info('i have keep alive')
- head_syncer_backend = FileBackend.live(chain_spec, block_offset, base_dir=state_dir)
- syncers.append(HeadSyncer(head_syncer_backend, chain_interface, block_callback=callback))
- return syncers
-
-def setup_backend_head(chain_spec, block_offset, block_limit, state_dir, callback, chain_interface, sync_offset=0, skip_history=False, keep_alive=True):
- syncer_backend = FileBackend.live(chain_spec, block_offset, base_dir=state_dir)
- syncer = HeadSyncer(syncer_backend, chain_interface, block_callback=callback)
- return [syncer]
+def block_callback(block, tx):
+ logg.debug('processing block {}'.format(block))
def main():
+ o = block_latest()
+ r = rpc.do(o)
+ block_offset = int(strip_0x(r), 16) + 1
+ logg.info('network block height is {}'.format(block_offset))
+
keep_alive = False
session_block_offset = 0
+ block_limit = 0
if args.head:
- session_block_offset = -1
+ session_block_offset = block_offset
+ block_limit = -1
keep_alive = True
else:
session_block_offset = args.offset
- o = block_latest()
- r = rpc.do(o)
- block_offset = int(strip_0x(r), 16) + 1
- logg.info('network block height is {}'.format(block_offset))
-
- block_limit = 0
if args.until > 0:
if not args.head and args.until <= block_offset:
raise ValueError('sync termination block number must be later than offset ({} >= {})'.format(block_offset, args.until))
block_limit = args.until
elif config.true('_KEEP_ALIVE'):
keep_alive=True
- block_limit = block_offset
+ block_limit = -1
if session_block_offset == -1:
session_block_offset = block_offset
@@ -328,7 +303,7 @@ def main():
store = setup_filter(
chain_spec,
- args.cache_dir,
+ config.get('_CACHE_DIR'),
bool(args.store_tx_data),
bool(args.store_block_data),
)
@@ -353,44 +328,29 @@ def main():
renderers_mods.append(m)
logg.info('using renderer module {}'.format(renderer))
- syncer_setup_func = None
- if config.true('_HEAD'):
- syncer_setup_func = setup_backend_head
- elif config.true('_SINGLE'):
- syncer_setup_func = setup_backend_single
- else:
- syncer_setup_func = setup_backend_resume
-
chain_interface = EthChainInterface()
- syncers = syncer_setup_func(
- chain_spec,
- block_offset,
- block_limit,
- state_dir,
- cache_filter.block_callback,
- chain_interface,
- sync_offset=config.get('_SYNC_OFFSET'),
- skip_history=config.true('_NO_HISTORY'),
- keep_alive=keep_alive,
- )
out_filter = OutFilter(chain_spec, rules_filter=address_rules, renderers=renderers_mods)
filters.append(out_filter)
+
+ sync_store = SyncFsStore(config.get('_STATE_DIR'), session_id=config.get('_SESSION_ID'))
+ logg.info('session is {}'.format(sync_store.session_id))
+
+ for fltr in filters:
+ sync_store.register(fltr)
+ drv = ChainInterfaceDriver(sync_store, chain_interface, offset=session_block_offset, target=block_limit, pre_callback=pre_callback, post_callback=post_callback, block_callback=block_callback)
use_rpc = rpc
if not args.fresh:
use_rpc = CacheRPC(rpc, store)
i = 0
- for syncer in syncers:
- logg.info('running syncer index {} {}'.format(i, str(syncer)))
- for f in filters:
- syncer.add_filter(f)
-
- r = syncer.loop(int(config.get('SYNCER_LOOP_INTERVAL')), use_rpc)
- sys.stderr.write("sync {} done at block {}\n".format(syncer, r))
+ try:
+ r = drv.run(use_rpc)
+ except SyncDone as e:
+ sys.stderr.write("sync {} done at block {}\n".format(drv, e))
- i += 1
+ i += 1
if __name__ == '__main__':
diff --git a/setup.cfg b/setup.cfg
@@ -1,6 +1,6 @@
[metadata]
name = eth-monitor
-version = 0.2.1
+version = 0.3.0
description = Monitor and cache transactions using match filters
author = Louis Holbrook
author_email = dev@holbrook.no