commit 8bfb91d7db5945427521ad53b109962f6eb41293
parent 84657c9021c8900a8cbd5c3abe63908e30be98b1
Author: lash <dev@holbrook.no>
Date: Sun, 23 Jan 2022 17:49:04 +0000
Make backend properties static across filters
Diffstat:
5 files changed, 113 insertions(+), 29 deletions(-)
diff --git a/eth_monitor/filters/__init__.py b/eth_monitor/filters/__init__.py
@@ -0,0 +1 @@
+from .base import *
diff --git a/eth_monitor/filters/base.py b/eth_monitor/filters/base.py
@@ -0,0 +1,54 @@
+# standard imports
+import os
+import logging
+
+# external imports
+from chainsyncer.backend.file import chain_dir_for
+from leveldir.numeric import NumDir
+from leveldir.hex import HexDir
+
+logg = logging.getLogger(__name__)
+
+base_dir = '/var/lib'
+
+
+class RuledFilter:
+
+ cache_root = os.path.join(base_dir, 'eth_monitor')
+ chain_dir = None
+ cache_dir = None
+ block_num_path = None
+ block_num_dir = None
+ block_hash_path = None
+ block_hash_dir = None
+ tx_path = None
+ tx_dir = None
+
+
+ def __init__(self, rules_filter=None):
+ if self.chain_dir == None:
+ raise RuntimeError('filter must be initialized. call RuledFilter.init() first')
+ self.rules_filter = rules_filter
+
+
+ @staticmethod
+ def init(chain_spec, cache_root=None, rules_filter=None):
+ if cache_root != None:
+ RuledFilter.cache_root = os.path.join(cache_root, 'eth_monitor')
+ RuledFilter.chain_dir = chain_dir_for(RuledFilter.cache_root)
+ RuledFilter.cache_dir = os.path.join(RuledFilter.chain_dir, 'cache')
+ RuledFilter.block_num_path = os.path.join(RuledFilter.cache_dir, 'block', 'num')
+ RuledFilter.block_num_dir = NumDir(RuledFilter.block_num_path, [100000, 1000])
+ RuledFilter.block_hash_path = os.path.join(RuledFilter.cache_dir, 'block', 'hash')
+ RuledFilter.block_hash_dir = HexDir(RuledFilter.block_hash_path, 32, levels=2)
+ RuledFilter.tx_path = os.path.join(RuledFilter.cache_dir, 'tx')
+ RuledFilter.tx_dir = HexDir(RuledFilter.tx_path, 32, levels=2)
+
+
+ def filter(self, conn, block, tx, db_session=None):
+ if self.rules_filter != None:
+ if not self.rules_filter.apply_rules(tx):
+ logg.debug('rule match failed for tx {}'.format(tx.hash))
+ return
+ logg.info('applying filter {}'.format(self))
+ self.ruled_filter(conn, block, tx, db_session=db_session)
diff --git a/eth_monitor/filters/cache.py b/eth_monitor/filters/cache.py
@@ -3,30 +3,15 @@ import os
import logging
# external imports
-from chainsyncer.backend.file import chain_dir_for
-from leveldir.numeric import NumDir
-from leveldir.hex import HexDir
from hexathon import strip_0x
-from chainlib.eth.address import is_same_address
-base_dir = '/var/lib'
+# local imports
+from eth_monitor.filters import RuledFilter
-logg = logging.getLogger()
+logg = logging.getLogger(__name__)
-class CacheFilter:
- def __init__(self, chain_spec, cache_root=base_dir, rules_filter=None):
- cache_root = os.path.join(cache_root, 'eth_monitor')
- chain_dir = chain_dir_for(cache_root)
- self.cache_dir = os.path.join(chain_dir, 'cache')
- block_num_path = os.path.join(self.cache_dir, 'block', 'num')
- self.block_num_dir = NumDir(block_num_path, [100000, 1000])
- block_hash_path = os.path.join(self.cache_dir, 'block', 'hash')
- self.block_hash_dir = HexDir(block_hash_path, 32, levels=2)
- tx_path = os.path.join(self.cache_dir, 'tx')
- self.tx_dir = HexDir(tx_path, 32, levels=2)
- self.rules_filter = rules_filter
-
+class CacheFilter(RuledFilter):
def block_callback(self, block, extra=None):
src = str(block.src()).encode('utf-8')
@@ -35,10 +20,6 @@ class CacheFilter:
self.block_num_dir.add(block.number, hash_bytes)
- def filter(self, conn, block, tx, db_session=None):
- if self.rules_filter != None:
- if not self.rules_filter.apply_rules(tx):
- logg.debug('rule match failed for tx {}'.format(tx.hash))
- return
+ def ruled_filter(self, conn, block, tx, db_session=None):
src = str(tx.src()).encode('utf-8')
self.tx_dir.add(bytes.fromhex(strip_0x(tx.hash)), src)
diff --git a/eth_monitor/rules.py b/eth_monitor/rules.py
@@ -0,0 +1,42 @@
+class AddressRules:
+
+ def __init__(self, include_by_default=False):
+ self.excludes = []
+ self.includes = []
+ self.include_by_default = include_by_default
+
+
+ def exclude(self, sender=None, recipient=None, executable=None):
+ self.excludes.append((sender, recipient, executable,))
+ logg.info('cache filter added EXCLUDE rule sender {} recipient {} executable {}'.format(sender, recipient, executable))
+
+
+ def include(self, sender=None, recipient=None, executable=None):
+ self.includes.append((sender, recipient, executable,))
+ logg.info('cache filter added INCLUDE rule sender {} recipient {} executable {}'.format(sender, recipient, executable))
+
+
+ def apply_rules(self, tx):
+ v = False
+
+ for rule in self.includes:
+ if rule[0] != None and is_same_address(tx.outputs[0], rule[0]):
+ logg.debug('tx {} rule INCLUDE match in SENDER {}'.format(tx.hash, tx.outputs[0]))
+ v = True
+ elif rule[1] != None and is_same_address(tx.inputs[0], rule[1]):
+ logg.debug('tx {} rule INCLUDE match in RECIPIENT {}'.format(tx.hash, tx.inputs[0]))
+ v = True
+ elif rule[2] != None and is_same_address(tx.inputs[0], rule[2]):
+ logg.debug('tx {} rule INCLUDE match in ExECUTABLE {}'.format(tx.hash, tx.inputs[0]))
+ v = True
+ for rule in self.excludes:
+ if rule[0] != None and is_same_address(tx.outputs[0], rule[0]):
+ logg.debug('tx {} rule INCLUDE match in SENDER {}'.format(tx.hash, tx.outputs[0]))
+ v = False
+ elif rule[1] != None and is_same_address(tx.inputs[0], rule[1]):
+ logg.debug('tx {} rule INCLUDE match in ExECUTABLE {}'.format(tx.hash, tx.inputs[0]))
+ v = False
+ elif rule[2] != None and is_same_address(tx.inputs[0], rule[2]):
+ logg.debug('tx {} rule INCLUDE match in ExECUTABLE {}'.format(tx.hash, tx.inputs[0]))
+ v = False
+ return v
diff --git a/eth_monitor/runnable/sync.py b/eth_monitor/runnable/sync.py
@@ -23,6 +23,7 @@ from chainsyncer.filter import NoopFilter
from eth_monitor.chain import EthChainInterface
from eth_monitor.filters.cache import CacheFilter
from eth_monitor.rules import AddressRules
+from eth_monitor.filters import RuledFilter
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
@@ -139,15 +140,17 @@ def setup_address_rules(includes_file=None, excludes_file=None, include_default=
return rules
-def setup_cache_filter(chain_spec, cache_dir, rules_filter=None):
+def setup_filter(chain_spec, cache_dir):
cache_dir = os.path.realpath(cache_dir)
if cache_dir == None:
import tempfile
cache_dir = tempfile.mkdtemp()
- logg.info('using dir {}'.format(cache_dir))
- cache_filter = CacheFilter(chain_spec, cache_dir, rules_filter=rules_filter)
+ logg.info('using chain spec {} and dir {}'.format(chain_spec, cache_dir))
+ RuledFilter.init(chain_spec, cache_dir)
- return cache_filter
+
+def setup_cache_filter(rules_filter=None):
+ return CacheFilter(rules_filter=rules_filter)
def setup_backend_resume(chain_spec, block_offset, state_dir, callback, sync_offset=0, skip_history=False):
@@ -194,9 +197,12 @@ if __name__ == '__main__':
include_default=bool(args.include_default),
)
- cache_filter = setup_cache_filter(
+ setup_filter(
chain_spec,
args.cache_dir,
+ )
+
+ cache_filter = setup_cache_filter(
rules_filter=address_rules,
)