chainqueue

Blockchain transaction queue control
Log | Files | Refs | LICENSE

commit b0eb0c474d39fd35288b41850ee33461a34fe914
parent eca9dd95d669e0c34ee683afe2a2905ef6263c55
Author: nolash <dev@holbrook.no>
Date:   Wed,  2 Jun 2021 13:09:21 +0200

WIP server sql backend adapter handle duplicates

Diffstat:
Mchainqueue/adapters/eth.py | 3++-
Mchainqueue/error.py | 1-
Mchainqueue/runnable/server.py | 3++-
Mchainqueue/sql/backend.py | 22++++++++++++++++++++--
4 files changed, 24 insertions(+), 5 deletions(-)

diff --git a/chainqueue/adapters/eth.py b/chainqueue/adapters/eth.py @@ -16,5 +16,6 @@ class EthAdapter: def add(self, chain_spec, bytecode): tx = unpack(bytecode, chain_spec) session = self.backend.create_session() - self.backend.create(chain_spec, tx['nonce'], tx['from'], tx['hash'], add_0x(bytecode.hex()), session=session) + r = self.backend.create(chain_spec, tx['nonce'], tx['from'], tx['hash'], add_0x(bytecode.hex()), session=session) session.close() + return r diff --git a/chainqueue/error.py b/chainqueue/error.py @@ -27,4 +27,3 @@ class BackendIntegrityError(ChainQueueException): """ pass - diff --git a/chainqueue/runnable/server.py b/chainqueue/runnable/server.py @@ -124,6 +124,7 @@ if __name__ == '__main__': data_in = srvs.recv(1024) data_in_str = data_in.decode('utf-8') data = bytes.fromhex(strip_0x(data_in_str)) - adapter.add(chain_spec, data) + r = adapter.add(chain_spec, data) + srvs.send(r.to_bytes(4, byteorder='big')) ctrl.shutdown(None, None) diff --git a/chainqueue/sql/backend.py b/chainqueue/sql/backend.py @@ -1,13 +1,31 @@ +# standard imports +import logging + +# external imports +from sqlalchemy.exc import ( + IntegrityError, + ) + # local imports -from chainqueue.sql.tx import create +from chainqueue.sql.tx import create as queue_create from chainqueue.db.models.base import SessionBase +logg = logging.getLogger(__name__) + class SQLBackend: def __init__(self, conn_spec, *args, **kwargs): SessionBase.connect(conn_spec, pool_size=kwargs.get('poolsize', 0), debug=kwargs.get('debug', False)) - self.create = create + + + def create(self, chain_spec, nonce, holder_address, tx_hash, signed_tx, obsolete_predecessors=True, session=None): + try: + queue_create(chain_spec, nonce, holder_address, tx_hash, signed_tx, obsolete_predecessors=True, session=None) + except IntegrityError as e: + logg.warning('skipped possible duplicate insert {}'.format(e)) + return 1 + return 0 def create_session(self):