chainsyncer

Blockchain syncer driver
Log | Files | Refs | LICENSE

commit 186f28a020862e461459b2504b54711ffb8a492d
parent d5cc76fc269bc82221aeff42293346a71b3d6783
Author: lash <dev@holbrook.no>
Date:   Sun,  6 Aug 2023 14:03:01 +0100

Enable batch retrieval of receipts

Diffstat:
MCHANGELOG | 4+++-
Mchainsyncer/driver/chain_interface.py | 58++++++++++++++++++++++++++++++++++++++++++++++++++++------
Mchainsyncer/unittest/base.py | 4++++
Mrequirements.txt | 2+-
Msetup.cfg | 2+-
Dtest_requirements.txt | 8--------
6 files changed, 61 insertions(+), 17 deletions(-)

diff --git a/CHANGELOG b/CHANGELOG @@ -1,5 +1,7 @@ +* 0.8.0 + - Enable batch rpc retrievals of receipts * 0.7.1 - * Change license to AGPL3 and copyright waived to public domain + - Change license to AGPL3 and copyright waived to public domain * 0.7.0 - Upgrade dependencies * 0.6.0 diff --git a/chainsyncer/driver/chain_interface.py b/chainsyncer/driver/chain_interface.py @@ -1,3 +1,6 @@ +# standard imports +import logging + # external imports from chainlib.error import RPCException @@ -5,6 +8,8 @@ from chainlib.error import RPCException from chainsyncer.error import NoBlockForYou from chainsyncer.driver import SyncDriver +logg = logging.getLogger(__name__) + class ChainInterfaceDriver(SyncDriver): @@ -35,7 +40,43 @@ class ChainInterfaceDriver(SyncDriver): return b + def merge_rcpts_single(self, conn, txs): + i = 0 + c = len(txs) + for j in range(c): + hsh = txs[j].hash + o = self.chain_interface.tx_receipt(hsh) + r = conn.do(o) + txs[j].apply_receipt(r) + logg.debug('get receipt {}/{}: {}'.format(j+1, c, hsh)) + i += 1 + return i + + + def merge_rcpts(self, conn, txs): + if self.chain_interface.batch_limit == 1: + return self.merge_rcpts_single(conn, txs) + + rcpts = [] + c = 0 + for tx in txs: + rcpts.append(self.chain_interface.tx_receipt(tx.hash)) + c += 1 + if c == self.chain_interface.batch_limit: + break + + rcpts_r = conn.do(rcpts) + i = 0 + for j in range(len(rcpts)): + rcpt = rcpts_r[j] + if rcpt != None: + txs[j].apply_receipt(self.chain_interface.src_normalize(rcpt)) + i += 1 + return i + + def process(self, conn, item, block): + txs = [] i = item.tx_cursor while True: # handle block objects regardless of whether the tx data is embedded or not @@ -45,11 +86,16 @@ class ChainInterfaceDriver(SyncDriver): tx_hash = block.txs[i] o = self.chain_interface.tx_by_hash(tx_hash, block=block) r = conn.do(o) + except IndexError: + break + txs.append(tx) + i += 1 + + j = len(txs) + i = 0 + while i < j: + i += self.merge_rcpts(conn, txs[i:]) - rcpt = conn.do(self.chain_interface.tx_receipt(tx.hash)) - if rcpt != None: - tx.apply_receipt(self.chain_interface.src_normalize(rcpt)) - + for tx in txs: self.process_single(conn, block, tx) - - i += 1 + raise IndexError() diff --git a/chainsyncer/unittest/base.py b/chainsyncer/unittest/base.py @@ -221,6 +221,10 @@ class MockDriver(SyncDriver): class MockChainInterface: + def __init__(self, batch_limit=1): + self.batch_limit = batch_limit + + def block_by_number(self, number): return ('block_by_number', number,) diff --git a/requirements.txt b/requirements.txt @@ -1,5 +1,5 @@ confini~=0.6.1 semver==2.13.0 hexathon~=0.1.7 -chainlib~=0.4.0 +chainlib~=0.5.0 shep~=0.3.0 diff --git a/setup.cfg b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = chainsyncer -version = 0.7.2 +version = 0.8.0 description = Generic blockchain syncer driver author = Louis Holbrook author_email = dev@holbrook.no diff --git a/test_requirements.txt b/test_requirements.txt @@ -1,8 +0,0 @@ -chainlib-eth~=0.0.9a14 -psycopg2==2.8.6 -SQLAlchemy==1.3.20 -alembic==1.4.2 -eth_tester==0.5.0b3 -py-evm==0.3.0a20 -rlp==2.0.1 -pytest==6.0.1