commit 819b07f1730a4c1e631635887811248aeffea02b
Author: nolash <dev@holbrook.no>
Date: Sun, 13 Jun 2021 11:09:48 +0200
Initial commit
Diffstat:
4 files changed, 234 insertions(+), 0 deletions(-)
diff --git a/eth_cache/account.py b/eth_cache/account.py
@@ -0,0 +1,45 @@
+# external imports
+from hexathon import strip_0x
+
+
+class AccountRegistry:
+
+ def __init__(self):
+ self.senders = {}
+ self.recipients = {}
+
+
+ def __normalize_address(self, address):
+ return bytes.fromhex(strip_0x(address))
+
+ def add(self, address, label):
+ self.add_sender(address, label)
+ self.add_recipient(address, label)
+
+
+ def add_sender(self, address, label):
+ a = self.__normalize_address(address)
+ self.senders[a] = label
+
+
+ def add_recipient(self, address, label):
+ a = self.__normalize_address(address)
+ self.recipients[a] = label
+
+
+ def have(self, address):
+ if self.get_sender(address) != None:
+ return True
+ if self.get_recipient(address) != None:
+ return True
+ return False
+
+
+ def get_sender(self, address):
+ a = self.__normalize_address(address)
+ return self.senders.get(a)
+
+
+ def get_recipient(self, address):
+ a = self.__normalize_address(address)
+ return self.recipients.get(a)
diff --git a/eth_cache/filter.py b/eth_cache/filter.py
@@ -0,0 +1,7 @@
+class GasFilter:
+
+ def __init__(self, store):
+
+
+ def filter(self, conn, block, tx, session):
+ self.store.put()
diff --git a/eth_cache/store.py b/eth_cache/store.py
@@ -0,0 +1,65 @@
+# standard imports
+import logging
+import os
+
+# external imports
+from hexdir import HexDir
+from chainlib.eth.tx import pack
+from hexathon import strip_0x
+
+logg = logging.getLogger(__name__)
+
+
+class PointerHexDir(HexDir):
+
+ def __init__(self, root_path, key_length, levels=2, prefix_length=0):
+ super(PointerHexDir, self).__init__(root_path, key_length, levels, prefix_length)
+ self.pointers = {}
+
+
+ def register_pointer(self, label, dir_name=None):
+ if dir_name == None:
+ dir_name = label
+ pointer_dir = os.path.join(self.path, dir_name)
+ os.makedirs(pointer_dir, exist_ok=True)
+
+ label_file = os.path.join(pointer_dir, '.label')
+ try:
+ os.stat(label_file)
+ except FileNotFoundError:
+ f = open(label_file, 'w')
+ f.write(label)
+ f.close()
+
+ self.pointers[label] = pointer_dir
+
+
+ def add_pointer(self, pointer, pointer_relpath, destination_path):
+ if isinstance(pointer_relpath, list):
+ link_path = os.path.join(self.pointers[pointer], *pointer_relpath)
+ else:
+ link_path = os.path.join(self.pointers[pointer], pointer_relpath)
+ os.makedirs(os.path.dirname(link_path), exist_ok=True)
+ os.symlink(destination_path, link_path)
+ logg.debug('added link {} -> {}'.format(link_path, destination_path))
+
+
+ def add(self, key, content, prefix=b'', pointers={}):
+ (c, entry_path) = super(PointerHexDir, self).add(key, content, prefix=prefix)
+ for k in pointers.keys():
+ self.add_pointer(k, pointers[k], entry_path)
+
+
+class TxFileStore:
+
+ def __init__(self, chain_spec, backend):
+ self.backend = backend
+ self.chain_spec = chain_spec
+
+
+ def put(self, block, tx, addresses, attrs={}):
+ for address in addresses:
+ tx_src = tx.as_dict()
+ tx_raw = pack(tx_src, self.chain_spec)
+ filename = '{}_{}_{}'.format(block.number, tx.index, strip_0x(tx.hash))
+ self.backend.add(bytes.fromhex(tx.hash), tx_raw, pointers={'address': [strip_0x(address),filename]})
diff --git a/examples/dump.py b/examples/dump.py
@@ -0,0 +1,117 @@
+# standard imports
+import tempfile
+import os
+import logging
+import sys
+
+# external imports
+from chainlib.chain import ChainSpec
+from chainlib.eth.connection import EthHTTPConnection
+from chainlib.eth.block import (
+ block_by_number,
+ block_by_hash,
+ block_latest,
+ Block,
+ )
+from chainlib.eth.tx import (
+ Tx,
+ transaction,
+ receipt,
+ )
+from chainlib.interface import ChainInterface
+from chainsyncer.backend.memory import MemBackend
+from chainsyncer.driver.thread import ThreadedHistorySyncer
+
+# local imports
+from eth_cache.account import AccountRegistry
+from eth_cache.store import TxFileStore
+from eth_cache.store import PointerHexDir
+
+logging.basicConfig(level=logging.INFO)
+logg = logging.getLogger()
+logging.getLogger('eth_cache.store').setLevel(logging.DEBUG)
+logging.getLogger('chainsyncer.driver.thread').setLevel(logging.DEBUG)
+#logging.getLogger('chainsyncer.backend.memory').setLevel(logging.DEBUG)
+
+root_dir = tempfile.mkdtemp(dir=os.path.join('/tmp/ethsync'))
+data_dir = os.path.join(root_dir, 'store')
+logg.info('using data dir {}'.format(data_dir))
+
+chain_interface = ChainInterface()
+chain_interface.set('block_by_number', block_by_number)
+chain_interface.set('block_by_hash', block_by_hash)
+chain_interface.set('block_latest', block_latest)
+chain_interface.set('block_from_src', Block.from_src)
+chain_interface.set('tx_from_src', Tx.from_src)
+chain_interface.set('tx_by_hash', transaction)
+chain_interface.set('tx_receipt', receipt)
+chain_interface.set('src_normalize', Tx.src_normalize)
+
+chain_spec = ChainSpec('evm', 'ethereum', 1)
+backend = PointerHexDir(data_dir, 32)
+backend.register_pointer('address')
+store = TxFileStore(chain_spec, backend)
+
+rpc = EthHTTPConnection('http://localhost:8545')
+
+#start = 8534365
+start = 12423900
+
+o = block_latest()
+r = rpc.do(o)
+stop = int(r, 16)
+stop = start + 200
+
+syncer_backend = MemBackend(chain_spec, None, target_block=stop)
+syncer_backend.set(start, 0)
+
+#o = block_by_number(12423955, include_tx=False)
+#r = rpc.do(o)
+##o = block_by_hash(r)
+##r = rpc.do(o)
+#block = Block(r)
+#
+#tx_hash = block.txs[308]
+#logg.debug('tx_ahsh {}'.format(tx_hash))
+#o = transaction(tx_hash)
+#tx_src = rpc.do(o)
+#o = receipt(tx_hash)
+#rcpt = rpc.do(o)
+#tx = Tx(tx_src, block=block)
+
+account_registry = AccountRegistry()
+account_registry.add('0x6bd8cb96bbc58a73d5360301b7791457bc93da24', 'money')
+
+class StoreFilter:
+
+ def __init__(self, store, registry):
+ self.registry = registry
+ self.store = store
+
+
+ def filter(self, conn, block, tx, session=None):
+ addresses = []
+ if account_registry.have(tx.inputs[0]):
+ addresses.append(tx.inputs[0])
+ if account_registry.have(tx.outputs[0]):
+ addresses.append(tx.outputs[0])
+ store.put(block, tx, addresses)
+
+
+class MonitorFilter:
+
+ def __init__(self, name='sync'):
+ self.name = name
+
+
+ def filter(self, rpc, block, tx, session=None):
+ s = '{} sync block {} tx {}/{}'.format(self.name, block.number, tx.index, len(block.txs))
+ sys.stdout.write('{:<100s}\r'.format(s))
+
+
+fltr = StoreFilter(store, account_registry)
+
+syncer = ThreadedHistorySyncer(10, syncer_backend, chain_interface)
+syncer.add_filter(MonitorFilter())
+syncer.add_filter(fltr)
+syncer.loop(0, rpc)