commit 8c1f65a6066b19eb0c0d09fb9f37c95eb4168d9d
parent 41d38d4eb8b4ab89f56695d5c1732caadf0c7745
Author: nolash <dev@holbrook.no>
Date: Sun, 27 Jun 2021 11:01:31 +0200
Add cache filter with leveldir backend
Diffstat:
3 files changed, 52 insertions(+), 13 deletions(-)
diff --git a/eth_monitor/filters/cache.py b/eth_monitor/filters/cache.py
@@ -0,0 +1,36 @@
+# standard imports
+import os
+
+# external imports
+from chainsyncer.backend.file import chain_dir_for
+from leveldir.numeric import NumDir
+from leveldir.hex import HexDir
+from hexathon import strip_0x
+
+base_dir = '/var/lib'
+
+
+class CacheFilter:
+
+ def __init__(self, chain_spec, cache_root=base_dir):
+ cache_root = os.path.join(cache_root, 'eth_monitor')
+ chain_dir = chain_dir_for(cache_root)
+ self.cache_dir = os.path.join(chain_dir, 'cache')
+ block_num_path = os.path.join(self.cache_dir, 'block', 'num')
+ self.block_num_dir = NumDir(block_num_path, [100000, 1000])
+ block_hash_path = os.path.join(self.cache_dir, 'block', 'hash')
+ self.block_hash_dir = HexDir(block_hash_path, 32, levels=2)
+ tx_path = os.path.join(self.cache_dir, 'tx')
+ self.tx_dir = HexDir(tx_path, 32, levels=2)
+
+
+ def block_callback(self, block, extra=None):
+ src = str(block.src()).encode('utf-8')
+ hash_bytes = bytes.fromhex(strip_0x(block.hash))
+ self.block_hash_dir.add(hash_bytes, src)
+ self.block_num_dir.add(block.number, hash_bytes)
+
+
+ def filter(self, conn, block, tx, db_session=None):
+ src = str(tx.src()).encode('utf-8')
+ self.tx_dir.add(bytes.fromhex(strip_0x(tx.hash)), src)
diff --git a/eth_monitor/runnable/sync.py b/eth_monitor/runnable/sync.py
@@ -21,9 +21,12 @@ from chainsyncer.filter import NoopFilter
# local imports
from eth_monitor.chain import EthChainInterface
+from eth_monitor.filters.cache import CacheFilter
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
+#logging.getLogger('leveldir.hex').setLevel(level=logging.DEBUG)
+#logging.getLogger('leveldir.numeric').setLevel(level=logging.DEBUG)
default_eth_provider = os.environ.get('RPC_PROVIDER')
if default_eth_provider == None:
@@ -83,6 +86,14 @@ if __name__ == '__main__':
syncer_backends = FileBackend.resume(chain_spec, block_offset, base_dir=state_dir)
+ import tempfile
+ tmp_dir = tempfile.mkdtemp()
+ logg.info('using dir {}'.format(tmp_dir))
+ cache_filter = CacheFilter(chain_spec, tmp_dir)
+ filters = [
+ cache_filter,
+ ]
+
if len(syncer_backends) == 0:
initial_block_start = block_offset - 1
if config.get('_SYNC_OFFSET') != None:
@@ -99,14 +110,11 @@ if __name__ == '__main__':
chain_interface = EthChainInterface()
for syncer_backend in syncer_backends:
- syncers.append(HistorySyncer(syncer_backend, chain_interface))
+ syncers.append(HistorySyncer(syncer_backend, chain_interface, block_callback=cache_filter.block_callback))
- #syncer_backend = FileBackend.live(chain_spec, block_offset+1, base_dir=state_dir)
- #syncers.append(HeadSyncer(syncer_backend, chain_interface))
-
- filters = [
- NoopFilter(),
- ]
+ syncer_backend = FileBackend.live(chain_spec, block_offset+1, base_dir=state_dir)
+ syncers.append(HeadSyncer(syncer_backend, chain_interface, block_callback=cache_filter.block_callback))
+
i = 0
for syncer in syncers:
logg.debug('running syncer index {} {}'.format(i, str(syncer)))
@@ -117,9 +125,3 @@ if __name__ == '__main__':
sys.stderr.write("sync {} done at block {}\n".format(syncer, r))
i += 1
-
-
-# if len(sys.argv) > 1:
-# block_number = offset
-# sys.stderr.write('starting on block {}\n'.format(block_number))
-# backend.set(block_number, 0)
diff --git a/requirements.txt b/requirements.txt
@@ -2,3 +2,4 @@ chainlib>=0.0.4a1,<=0.0.4
chainsyncer>=0.0.3a1, <=0.0.3
crypto-dev-signer>=0.4.14a6,<0.5
eth_erc20~=0.0.10a1
+leveldir~=0.0.2