commit 92ab8a3cef41702a04343e06e4f7e162564b8131
parent a3c55af9838670e9c7b0b42b0e6d3774db0b08e2
Author: lash <dev@holbrook.no>
Date: Wed, 9 Nov 2022 19:26:23 +0000
Add resume store signature verify test
Diffstat:
5 files changed, 39 insertions(+), 17 deletions(-)
diff --git a/piknik/basket.py b/piknik/basket.py
@@ -14,7 +14,7 @@ logg = logging.getLogger(__name__)
class Basket:
- def __init__(self, state_factory, message_wrapper=None):
+ def __init__(self, state_factory, message_wrapper=None, message_verifier=None):
self.no_resurrect = True
self.state = state_factory.create_states(default_state='proposed', verifier=self.__check_resurrect)
self.state.add('backlog')
@@ -33,6 +33,7 @@ class Basket:
self.__msg = state_factory.create_messages()
self.__msg_wrap = message_wrapper
+ self.__msg_verify = message_verifier
self.issues_rev = {}
@@ -147,13 +148,17 @@ class Basket:
o = Issue.from_str(r)
try:
v = self.__msg.get(issue_id)
- return IssueMessage.parse(o, v.decode('utf-8'))
+ return IssueMessage.parse(o, v.decode('utf-8'), verifier=self.__msg_verify)
except FileNotFoundError:
logg.debug('instantiating new message log for {}'.format(issue_id))
return IssueMessage(o)
+ def get_msg(self, issue_id):
+ return self.__get_msg(issue_id)
+
+
def msg(self, issue_id, *args):
m = self.__get_msg(issue_id)
m.add(*args, wrapper=self.__msg_wrap)
diff --git a/piknik/msg.py b/piknik/msg.py
@@ -29,9 +29,12 @@ class IssueMessage:
@classmethod
- def parse(cls, issue, v):
+ def parse(cls, issue, v, verifier=None):
o = cls(issue)
- o.__m = message_from_string(v)
+ m = message_from_string(v)
+ if verifier != None:
+ verifier(m)
+ o.__m
return o
diff --git a/tests/common.py b/tests/common.py
@@ -1,8 +1,10 @@
# standard imports
import logging
+import tempfile
# external imports
import shep
+import gnupg
logg = logging.getLogger()
@@ -39,3 +41,13 @@ class TestStates:
def create_messages(self, *args):
return TestMsgStore()
+
+
+def pgp_setup():
+ from piknik.crypto import PGPSigner
+ gpg_dir = tempfile.mkdtemp()
+ gpg = gnupg.GPG(gnupghome=gpg_dir)
+ gpg_input = gpg.gen_key_input(key_type='RSA', key_length=1024, passphrase='foo')
+ gpg_key = gpg.gen_key(gpg_input)
+ crypto = PGPSigner(gpg_dir, default_key=gpg_key.fingerprint, passphrase='foo')
+ return (crypto, gpg, gpg_dir,)
diff --git a/tests/test_crypto.py b/tests/test_crypto.py
@@ -3,21 +3,17 @@ import os
import unittest
import logging
import json
-import tempfile
import shutil
from email.message import Message
-# external imports
-import gnupg
-
# local imports
from piknik import Basket
from piknik import Issue
-from piknik.crypto import PGPSigner
# test imports
from tests.common import TestStates
from tests.common import TestMsgStore
+from tests.common import pgp_setup
logging.basicConfig(level=logging.DEBUG)
logg = logging.getLogger()
@@ -29,12 +25,8 @@ class TestMsg(unittest.TestCase):
def setUp(self):
self.store = TestStates()
- self.gpg_dir = tempfile.mkdtemp()
- self.gpg = gnupg.GPG(gnupghome=self.gpg_dir)
- gpg_input = self.gpg.gen_key_input(key_type='RSA', key_length=1024, passphrase='foo')
- gpg_key = self.gpg.gen_key(gpg_input)
- self.crypto = PGPSigner(self.gpg_dir, default_key=gpg_key.fingerprint, passphrase='foo')
- self.b = Basket(self.store, message_wrapper=self.crypto.sign)
+ (self.crypto, self.gpg, self.gpg_dir) = pgp_setup()
+ self.b = Basket(self.store, message_wrapper=self.crypto.sign, message_verifier=self.crypto.verify)
def tearDown(self):
@@ -121,6 +113,5 @@ class TestMsg(unittest.TestCase):
r = self.b.msg(v, 's:foo', 's:bar')
-
if __name__ == '__main__':
unittest.main()
diff --git a/tests/test_store.py b/tests/test_store.py
@@ -3,7 +3,6 @@ import unittest
import logging
import tempfile
import shutil
-
# external imports
import shep
@@ -18,6 +17,7 @@ from piknik.store import FileStoreFactory
# tests imports
from tests.common import debug_out
from tests.common import TestStates
+from tests.common import pgp_setup
logging.basicConfig(level=logging.DEBUG)
@@ -98,5 +98,16 @@ class TestStore(unittest.TestCase):
print(m)
+ def test_msg_resume_sig_verify(self):
+ (crypto, gpg, gpg_dir) = pgp_setup()
+ b = Basket(self.store_factory, message_wrapper=crypto.sign, message_verifier=crypto.verify)
+ o = Issue('foo')
+ v = b.add(o)
+ r = b.msg(v, 's:foo', 's:bar')
+
+ b = Basket(self.store_factory, message_wrapper=crypto.sign, message_verifier=crypto.verify)
+ m = b.get_msg(v)
+
+
if __name__ == '__main__':
unittest.main()