commit 99af484f30c14cd75bd43707bd9d125dbd354800
parent b0eb0c474d39fd35288b41850ee33461a34fe914
Author: nolash <dev@holbrook.no>
Date: Wed, 2 Jun 2021 14:07:01 +0200
Add upcoming getter
Diffstat:
5 files changed, 74 insertions(+), 11 deletions(-)
diff --git a/chainqueue/adapters/base.py b/chainqueue/adapters/base.py
@@ -0,0 +1,13 @@
+
+class Adapter:
+
+ def __init__(self, backend):
+ self.backend = backend
+
+
+ def process(self, chain_spec):
+ raise NotImplementedEror()
+
+
+ def add(self, chain_spec, bytecode):
+ raise NotImplementedEror()
diff --git a/chainqueue/adapters/eth.py b/chainqueue/adapters/eth.py
@@ -1,21 +1,49 @@
# external imports
+from chainlib.eth.constant import ZERO_ADDRESS
from chainlib.eth.tx import (
unpack,
)
-from hexathon import add_0x
+from hexathon import (
+ add_0x,
+ strip_0x,
+ )
# local imports
+from chainqueue.adapters.base import Adapter
+from chainqueue.enum import StatusBits
+
+class EthAdapter(Adapter):
-class EthAdapter:
- def __init__(self, backend):
- self.backend = backend
+ def translate(self, chain_spec, bytecode):
+ tx = unpack(bytecode, chain_spec)
+ tx['source_token'] = ZERO_ADDRESS
+ tx['destination_token'] = ZERO_ADDRESS
+ tx['from_value'] = tx['value']
+ tx['to_value'] = tx['value']
+ return tx
def add(self, chain_spec, bytecode):
- tx = unpack(bytecode, chain_spec)
+ tx = self.translate(chain_spec, bytecode)
session = self.backend.create_session()
r = self.backend.create(chain_spec, tx['nonce'], tx['from'], tx['hash'], add_0x(bytecode.hex()), session=session)
+ if r:
+ session.rollback()
+ session.close()
+ return r
+ r = self.backend.cache(tx, session=session)
+ session.commit()
session.close()
return r
+
+
+ def cache(self, chain_spec):
+ session = self.backend.create_session()
+ r = self.backend.create(chain_spec, tx['nonce'], tx['from'], tx['hash'], add_0x(bytecode.hex()), session=session)
+ session.close()
+
+
+ def process(self, chain_spec):
+ return self.backend.get(chain_spec, StatusBits.QUEUED, unpack)
diff --git a/chainqueue/runnable/server.py b/chainqueue/runnable/server.py
@@ -71,7 +71,7 @@ class SessionController:
self.srv = socket.socket(family=socket.AF_UNIX, type=socket.SOCK_STREAM)
self.srv.bind(config.get('SESSION_SOCKET_PATH'))
self.srv.listen(2)
- self.srv.settimeout(5.0)
+ self.srv.settimeout(1.0)
def shutdown(self, signo, frame):
if self.dead:
@@ -99,7 +99,7 @@ signal.signal(signal.SIGINT, ctrl.shutdown)
signal.signal(signal.SIGTERM, ctrl.shutdown)
dsn = dsn_from_config(config)
-backend = SQLBackend(dsn)
+backend = SQLBackend(dsn, debug=config.true('DATABASE_DEBUG'))
adapter = EthAdapter(backend)
chain_spec = ChainSpec.from_chain_str('evm:mainnet:1')
@@ -118,7 +118,9 @@ if __name__ == '__main__':
logg.error('entity on socket path is not a socket')
break
if srvs == None:
- logg.debug('ping')
+ txs = adapter.process(chain_spec)
+ for k in txs.keys():
+ logg.debug('txs {} {}'.format(k, txs[k]))
continue
srvs.setblocking(False)
data_in = srvs.recv(1024)
diff --git a/chainqueue/sql/backend.py b/chainqueue/sql/backend.py
@@ -9,6 +9,9 @@ from sqlalchemy.exc import (
# local imports
from chainqueue.sql.tx import create as queue_create
from chainqueue.db.models.base import SessionBase
+from chainqueue.db.models.tx import TxCache
+from chainqueue.sql.query import get_upcoming_tx
+from chainqueue.sql.state import set_ready
logg = logging.getLogger(__name__)
@@ -21,12 +24,26 @@ class SQLBackend:
def create(self, chain_spec, nonce, holder_address, tx_hash, signed_tx, obsolete_predecessors=True, session=None):
try:
- queue_create(chain_spec, nonce, holder_address, tx_hash, signed_tx, obsolete_predecessors=True, session=None)
+ queue_create(chain_spec, nonce, holder_address, tx_hash, signed_tx, obsolete_predecessors=True, session=session)
except IntegrityError as e:
logg.warning('skipped possible duplicate insert {}'.format(e))
return 1
+ set_ready(chain_spec, tx_hash, session=session)
return 0
+ def cache(self, tx, session=None):
+ txc = TxCache(tx['hash'], tx['from'], tx['to'], tx['source_token'], tx['destination_token'], tx['from_value'], tx['to_value'], session=session)
+ session.add(txc)
+ session.flush()
+ logg.debug('cache {}'.format(txc))
+ return 0
+
+
+ def get(self, chain_spec, typ, decoder):
+ txs = get_upcoming_tx(chain_spec, typ, decoder=decoder)
+ return txs
+
+
def create_session(self):
return SessionBase.create_session()
diff --git a/chainqueue/sql/query.py b/chainqueue/sql/query.py
@@ -8,7 +8,10 @@ from sqlalchemy import or_
from sqlalchemy import not_
from sqlalchemy import tuple_
from sqlalchemy import func
-from hexathon import add_0x
+from hexathon import (
+ add_0x,
+ strip_0x,
+ )
# local imports
from chainqueue.db.models.otx import Otx
@@ -274,7 +277,7 @@ def get_upcoming_tx(chain_spec, status=StatusEnum.READYSEND, not_status=None, re
if o == None:
continue
- tx_signed_bytes = bytes.fromhex(o.signed_tx[2:])
+ tx_signed_bytes = bytes.fromhex(strip_0x(o.signed_tx))
tx = decoder(tx_signed_bytes, chain_spec)
txs[o.tx_hash] = o.signed_tx