commit edf8eebe26924c9785b5df4325a0fc61cd641fba
parent 1036ecd79a0b127973ea85f0535fa58538c8d08c
Author: lash <dev@holbrook.no>
Date: Sat, 22 Apr 2023 08:57:23 +0100
Add issue alias for manipulating issue on cli
Diffstat:
8 files changed, 119 insertions(+), 31 deletions(-)
diff --git a/.piknik/.msg/.master b/.piknik/.msg/.master
@@ -1 +1,2 @@
-b]Eg'>9l`qJj瀵
-\ No newline at end of file
+b]Eg'>9l`qJj瀵D\KmHRWGJEB WcH+UJɰܕYP
+L"F)0RdAӷs.oVGKGٹ('
+\ No newline at end of file
diff --git a/piknik/basket.py b/piknik/basket.py
@@ -3,6 +3,7 @@ import logging
# external imports
import shep
+from shep.error import StateItemNotFound
# local imports
from .error import DeadIssue
@@ -34,6 +35,8 @@ class Basket:
self.__msg = state_factory.create_messages()
self.__msg_wrap = message_wrapper
+ self.__alias = state_factory.create_aliases()
+
self.issues_rev = {}
@@ -45,19 +48,28 @@ class Basket:
def add(self, issue):
issue_id = str(issue.id)
- self.state.put(issue_id, contents=str(issue))
+ j = str(issue)
+ self.state.put(issue_id, contents=j)
self.__tags.put(issue_id)
+ if issue.alias != None:
+ self.__alias.put(issue.alias, issue.id)
return issue_id
def get(self, issue_id):
+ logg.debug("basked issue get {}".format(issue_id))
r = self.state.get(issue_id)
+ if r == None:
+ aliased_issue_id = self.__alias.get(issue_id)
+ r = self.state.get(aliased_issue_id)
+ logg.debug("resolved issue {} from alias '{}': {}".format(aliased_issue_id, issue_id, r))
o = Issue.from_str(r)
return o
def get_state(self, issue_id):
- v = self.state.state(issue_id)
+ o = self.get(issue_id)
+ v = self.state.state(o.id)
return self.state.name(v)
@@ -86,7 +98,10 @@ class Basket:
def state_finish(self, issue_id):
- self.state.move(issue_id, self.state.FINISHED)
+ o = self.get(issue_id)
+ self.state.move(o.id, self.state.FINISHED)
+ if o.alias != None:
+ self.__alias.purge(o.alias)
def advance(self, issue_id):
@@ -143,7 +158,8 @@ class Basket:
def tags(self, issue_id):
- v = self.__tags.state(issue_id)
+ o = self.get(issue_id)
+ v = self.__tags.state(o.id) #issue_id)
r = self.__tags.elements(v)
if r == 'UNTAGGED':
r = '(' + r + ')'
@@ -151,14 +167,15 @@ class Basket:
def __get_msg(self, issue_id, envelope_callback=None, message_callback=None, post_callback=None):
- r = self.state.get(issue_id)
- o = Issue.from_str(r)
+ #r = self.state.get(issue_id)
+ o = self.get(issue_id)
+ #o = Issue.from_str(r)
try:
- v = self.__msg.get(issue_id)
+ v = self.__msg.get(o.id)
m = IssueMessage.parse(o, v.decode('utf-8'), envelope_callback=envelope_callback, message_callback=message_callback, post_callback=post_callback)
return m
except FileNotFoundError as e:
- logg.debug('instantiating new message log for {} {}'.format(issue_id, e))
+ logg.debug('instantiating new message log for {} {}'.format(o.id, e))
return IssueMessage(o)
@@ -168,47 +185,54 @@ class Basket:
def dep(self, issue_id, dependency_issue_id):
- r = self.state.get(issue_id)
- self.state.get(dependency_issue_id)
- o = Issue.from_str(r)
+ #r = self.state.get(issue_id)
+ o = self.get(issue_id)
+ #self.state.get(dependency_issue_id)
+ self.get(dependency_issue_id)
+ #o = Issue.from_str(r)
r = o.dep(dependency_issue_id)
- self.state.replace(issue_id, contents=str(o))
+ self.state.replace(o.id, contents=str(o))
return r
def undep(self, issue_id, dependency_issue_id):
- r = self.state.get(issue_id)
- self.state.get(dependency_issue_id)
- o = Issue.from_str(r)
+ #r = self.state.get(issue_id)
+ o = self.get(issue_id)
+ #self.state.get(dependency_issue_id)
+ self.get(dependency_issue_id)
+ #o = Issue.from_str(r)
r = o.undep(dependency_issue_id)
- self.state.replace(issue_id, contents=str(o))
+ self.state.replace(o.id, contents=str(o))
return r
def assign(self, issue_id, identity):
- r = self.state.get(issue_id)
- o = Issue.from_str(r)
+ #r = self.state.get(issue_id)
+ o = self.get(issue_id)
+ #o = Issue.from_str(r)
v = Identity(identity)
r = o.assign(v)
- self.state.replace(issue_id, contents=str(o))
+ self.state.replace(o.id, contents=str(o))
return r
def owner(self, issue_id, identity):
- r = self.state.get(issue_id)
- o = Issue.from_str(r)
+ #r = self.state.get(issue_id)
+ o = self.get(issue_id)
+ #o = Issue.from_str(r)
v = Identity(identity)
r = o.set_owner(v)
- self.state.replace(issue_id, contents=str(o))
+ self.state.replace(o.id, contents=str(o))
return r
def unassign(self, issue_id, identity):
- r = self.state.get(issue_id)
- o = Issue.from_str(r)
+ #r = self.state.get(issue_id)
+ o = self.get(issue_id)
+ #o = Issue.from_str(r)
v = Identity(identity)
r = o.unassign(v)
- self.state.replace(issue_id, contents=str(o))
+ self.state.replace(o.id, contents=str(o))
return r
diff --git a/piknik/cli/__init__.py b/piknik/cli/__init__.py
@@ -11,6 +11,7 @@ class Context:
def __init__(self, arg, assembler, mode=0, gpg_home=os.environ.get('GPGHOME')):
self.issue_id = arg.issue_id
self.files_dir = arg.files_dir
+ self.alias = getattr(arg, 'alias', None)
self.show_finished = getattr(arg, 'show_finished', False)
self.show_states = getattr(arg, 'state', [])
#self.store_factory = FileStoreFactory(arg.d)
diff --git a/piknik/cli/add.py b/piknik/cli/add.py
@@ -8,6 +8,7 @@ ctx = None
def subparser(argp):
+ argp.add_argument('-a', '--alias', dest='alias', type=str, help='alias string to refer to issue to with cli commands')
argp.add_argument('title', type=str, nargs='*', help='issue title')
return argp
@@ -24,6 +25,6 @@ def main():
if title != '':
title += ' '
title += s
- o = Issue(title)
+ o = Issue(title, alias=ctx.alias)
v = ctx.basket.add(o)
sys.stdout.write(v + '\n')
diff --git a/piknik/issue.py b/piknik/issue.py
@@ -11,7 +11,7 @@ from piknik.error import ExistsError
class Issue:
- def __init__(self, title, issue_id=None):
+ def __init__(self, title, issue_id=None, alias=None):
if issue_id == None:
issue_id = str(uuid.uuid4())
self.id = issue_id
@@ -19,6 +19,7 @@ class Issue:
self.assigned = []
self.assigned_time = []
self.dependencies = []
+ self.alias = alias
self.owner_idx = 0
@@ -36,6 +37,7 @@ class Issue:
o.owner_idx = i
for v in r['dependencies']:
o.dep(v)
+ o.alias = r['alias']
return o
@@ -99,14 +101,15 @@ class Issue:
return True
raise UnknownIdentityError(identity)
-
+
def __str__(self):
o = {
'id': str(self.id),
'title': self.title,
'assigned': {},
'dependencies': self.dependencies,
+ 'alias': self.alias,
'owner': None,
}
diff --git a/piknik/store/__init__.py b/piknik/store/__init__.py
@@ -28,7 +28,6 @@ class MsgDir(HexDir):
def get(self, k):
u = uuid.UUID(k)
fp = self.to_filepath(u.hex)
- f = None
f = open(fp, 'rb')
r = f.read()
f.close()
@@ -46,6 +45,41 @@ class MsgDir(HexDir):
return self.add(u.bytes, v)
+class AliasDir:
+
+ def __init__(self, root_path):
+ self.dir = root_path
+ os.makedirs(self.dir, exist_ok=True)
+
+
+ def get(self, k):
+ fp = os.path.join(self.dir, k)
+ f = open(fp, 'rb')
+ r = f.read()
+ f.close()
+ u = uuid.UUID(r.hex())
+ return str(u)
+
+
+ def key_to_string(self, k):
+ u = uuid.UUID(bytes=k)
+ return u.bytes.hex()
+
+
+ def put(self, k, v):
+ u = uuid.UUID(v)
+ fp = os.path.join(self.dir, k)
+ logg.debug('putting {} {}'.format(u.bytes.hex(), fp))
+ f = open(fp, 'wb')
+ f.write(u.bytes)
+ f.close()
+
+
+ def purge(self, k):
+ fp = os.path.join(self.dir, k)
+ os.remove(fp)
+
+
class FileStoreFactory:
def __init__(self, directory=None):
@@ -83,3 +117,8 @@ class FileStoreFactory:
def create_messages(self):
d = os.path.join(self.directory, '.msg')
return MsgDir(d)
+
+
+ def create_aliases(self):
+ d = os.path.join(self.directory, '.alias')
+ return AliasDir(d)
diff --git a/tests/common.py b/tests/common.py
@@ -29,6 +29,10 @@ class TestMsgStore:
raise FileNotFoundError(k)
+ def purge(self, k):
+ del self.store[k]
+
+
class TestStates:
def create_states(self, *args, **kwargs):
@@ -43,6 +47,10 @@ class TestStates:
return TestMsgStore()
+ def create_aliases(self, *args):
+ return TestMsgStore()
+
+
def pgp_setup():
from piknik.crypto import PGPSigner
gpg_dir = tempfile.mkdtemp()
diff --git a/tests/test_basic.py b/tests/test_basic.py
@@ -116,5 +116,16 @@ class TestBasic(unittest.TestCase):
self.assertEqual(len(r), 7)
+ def test_alias(self):
+ o = Issue('The first issue', alias="first")
+ issue_id = o.id
+ v = self.b.add(o)
+ o = self.b.get("first")
+ self.assertEqual(o.id, issue_id)
+ self.b.state_finish(issue_id)
+ with self.assertRaises(FileNotFoundError):
+ o = self.b.get("first")
+
+
if __name__ == '__main__':
unittest.main()