piknik

Unnamed repository; edit this file 'description' to name the repository.
Info | Log | Files | Refs | README | LICENSE

commit a3c55af9838670e9c7b0b42b0e6d3774db0b08e2
parent 4c8ced04e6bacd5f6ae2ac8a8adda9c51544432c
Author: lash <dev@holbrook.no>
Date:   Wed,  9 Nov 2022 19:08:11 +0000

Add envelope signature verify to library

Diffstat:
Mpiknik/crypto.py | 60++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--
Mtests/test_crypto.py | 67++++++++++++++++++++++++++++++++++++++++++++++++++++++-------------
2 files changed, 112 insertions(+), 15 deletions(-)

diff --git a/piknik/crypto.py b/piknik/crypto.py @@ -1,9 +1,25 @@ # standard imports +import os +import logging +import tempfile from email.message import Message # external imports import gnupg +logging.basicConfig(level=logging.DEBUG) +logg = logging.getLogger() +logging.getLogger('gnupg').setLevel(logging.ERROR) + + + +class CorruptEnvelope(Exception): + pass + + +class InvalidSignature(Exception): + pass + class PGPSigner: @@ -32,5 +48,45 @@ class PGPSigner: return m - def verify(self, msg, sig): # msg = IssueMessage object - pass + def __verify_msg(self, m, ms): + v = m.as_string() + sig = ms.get_payload() + (fd, fp) = tempfile.mkstemp() + f = os.fdopen(fd, 'w') + f.write(sig) + f.close() + r = self.gpg.verify_data(fp, v.encode('utf-8')) + os.unlink(fp) + if r.key_status != None: + raise InvalidSignature('key status {}'.format(r.key_status)) + if r.status != 'signature valid': + raise InvalidSignature('invalid signature') + logg.debug('signature ok') + + + def verify(self, msg): # msg = IssueMessage object + in_envelope = False + message_ids = [] + envelope_message = None + message_id = None + for m in msg.walk(): + if m.get('X-Piknik-Envelope') == 'pgp': + logg.debug('detected pgp envelope') + in_envelope = True + elif in_envelope: + if envelope_message != None: + if m.get('X-Piknik-Envelope') != None: + raise CorruptEnvelope() + if m.get('Content-Type') == 'application/pgp-signature': + self.__verify_msg(envelope_message, m) + logg.debug('pgp signature for message id "{}" ok'.format(message_id)) + message_ids.append(message_id) + + in_envelope = False + envelope_message = None + message_id = None + else: + message_id = m.get('X-Piknik-Msg-Id') + logg.debug('checking envelope for message id "{}"'.format(message_id)) + envelope_message = m + return message_ids diff --git a/tests/test_crypto.py b/tests/test_crypto.py @@ -56,21 +56,55 @@ class TestMsg(unittest.TestCase): two.set_payload('bar') m.attach(two) - v = m.as_string() m = self.crypto.sign(m, passphrase='foo') + self.crypto.verify(m) - for p in m.walk(): - if p.get_content_type() == 'application/pgp-signature': - sig = p.get_payload() - (fd, fp) = tempfile.mkstemp() - f = os.fdopen(fd, 'w') - f.write(sig) - f.close() - r = self.gpg.verify_data(fp, v.encode('utf-8')) - os.unlink(fp) - self.assertIsNone(r.key_status) - self.assertEqual(r.status, 'signature valid') - break + + def test_wrap_double_sig(self): + mp = Message() + mp.set_type('multipart/related') + mp.set_payload(None) + + m = Message() + m.add_header('X-Piknik-Msg-Id', 'foo') + m.set_type('multipart/mixed') + m.set_payload(None) + + one = Message() + one.set_charset('utf-8') + one.set_payload('inky') + m.attach(one) + + two = Message() + two.set_charset('utf-8') + two.set_payload('pinky') + m.attach(two) + + m = self.crypto.sign(m, passphrase='foo') + mp.attach(m) + + m = Message() + m.add_header('X-Piknik-Msg-Id', 'bar') + m.set_type('multipart/mixed') + m.set_payload(None) + + one = Message() + one.set_charset('utf-8') + one.set_payload('blinky') + m.attach(one) + + two = Message() + two.set_charset('utf-8') + two.set_payload('clyde') + m.attach(two) + + m = self.crypto.sign(m, passphrase='foo') + mp.attach(m) + + r = self.crypto.verify(mp) + self.assertEqual(len(r), 2) + self.assertIn('foo', r) + self.assertIn('bar', r) # TODO: assert @@ -81,5 +115,12 @@ class TestMsg(unittest.TestCase): print(r) + def test_wrap_basket_sig(self): + o = Issue('foo') + v = self.b.add(o) + r = self.b.msg(v, 's:foo', 's:bar') + + + if __name__ == '__main__': unittest.main()