test_render.py (3547B)
1 # standard imports 2 import unittest 3 import logging 4 import json 5 import shutil 6 import io 7 import tempfile 8 from email.message import Message 9 from email.utils import localtime as email_localtime 10 11 # external imports 12 from mimeparse import parse_mime_type 13 14 # local imports 15 from piknik import ( 16 Basket, 17 Issue, 18 ) 19 from piknik.msg import IssueMessage 20 from piknik.render.base import Renderer 21 from piknik.wrap import Wrapper 22 23 # test imports 24 from tests.common import TestStates 25 from tests.common import TestMsgStore 26 from tests.common import pgp_setup 27 28 logging.basicConfig(level=logging.DEBUG) 29 logg = logging.getLogger() 30 31 32 def test_wrapper(p): 33 m = Message() 34 m.add_header('Foo', 'bar') 35 m.set_type('multipart/relative') 36 m.add_header('X-Piknik-Envelope', 'foo') 37 m.set_payload(p) 38 return m 39 40 41 def test_unwrapper(msg, message_callback=None, part_callback=None): 42 for v in msg.walk(): 43 if message_callback != None: 44 message_callback(v) 45 46 47 class TestRenderer(Renderer): 48 49 def __init__(self, basket, accumulator=None): 50 super(TestRenderer, self).__init__(basket, accumulator=accumulator, wrapper=Wrapper()) 51 self.p = 0 52 self.e = 0 53 54 55 def apply_envelope(self, state, issue, tags, envelope, accumulator=None): 56 r = self.e 57 self.e += 1 58 return r 59 60 61 def apply_message_part(self, state, issue, tags, envelope, message, message_date, message_content, accumulator=None): 62 r = self.p 63 self.p += 1 64 return r 65 66 67 class TestRendererComposite(TestRenderer): 68 69 def __init__(self, basket, accumulator=None): 70 super(TestRendererComposite, self).__init__(basket, accumulator=accumulator) 71 self.last_message_id = None 72 self.m = [] 73 74 75 def apply_message_post(self, state, issue, tags, envelope, message, message_id, message_date, accumulator=None): 76 if self.last_message_id != message_id: 77 self.m.append(message_id) 78 self.last_message_id = message_id 79 80 81 class TestMsg(unittest.TestCase): 82 83 def setUp(self): 84 self.acc = [] 85 self.store = TestStates() 86 self.b = Basket(self.store, message_wrapper=test_wrapper) 87 self.render_dir = tempfile.mkdtemp() 88 89 90 def accumulate(self, v): 91 self.acc.append(v) 92 93 94 def tearDown(self): 95 shutil.rmtree(self.render_dir) 96 97 98 def test_idlepass(self): 99 wrapper = Wrapper() 100 renderer = TestRenderer(self.b, accumulator=self.accumulate) 101 issue_one = Issue('foo') 102 self.b.add(issue_one) 103 104 issue_two = Issue('bar') 105 v = self.b.add(issue_two) 106 107 m = self.b.msg(v, 's:foo') 108 109 renderer.apply() 110 self.assertEqual(len(self.acc), 2) 111 self.assertEqual(renderer.e, 1) 112 self.assertEqual(renderer.p, 1) 113 114 115 def test_composite(self): 116 renderer = TestRendererComposite(self.b, accumulator=self.accumulate) 117 issue_one = Issue('foo') 118 self.b.add(issue_one) 119 120 issue_two = Issue('bar') 121 v = self.b.add(issue_two) 122 123 m = self.b.msg(v, 's:foo') 124 125 renderer.apply() 126 self.assertEqual(len(renderer.m), 1) 127 128 129 def test_attachment(self): 130 renderer = TestRendererComposite(self.b, accumulator=self.accumulate) 131 issue_one = Issue('foo') 132 self.b.add(issue_one) 133 134 issue_two = Issue('bar') 135 v = self.b.add(issue_two) 136 137 m = self.b.msg(v, 'f:tests/one.png') 138 139 renderer.apply() 140 self.assertEqual(len(renderer.m), 1) 141 142 143 if __name__ == '__main__': 144 unittest.main()