chainqueue

Blockchain transaction queue control
Log | Files | Refs | LICENSE

commit ca4f7c9b8ec6c951cdadb6ea4bc8b63ff3467ca7
parent 3e487d07028fbccc55ec0d30a30b7d1b5387178a
Author: nolash <dev@holbrook.no>
Date:   Tue,  1 Jun 2021 13:53:44 +0200

Rename fs queue controller module

Diffstat:
Dchainqueue/fs/cache.py | 136-------------------------------------------------------------------------------
Achainqueue/fs/queue.py | 139+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mtests/test_fs.py | 2+-
3 files changed, 140 insertions(+), 137 deletions(-)

diff --git a/chainqueue/fs/cache.py b/chainqueue/fs/cache.py @@ -1,136 +0,0 @@ -# standard imports -import stat -import logging -import os -import stat - -# local imports -from chainqueue.enum import ( - StatusBits, - status_bytes, - ) - -logg = logging.getLogger(__name__) - - -class FsQueueBackend: - - - def add(self, label, content, prefix): - raise NotImplementedError() - - - def get(self, idx): - raise NotImplementedError() - - - def set_prefix(self, idx, prefix): - raise NotImplementedError() - - -class FsQueue: - - def __init__(self, root_path, backend=FsQueueBackend()): - self.backend = backend - self.path = root_path - self.path_state = {} - - try: - fi = os.stat(self.path) - self.__verify_directory() - except FileNotFoundError: - FsQueue.__prepare_directory(self.path) - - for r in FsQueue.__state_dirs(self.path): - self.path_state[r[0]] = r[1] - - self.index_path = os.path.join(self.path, '.active') - os.makedirs(self.index_path, exist_ok=True) - - - def add(self, key, value): - prefix = status_bytes() - c = self.backend.add(key, value, prefix=prefix) - - key_hex = key.hex() - entry_path = os.path.join(self.index_path, key_hex) - f = open(entry_path, 'xb') - f.write(c.to_bytes(8, byteorder='big')) - f.close() - - ptr_path = os.path.join(self.path_state['new'], key_hex) - os.symlink(entry_path, ptr_path) - - logg.debug('added new queue entry {} -> {} index {}'.format(ptr_path, entry_path, c)) - - - def __get_backend_idx(self, key): - entry_path = os.path.join(self.index_path, key.hex()) - f = open(entry_path, 'rb') - b = f.read() - f.close() - return int.from_bytes(b, byteorder='big') - - - - def get(self, key): - idx = self.__get_backend_idx(key) - return self.backend.get(idx) - - - def move(self, key, to_state, from_state=None): - key_hex = key.hex() - cur_path = os.path.join(self.path_state[from_state], key_hex) - fi = os.lstat(cur_path) - if not stat.S_ISLNK(fi.st_mode): - logg.error('no such entry {}'.format(cur_path)) - raise FileNotFoundError(key_hex) - new_path = os.path.join(self.path_state[to_state], key_hex) - os.rename(cur_path, new_path) - - - def purge(self, key, queuestate): - key_hex = key.hex() - cur_path = os.path.join(self.path_state[queuestate], key_hex) - active_path = os.path.join(self.index_path, key_hex) - try: - fi = os.stat(cur_path) - os.unlink(active_path) - except FileNotFoundError: - os.unlink(cur_path) - - logg.debug('purge queue entry {}'.format(key_hex)) - - def set(self, key, status): - idx = self.__get_backend_idx(key) - - prefix = status_bytes(status) - self.backend.set_prefix(idx, prefix) - - logg.debug('set queue state {} to {}'.format(key.hex(), status)) - - - @staticmethod - def __state_dirs(path): - r = [] - for s in [ - 'new', - 'reserved', - 'ready', - 'error', - 'defer', - ]: - r.append((s, os.path.join(path, 'spool', s))) - return r - - - def __verify_directory(self): - return True - - - @staticmethod - def __prepare_directory(path): - os.makedirs(path, exist_ok=True) - os.makedirs(os.path.join(path, '.cache')) - for r in FsQueue.__state_dirs(path): - os.makedirs(r[1]) diff --git a/chainqueue/fs/queue.py b/chainqueue/fs/queue.py @@ -0,0 +1,139 @@ +# standard imports +import stat +import logging +import os +import stat + +# local imports +from chainqueue.enum import ( + StatusBits, + status_bytes, + ) + +logg = logging.getLogger(__name__) + + +class FsQueueBackend: + + + def add(self, label, content, prefix): + raise NotImplementedError() + + + def get(self, idx): + raise NotImplementedError() + + + def set_prefix(self, idx, prefix): + raise NotImplementedError() + + +class FsQueue: + + def __init__(self, root_path, backend=FsQueueBackend()): + self.backend = backend + self.path = root_path + self.path_state = {} + + try: + fi = os.stat(self.path) + self.__verify_directory() + except FileNotFoundError: + FsQueue.__prepare_directory(self.path) + + for r in FsQueue.__state_dirs(self.path): + self.path_state[r[0]] = r[1] + + self.index_path = os.path.join(self.path, '.active') + self.nonce_path = os.path.join(self.path, '.accounts') + + os.makedirs(self.index_path, exist_ok=True) + + + def add(self, key, value): + prefix = status_bytes() + c = self.backend.add(key, value, prefix=prefix) + + key_hex = key.hex() + entry_path = os.path.join(self.index_path, key_hex) + f = open(entry_path, 'xb') + f.write(c.to_bytes(8, byteorder='big')) + f.close() + + ptr_path = os.path.join(self.path_state['new'], key_hex) + os.symlink(entry_path, ptr_path) + + logg.debug('added new queue entry {} -> {} index {}'.format(ptr_path, entry_path, c)) + + + def __get_backend_idx(self, key): + entry_path = os.path.join(self.index_path, key.hex()) + f = open(entry_path, 'rb') + b = f.read() + f.close() + return int.from_bytes(b, byteorder='big') + + + + def get(self, key): + idx = self.__get_backend_idx(key) + return self.backend.get(idx) + + + def move(self, key, to_state, from_state=None): + key_hex = key.hex() + cur_path = os.path.join(self.path_state[from_state], key_hex) + fi = os.lstat(cur_path) + if not stat.S_ISLNK(fi.st_mode): + logg.error('no such entry {}'.format(cur_path)) + raise FileNotFoundError(key_hex) + new_path = os.path.join(self.path_state[to_state], key_hex) + os.rename(cur_path, new_path) + + + def purge(self, key, queuestate): + key_hex = key.hex() + cur_path = os.path.join(self.path_state[queuestate], key_hex) + active_path = os.path.join(self.index_path, key_hex) + try: + fi = os.stat(cur_path) + os.unlink(active_path) + except FileNotFoundError: + os.unlink(cur_path) + + logg.debug('purge queue entry {}'.format(key_hex)) + + + def set(self, key, status): + idx = self.__get_backend_idx(key) + + prefix = status_bytes(status) + self.backend.set_prefix(idx, prefix) + + logg.debug('set queue state {} to {}'.format(key.hex(), status)) + + + @staticmethod + def __state_dirs(path): + r = [] + for s in [ + 'new', + 'reserved', + 'ready', + 'error', + 'defer', + ]: + r.append((s, os.path.join(path, 'spool', s))) + return r + + + def __verify_directory(self): + return True + + + @staticmethod + def __prepare_directory(path): + os.makedirs(path, exist_ok=True) + os.makedirs(os.path.join(path, '.cache')) + for r in FsQueue.__state_dirs(path): + os.makedirs(r[1]) diff --git a/tests/test_fs.py b/tests/test_fs.py @@ -6,7 +6,7 @@ import logging import os # local imports -from chainqueue.fs.cache import FsQueue +from chainqueue.fs.queue import FsQueue from chainqueue.fs.dir import HexDir from chainqueue.enum import StatusBits