commit 1b82cb31187069e0ea0930de28ab5858a55f1cd2
parent 374a39edc359df29f141c7b41f2dc0abcbd157dc
Author: nolash <dev@holbrook.no>
Date: Fri, 4 Jun 2021 20:48:19 +0200
Factor out dispatch to separate object, implement batch cap per address
Diffstat:
5 files changed, 68 insertions(+), 22 deletions(-)
diff --git a/chaind_eth/dispatch.py b/chaind_eth/dispatch.py
@@ -0,0 +1,49 @@
+# standard imports
+import logging
+
+# external imports
+from chainlib.eth.address import to_checksum_address
+from chainlib.eth.tx import unpack
+from chainqueue.enum import StatusBits
+from chainqueue.sql.query import count_tx
+from hexathon import strip_0x
+
+logg = logging.getLogger(__name__)
+
+
+class Dispatcher:
+
+ status_inflight_mask = StatusBits.IN_NETWORK | StatusBits.FINAL
+
+ def __init__(self, chain_spec, adapter, limit=100):
+ self.address_counts = {}
+ self.chain_spec = chain_spec
+ self.adapter = adapter
+ self.limit = limit
+
+
+ def get_count(self, address, session):
+ c = self.address_counts.get(address)
+ if c == None:
+ c = self.limit - count_tx(self.chain_spec, address, self.status_inflight_mask, StatusBits.IN_NETWORK, session=session)
+ if c < 0:
+ c = 0
+ self.address_counts[address] = c
+ return c
+
+
+ def process(self, rpc, session):
+ c = 0
+ txs = self.adapter.upcoming(self.chain_spec, session=session)
+ for k in txs.keys():
+ signed_tx_bytes = bytes.fromhex(strip_0x(txs[k]))
+ tx_obj = unpack(signed_tx_bytes, self.chain_spec)
+ sender = to_checksum_address(tx_obj['from'])
+ address_count = self.get_count(sender, session)
+ if address_count == 0:
+ logg.debug('too many inflight txs for {}, skipping {}'.format(sender, k))
+ continue
+ logg.debug('txs {} {}'.format(k, txs[k]))
+ self.adapter.dispatch(self.chain_spec, rpc, k, txs[k], session)
+ c += 1
+ return c
diff --git a/chaind_eth/runnable/server.py b/chaind_eth/runnable/server.py
@@ -19,6 +19,7 @@ from chainqueue.sql.backend import SQLBackend
from chainqueue.db import dsn_from_config
# local imports
+from chaind_eth.dispatch import Dispatcher
from chainqueue.adapters.eth import EthAdapter
logging.basicConfig(level=logging.WARNING)
@@ -128,7 +129,6 @@ def main():
logg.debug('getting connection')
(srvs, srvs_addr) = ctrl.get_connection()
except OSError as e:
- havesends = 0
try:
fi = os.stat(config.get('SESSION_SOCKET_PATH'))
except FileNotFoundError:
@@ -139,12 +139,11 @@ def main():
break
if srvs == None:
logg.debug('timeout (remote socket is none)')
- txs = adapter.upcoming(chain_spec)
- for k in txs.keys():
- havesends += 1
- logg.debug('txs {} {}'.format(k, txs[k]))
- adapter.dispatch(chain_spec, rpc, k, txs[k])
- if havesends > 0:
+ dispatcher = Dispatcher(chain_spec, adapter)
+ session = backend.create_session()
+ r = dispatcher.process(rpc, session)
+ session.close()
+ if r > 0:
ctrl.srv.settimeout(0.1)
else:
ctrl.srv.settimeout(4.0)
@@ -167,12 +166,14 @@ def main():
continue
logg.debug('recv {} bytes'.format(len(data)))
- r = adapter.add(chain_spec, data)
+ session = backend.create_session()
+ r = adapter.add(chain_spec, data, session)
try:
r = srvs.send(r.to_bytes(4, byteorder='big'))
logg.debug('{} bytes sent'.format(r))
except BrokenPipeError:
logg.debug('they just hung up. how rude.')
+ session.close()
srvs.close()
ctrl.shutdown(None, None)
diff --git a/chainqueue/adapters/eth.py b/chainqueue/adapters/eth.py
@@ -25,9 +25,8 @@ class EthAdapter(Adapter):
return tx
- def add(self, chain_spec, bytecode):
+ def add(self, chain_spec, bytecode, session):
tx = self.translate(bytecode, chain_spec)
- session = self.backend.create_session()
r = self.backend.create(chain_spec, tx['nonce'], tx['from'], tx['hash'], add_0x(bytecode.hex()), session=session)
if r:
session.rollback()
@@ -35,23 +34,20 @@ class EthAdapter(Adapter):
return r
r = self.backend.cache(tx, session=session)
session.commit()
- session.close()
return r
- def cache(self, chain_spec):
- session = self.backend.create_session()
- r = self.backend.create(chain_spec, tx['nonce'], tx['from'], tx['hash'], add_0x(bytecode.hex()), session=session)
- session.close()
+# def cache(self, chain_spec):
+# session = self.backend.create_session()
+# r = self.backend.create(chain_spec, tx['nonce'], tx['from'], tx['hash'], add_0x(bytecode.hex()), session=session)
+# session.close()
- def upcoming(self, chain_spec):
- return self.backend.get(chain_spec, StatusBits.QUEUED, unpack)
+ def upcoming(self, chain_spec, session):
+ return self.backend.get(chain_spec, StatusBits.QUEUED, unpack) # possible maldesign, up-stack should use our session?
- def dispatch(self, chain_spec, rpc, tx_hash, signed_tx):
+ def dispatch(self, chain_spec, rpc, tx_hash, signed_tx, session):
o = raw(signed_tx)
- session = self.backend.create_session()
r = self.backend.dispatch(chain_spec, rpc, tx_hash, o)
- session.close()
return r
diff --git a/requirements.txt b/requirements.txt
@@ -1,2 +1,2 @@
-chaind<=0.0.1,>=0.0.1a2
+chaind<=0.0.1,>=0.0.1a3
hexathon~=0.0.1a7
diff --git a/setup.cfg b/setup.cfg
@@ -1,6 +1,6 @@
[metadata]
name = chaind-eth
-version = 0.0.1a1
+version = 0.0.1a2
description = Queue server for ethereum
author = Louis Holbrook
author_email = dev@holbrook.no