commit 84657c9021c8900a8cbd5c3abe63908e30be98b1
parent 8c1f65a6066b19eb0c0d09fb9f37c95eb4168d9d
Author: lash <dev@holbrook.no>
Date: Sun, 23 Jan 2022 17:28:25 +0000
Factor out address filter
Diffstat:
4 files changed, 270 insertions(+), 26 deletions(-)
diff --git a/eth_monitor/filters/cache.py b/eth_monitor/filters/cache.py
@@ -1,18 +1,21 @@
# standard imports
import os
+import logging
# external imports
from chainsyncer.backend.file import chain_dir_for
from leveldir.numeric import NumDir
from leveldir.hex import HexDir
from hexathon import strip_0x
+from chainlib.eth.address import is_same_address
base_dir = '/var/lib'
+logg = logging.getLogger()
class CacheFilter:
- def __init__(self, chain_spec, cache_root=base_dir):
+ def __init__(self, chain_spec, cache_root=base_dir, rules_filter=None):
cache_root = os.path.join(cache_root, 'eth_monitor')
chain_dir = chain_dir_for(cache_root)
self.cache_dir = os.path.join(chain_dir, 'cache')
@@ -22,7 +25,8 @@ class CacheFilter:
self.block_hash_dir = HexDir(block_hash_path, 32, levels=2)
tx_path = os.path.join(self.cache_dir, 'tx')
self.tx_dir = HexDir(tx_path, 32, levels=2)
-
+ self.rules_filter = rules_filter
+
def block_callback(self, block, extra=None):
src = str(block.src()).encode('utf-8')
@@ -32,5 +36,9 @@ class CacheFilter:
def filter(self, conn, block, tx, db_session=None):
+ if self.rules_filter != None:
+ if not self.rules_filter.apply_rules(tx):
+ logg.debug('rule match failed for tx {}'.format(tx.hash))
+ return
src = str(tx.src()).encode('utf-8')
self.tx_dir.add(bytes.fromhex(strip_0x(tx.hash)), src)
diff --git a/eth_monitor/runnable/sync.py b/eth_monitor/runnable/sync.py
@@ -22,6 +22,7 @@ from chainsyncer.filter import NoopFilter
# local imports
from eth_monitor.chain import EthChainInterface
from eth_monitor.filters.cache import CacheFilter
+from eth_monitor.rules import AddressRules
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
@@ -40,9 +41,14 @@ argparser = argparse.ArgumentParser('master eth events monitor')
argparser.add_argument('-p', '--provider', dest='p', default=default_eth_provider, type=str, help='Web3 provider url (http only)')
argparser.add_argument('-c', type=str, default=default_config_dir, help='config file')
argparser.add_argument('-i', '--chain-spec', dest='i', type=str, default='evm:ethereum:1', help='Chain specification string')
-argparser.add_argument('--offset', type=int, default=0, help='Use sequential rpc ids')
+argparser.add_argument('--offset', type=int, default=0, help='Start sync on this block')
argparser.add_argument('--seq', action='store_true', help='Use sequential rpc ids')
argparser.add_argument('--skip-history', action='store_true', dest='skip_history', help='Skip history sync')
+argparser.add_argument('--includes-file', type=str, dest='includes_file', help='Load include rules from file')
+argparser.add_argument('--include-default', action='store_true', help='Include all transactions by default')
+argparser.add_argument('--excludes-file', type=str, dest='excludes_file', help='Load exclude rules from file')
+argparser.add_argument('--cache-dir', dest='cache_dir', type=str, help='Directory to store tx data')
+argparser.add_argument('--single', action='store_true', help='Execute a single sync, regardless of previous states')
argparser.add_argument('-v', action='store_true', help='Be verbose')
argparser.add_argument('-vv', action='store_true', help='Be more verbose')
args = argparser.parse_args(sys.argv[1:])
@@ -61,6 +67,8 @@ args_override = {
config.dict_override(args_override, 'cli')
config.add(args.offset, '_SYNC_OFFSET', True)
config.add(args.skip_history, '_NO_HISTORY', True)
+config.add(args.single, '_SINGLE', True)
+
logg.debug('config loaded:\n{}'.format(config))
chain_spec = ChainSpec.from_chain_str(args.i)
@@ -77,29 +85,80 @@ if os.environ.get('RPC_AUTHENTICATION') == 'basic':
auth = BasicAuth(os.environ['RPC_USERNAME'], os.environ['RPC_PASSWORD'])
rpc = EthHTTPConnection(args.p)
-if __name__ == '__main__':
- o = block_latest()
- r = rpc.do(o)
- block_offset = int(strip_0x(r), 16) + 1
- logg.debug('current block height {}'.format(block_offset))
- syncers = []
- syncer_backends = FileBackend.resume(chain_spec, block_offset, base_dir=state_dir)
+def _load_address_rules(rules_file, add_rule, label):
+    """Feed every rule line of rules_file to add_rule.
+
+    Each line holds up to three comma separated fields in the order
+    sender, recipient, executable. An empty or missing field is passed
+    on as None. Blank lines are skipped.
+
+    :param rules_file: Path of the rules file to read
+    :param add_rule: Callable taking sender, recipient and executable keyword arguments
+    :param label: Rule kind, used in the debug log line only
+    """
+    logg.debug('reading {} rules from {}'.format(label, os.path.realpath(rules_file)))
+    with open(rules_file, 'r') as f:
+        for line in f:
+            line = line.rstrip()
+            if line == '':
+                continue
+            fields = line.split(',')
+            # pad short lines so that missing trailing fields count as empty
+            fields += [''] * (3 - len(fields))
+            sender, recipient, executable = (field or None for field in fields[:3])
+            add_rule(sender=sender, recipient=recipient, executable=executable)
+
+
+def setup_address_rules(includes_file=None, excludes_file=None, include_default=False):
+    """Create the address rules object and populate it from the optional rules files.
+
+    :param includes_file: Path of file with include rules, or None to add none
+    :param excludes_file: Path of file with exclude rules, or None to add none
+    :param include_default: Passed to AddressRules as include_by_default
+    :returns: The populated AddressRules instance
+    """
+    rules = AddressRules(include_by_default=include_default)
+
+    if includes_file is not None:
+        _load_address_rules(includes_file, rules.include, 'includes')
+
+    if excludes_file is not None:
+        # exclude rules come from excludes_file; the includes file was opened here before
+        _load_address_rules(excludes_file, rules.exclude, 'excludes')
+
+    return rules
+
+
+def setup_cache_filter(chain_spec, cache_dir, rules_filter=None):
+    """Create the cache filter, using a temporary directory when no cache dir is given.
+
+    :param chain_spec: Chain spec passed on to CacheFilter
+    :param cache_dir: Directory to store cache data in, or None to create a temporary one
+    :param rules_filter: Optional rules object passed on to CacheFilter
+    :returns: The CacheFilter instance
+    """
+    if cache_dir is None:
+        # must be checked before any path handling; os.path.realpath(None) raises TypeError
+        import tempfile
+        cache_dir = tempfile.mkdtemp()
+    else:
+        cache_dir = os.path.realpath(cache_dir)
+    logg.info('using dir {}'.format(cache_dir))
+    cache_filter = CacheFilter(chain_spec, cache_dir, rules_filter=rules_filter)
+
+    return cache_filter
- import tempfile
- tmp_dir = tempfile.mkdtemp()
- logg.info('using dir {}'.format(tmp_dir))
- cache_filter = CacheFilter(chain_spec, tmp_dir)
- filters = [
- cache_filter,
- ]
+def setup_backend_resume(chain_spec, block_offset, state_dir, callback, sync_offset=0, skip_history=False):
+ syncers = []
+ syncer_backends = FileBackend.resume(chain_spec, block_offset, base_dir=state_dir)
if len(syncer_backends) == 0:
initial_block_start = block_offset - 1
if config.get('_SYNC_OFFSET') != None:
initial_block_start = config.get('_SYNC_OFFSET')
initial_block_offset = block_offset
- if config.get('_NO_HISTORY'):
+ if skip_history:
initial_block_start = block_offset
initial_block_offset += 1
syncer_backends.append(FileBackend.initial(chain_spec, initial_block_offset, start_block_height=initial_block_start, base_dir=state_dir))
@@ -108,16 +167,63 @@ if __name__ == '__main__':
for syncer_backend in syncer_backends:
logg.info('resuming sync session {}'.format(syncer_backend))
- chain_interface = EthChainInterface()
for syncer_backend in syncer_backends:
syncers.append(HistorySyncer(syncer_backend, chain_interface, block_callback=cache_filter.block_callback))
syncer_backend = FileBackend.live(chain_spec, block_offset+1, base_dir=state_dir)
syncers.append(HeadSyncer(syncer_backend, chain_interface, block_callback=cache_filter.block_callback))
-
+ return syncers
+
+
+def setup_backend_single(chain_spec, block_offset, state_dir, callback, chain_interface, sync_offset=0, skip_history=False):
+    """Set up a single history syncer on a fresh initial backend.
+
+    :param chain_spec: Chain spec passed on to the backend
+    :param block_offset: Passed to FileBackend.initial as its second argument
+    :param state_dir: Base directory for the backend state
+    :param callback: Block callback handed to the syncer
+    :param chain_interface: Chain interface handed to the syncer
+    :param sync_offset: Passed to FileBackend.initial as start_block_height
+    :param skip_history: Not used here; accepted so both setup functions can be called alike
+    :returns: List containing the single HistorySyncer
+    """
+    syncer_backend = FileBackend.initial(chain_spec, block_offset, start_block_height=sync_offset, base_dir=state_dir)
+    # use the callback argument rather than the module level cache_filter, which only exists when run as a script
+    syncer = HistorySyncer(syncer_backend, chain_interface, block_callback=callback)
+    return [syncer]
+
+
+if __name__ == '__main__':
+ o = block_latest()
+ r = rpc.do(o)
+ block_offset = int(strip_0x(r), 16) + 1
+ logg.debug('current block height {}'.format(block_offset))
+
+
+ address_rules = setup_address_rules(
+ includes_file=args.includes_file,
+ excludes_file=args.excludes_file,
+ include_default=bool(args.include_default),
+ )
+
+ cache_filter = setup_cache_filter(
+ chain_spec,
+ args.cache_dir,
+ rules_filter=address_rules,
+ )
+
+ filters = [
+ cache_filter,
+ ]
+
+ syncer_setup_func = None
+ if config.true('_SINGLE'):
+ syncer_setup_func = setup_backend_single
+ else:
+ syncer_setup_func = setup_backend_resume
+
+ chain_interface = EthChainInterface()
+ syncers = syncer_setup_func(
+ chain_spec,
+ block_offset,
+ state_dir,
+ cache_filter.block_callback,
+ chain_interface,
+ sync_offset=config.get('_SYNC_OFFSET'),
+ skip_history=config.true('_NO_HISTORY'),
+ )
+
i = 0
for syncer in syncers:
- logg.debug('running syncer index {} {}'.format(i, str(syncer)))
+ logg.info('running syncer index {} {}'.format(i, str(syncer)))
for f in filters:
syncer.add_filter(f)
diff --git a/eth_monitor/runnable/sync_thread_range.py b/eth_monitor/runnable/sync_thread_range.py
@@ -0,0 +1,129 @@
+# standard imports
+import sys
+import signal
+import argparse
+import confini
+import logging
+import os
+
+# external imports
+from chainlib.chain import ChainSpec
+from chainlib.eth.connection import EthHTTPConnection
+from chainlib.eth.block import block_latest
+from hexathon import (
+ strip_0x,
+ add_0x,
+ )
+from chainsyncer.driver.head import HeadSyncer
+#from chainsyncer.driver.history import HistorySyncer
+from chainsyncer.driver.threadrange import ThreadPoolRangeHistorySyncer
+from chainsyncer.backend.file import FileBackend
+from chainsyncer.filter import NoopFilter
+
+# local imports
+from eth_monitor.chain import EthChainInterface
+from eth_monitor.filters.cache import CacheFilter
+
+# root logger; warnings only unless the level is changed by -v / -vv
+logging.basicConfig(level=logging.WARNING)
+logg = logging.getLogger()
+#logging.getLogger('leveldir.hex').setLevel(level=logging.DEBUG)
+#logging.getLogger('leveldir.numeric').setLevel(level=logging.DEBUG)
+
+# provider url: RPC_PROVIDER first, then ETH_PROVIDER, then the local default
+default_eth_provider = os.environ.get(
+        'RPC_PROVIDER',
+        os.environ.get('ETH_PROVIDER', 'http://localhost:8545'),
+        )
+
+# default config directory is "config" under the current working directory, unless CONFINI_DIR is set
+exec_dir = os.path.realpath(os.getcwd())
+script_dir = os.path.realpath(os.path.dirname(__file__))
+default_config_dir = os.environ.get('CONFINI_DIR', os.path.join(exec_dir, 'config'))
+
+# command line interface
+argparser = argparse.ArgumentParser('master eth events monitor')
+argparser.add_argument('-p', '--provider', dest='p', default=default_eth_provider, type=str, help='Web3 provider url (http only)')
+argparser.add_argument('-c', type=str, default=default_config_dir, help='config file')
+argparser.add_argument('-i', '--chain-spec', dest='i', type=str, default='evm:ethereum:1', help='Chain specification string')
+# --offset is stored as _SYNC_OFFSET and used as start height of the initial history sync;
+# its help text previously duplicated the --seq description
+argparser.add_argument('--offset', type=int, default=0, help='Start sync on this block')
+argparser.add_argument('--seq', action='store_true', help='Use sequential rpc ids')
+argparser.add_argument('--skip-history', action='store_true', dest='skip_history', help='Skip history sync')
+argparser.add_argument('-v', action='store_true', help='Be verbose')
+argparser.add_argument('-vv', action='store_true', help='Be more verbose')
+args = argparser.parse_args(sys.argv[1:])
+
+# -vv wins over -v; with neither flag the log level is left untouched
+if args.vv or args.v:
+    logg.setLevel(logging.DEBUG if args.vv else logging.INFO)
+
+# load configuration from the config directory, then apply command line overrides
+config_dir = args.c
+config = confini.Config(config_dir, os.environ.get('CONFINI_ENV_PREFIX'))
+config.process()
+args_override = {'CHAIN_SPEC': args.i}
+config.dict_override(args_override, 'cli')
+# values that only exist on the command line
+config.add(args.offset, '_SYNC_OFFSET', True)
+config.add(args.skip_history, '_NO_HISTORY', True)
+logg.debug('config loaded:\n{}'.format(config))
+
+chain_spec = ChainSpec.from_chain_str(args.i)
+
+# syncer state is kept in "state" under the current working directory
+state_dir = os.path.join(exec_dir, 'state')
+
+# optional sequential rpc ids, enabled with --seq
+rpc_id_generator = None
+if args.seq:
+    # imported on demand, like BasicAuth below; the name was used here without ever being imported
+    from chainlib.jsonrpc import IntSequenceGenerator
+    rpc_id_generator = IntSequenceGenerator()
+
+# optional http basic auth, with credentials taken from the environment
+auth = None
+if os.environ.get('RPC_AUTHENTICATION') == 'basic':
+    from chainlib.auth import BasicAuth
+    auth = BasicAuth(os.environ['RPC_USERNAME'], os.environ['RPC_PASSWORD'])
+# NOTE(review): rpc_id_generator and auth are created but not handed to the connection - confirm whether EthHTTPConnection should receive them
+rpc = EthHTTPConnection(args.p)
+
+# Script entry point: resume stored history sync sessions (or create an initial one),
+# add a head syncer, then run all syncers one after another with the cache filter attached.
+if __name__ == '__main__':
+# ask the node for the latest block; block_offset is that height plus one
+o = block_latest()
+r = rpc.do(o)
+block_offset = int(strip_0x(r), 16) + 1
+logg.debug('current block height {}'.format(block_offset))
+syncers = []
+
+# look for stored sync sessions under state_dir
+syncer_backends = FileBackend.resume(chain_spec, block_offset, base_dir=state_dir)
+
+# cache data goes to a fresh temporary directory on every run
+import tempfile
+tmp_dir = tempfile.mkdtemp()
+logg.info('using dir {}'.format(tmp_dir))
+cache_filter = CacheFilter(chain_spec, tmp_dir)
+filters = [
+cache_filter,
+]
+
+# nothing to resume: create an initial history backend
+# NOTE(review): --offset defaults to 0, so _SYNC_OFFSET is presumably never None and
+# the block_offset - 1 default is always replaced - confirm this is intended
+if len(syncer_backends) == 0:
+initial_block_start = block_offset - 1
+if config.get('_SYNC_OFFSET') != None:
+initial_block_start = config.get('_SYNC_OFFSET')
+initial_block_offset = block_offset
+if config.get('_NO_HISTORY'):
+initial_block_start = block_offset
+initial_block_offset += 1
+syncer_backends.append(FileBackend.initial(chain_spec, initial_block_offset, start_block_height=initial_block_start, base_dir=state_dir))
+logg.info('found no backends to resume, adding initial sync from history start {} end {}'.format(initial_block_start, initial_block_offset))
+else:
+for syncer_backend in syncer_backends:
+logg.info('resuming sync session {}'.format(syncer_backend))
+
+# one ranged history syncer per backend
+# the first argument (8) is presumably the number of worker threads - TODO confirm
+chain_interface = EthChainInterface()
+for syncer_backend in syncer_backends:
+#syncers.append(HistorySyncer(syncer_backend, chain_interface, block_callback=cache_filter.block_callback))
+syncers.append(ThreadPoolRangeHistorySyncer(8, syncer_backend, chain_interface, block_callback=cache_filter.block_callback))
+
+# the head syncer is appended last, so it runs after the history syncers
+syncer_backend = FileBackend.live(chain_spec, block_offset+1, base_dir=state_dir)
+syncers.append(HeadSyncer(syncer_backend, chain_interface, block_callback=cache_filter.block_callback))
+
+# run the syncers in list order; i is only used for the debug log line
+i = 0
+for syncer in syncers:
+logg.debug('running syncer index {} {}'.format(i, str(syncer)))
+for f in filters:
+syncer.add_filter(f)
+
+# the value returned by loop() is reported on stderr
+r = syncer.loop(int(config.get('SYNCER_LOOP_INTERVAL')), rpc)
+sys.stderr.write("sync {} done at block {}\n".format(syncer, r))
+
+i += 1
diff --git a/requirements.txt b/requirements.txt
@@ -1,5 +1,6 @@
-chainlib>=0.0.4a1,<=0.0.4
-chainsyncer>=0.0.3a1, <=0.0.3
-crypto-dev-signer>=0.4.14a6,<0.5
-eth_erc20~=0.0.10a1
-leveldir~=0.0.2
+chainlib-eth~=0.0.19
+chainsyncer>=0.0.7a1, <=0.0.8
+chainsyncer~=0.0.7
+#funga-eth>=0.5.1a1,<0.6.0
+eth-erc20~=0.1.6
+leveldir~=0.1.0