summaryrefslogtreecommitdiff
path: root/src/mailman/runners/tests/test_digest.py
blob: 6157f500b6c6f50565d77f5174a1ac85f5d51478 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
# Copyright (C) 2014-2016 by the Free Software Foundation, Inc.
#
# This file is part of GNU Mailman.
#
# GNU Mailman is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option)
# any later version.
#
# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for
# more details.
#
# You should have received a copy of the GNU General Public License along with
# GNU Mailman.  If not, see <http://www.gnu.org/licenses/>.

"""Test the digest runner."""

import os
import unittest

from email.iterators import _structure as structure
from email.mime.text import MIMEText
from io import StringIO
from mailman.app.lifecycle import create_list
from mailman.config import config
from mailman.email.message import Message
from mailman.interfaces.member import DeliveryMode
from mailman.interfaces.template import ITemplateManager
from mailman.runners.digest import DigestRunner
from mailman.testing.helpers import (
    LogFileMark, digest_mbox, get_queue_messages, make_digest_messages,
    make_testable_runner, message_from_string,
    specialized_message_from_string as mfs,
    subscribe)
from mailman.testing.layers import ConfigLayer
from string import Template
from tempfile import TemporaryDirectory
from zope.component import getUtility


class TestDigest(unittest.TestCase):
    """Test the digest runner.

    Each test puts messages into the list's digest, lets the digest runner
    process them, and then inspects what ended up in the virgin queue.
    """

    layer = ConfigLayer
    maxDiff = None

    def setUp(self):
        # Create the test list with welcome messages disabled, and cache the
        # queues, runner, and to-digest handler that the tests use.
        self._mlist = create_list('test@example.com')
        self._mlist.send_welcome_message = False
        self._mlist.digest_size_threshold = 1
        self._digestq = config.switchboards['digest']
        self._shuntq = config.switchboards['shunt']
        self._virginq = config.switchboards['virgin']
        self._runner = make_testable_runner(DigestRunner, 'digest')
        self._process = config.handlers['to-digest'].process

    def _subscribe_digest_members(self):
        # Subscribe Anne as a MIME digest recipient and Bart as a plain-text
        # digest recipient, so that both flavors of digest are requested.
        anne = subscribe(self._mlist, 'Anne')
        anne.preferences.delivery_mode = DeliveryMode.mime_digests
        bart = subscribe(self._mlist, 'Bart')
        bart.preferences.delivery_mode = DeliveryMode.plaintext_digests

    def _structure_of(self, msg):
        # Return, as a string, the indented content-type tree that
        # email.iterators._structure() writes for `msg`.
        fp = StringIO()
        structure(msg, fp)
        return fp.getvalue()

    def _check_virgin_queue(self):
        # There should be two messages in the virgin queue: the digest as
        # plain-text and as multipart.
        items = get_queue_messages('virgin', expected_count=2)
        self.assertEqual(
            sorted(item.msg.get_content_type() for item in items),
            ['multipart/mixed', 'text/plain'])
        for item in items:
            self.assertEqual(item.msg['subject'],
                             'Test Digest, Vol 1, Issue 1')

    def _check_lone_mime_digest_for_bart(self):
        # There should be exactly one message in the virgin queue, destined
        # for Bart, formatted as a MIME digest.
        items = get_queue_messages('virgin', expected_count=1)
        # Bart is the only recipient.
        self.assertEqual(items[0].msgdata['recipients'],
                         {'bperson@example.com'})
        # The message is a MIME digest, with the structure we expect.
        self.assertMultiLineEqual(self._structure_of(items[0].msg), """\
multipart/mixed
    text/plain
    text/plain
    multipart/digest
        message/rfc822
            text/plain
    text/plain
""")

    def test_simple_message(self):
        # Subscribe some users receiving digests.
        self._subscribe_digest_members()
        make_digest_messages(self._mlist)
        self._check_virgin_queue()

    def test_non_ascii_message(self):
        # Subscribe some users receiving digests.
        self._subscribe_digest_members()
        msg = Message()
        msg['From'] = 'anne@example.org'
        msg['To'] = 'test@example.com'
        msg['Content-Type'] = 'multipart/mixed'
        # NOTE(review): in a Python 3 str literal '\xc3\xa9' is the two
        # characters U+00C3 U+00A9, not a single e-acute.  That is still
        # non-ASCII, which is what this test needs, so the literal is left
        # untouched.
        msg.attach(MIMEText('message with non-ascii chars: \xc3\xa9',
                            'plain', 'utf-8'))
        mbox = digest_mbox(self._mlist)
        mbox.add(msg.as_string())
        # Use any error logs as the error message if the test fails.
        error_log = LogFileMark('mailman.error')
        make_digest_messages(self._mlist, msg)
        # The runner will send the file to the shunt queue on exception.
        self.assertEqual(len(self._shuntq.files), 0, error_log.read())
        self._check_virgin_queue()

    def test_mime_digest_format(self):
        # Make sure that the format of the MIME digest is as expected.
        self._mlist.digest_size_threshold = 0.6
        self._mlist.volume = 1
        self._mlist.next_digest_number = 1
        # Subscribe some users receiving digests.
        self._subscribe_digest_members()
        # Fill the digest, using the to-digest handler cached by setUp().
        size = 0
        for i in range(1, 5):
            text = Template("""\
From: aperson@example.com
To: xtest@example.com
Subject: Test message $i
List-Post: <test@example.com>

Here is message $i
""").substitute(i=i)
            msg = message_from_string(text)
            self._process(self._mlist, msg, {})
            size += len(text)
            # Stop posting once the text fed in so far reaches the threshold.
            if size >= self._mlist.digest_size_threshold * 1024:
                break
        # Run the digest runner to create the MIME and RFC 1153 digests.
        runner = make_testable_runner(DigestRunner)
        runner.run()
        items = get_queue_messages('virgin', expected_count=2)
        # Find the MIME one.  Use a real unittest assertion rather than a
        # bare `assert` (which is stripped under -O), and fail clearly both
        # when there are two MIME digests and when there are none.
        mime_digests = [
            item.msg for item in items if item.msg.is_multipart()]
        self.assertEqual(len(mime_digests), 1,
                         'Expected exactly one MIME digest')
        # Verify the structure is what we expect.
        self.assertMultiLineEqual(self._structure_of(mime_digests[0]), """\
multipart/mixed
    text/plain
    text/plain
    multipart/digest
        message/rfc822
            text/plain
        message/rfc822
            text/plain
        message/rfc822
            text/plain
        message/rfc822
            text/plain
    text/plain
""")

    def test_issue141(self):
        # Currently DeliveryMode.summary_digests are equivalent to
        # mime_digests.  This also tests GL issue 234.
        bart = subscribe(self._mlist, 'Bart')
        bart.preferences.delivery_mode = DeliveryMode.summary_digests
        make_digest_messages(self._mlist)
        self._check_lone_mime_digest_for_bart()

    def test_issue141_one_last_digest(self):
        # Currently DeliveryMode.summary_digests are equivalent to
        # mime_digests.  Also tests issue 234.
        bart = subscribe(self._mlist, 'Bart')
        self._mlist.send_one_last_digest_to(
            bart.address, DeliveryMode.summary_digests)
        make_digest_messages(self._mlist)
        self._check_lone_mime_digest_for_bart()


class TestI18nDigest(unittest.TestCase):
    """Test digests for a list whose preferred language is French."""

    layer = ConfigLayer
    maxDiff = None

    def setUp(self):
        # Make French the default language for the duration of the test.
        config.push('french', """
        [mailman]
        default_language: fr
        """)
        self.addCleanup(config.pop, 'french')
        # Create the French-speaking list; a threshold of 0 is used for the
        # digest size.
        mlist = create_list('test@example.com')
        mlist.send_welcome_message = False
        mlist.preferred_language = 'fr'
        mlist.digest_size_threshold = 0
        self._mlist = mlist
        self._process = config.handlers['to-digest'].process
        self._runner = make_testable_runner(DigestRunner)
        # Add a French version of the digest masthead, stored in a temporary
        # directory that is removed when the test finishes.
        workdir = TemporaryDirectory()
        self.addCleanup(workdir.cleanup)
        french_dir = os.path.join(workdir.name, 'fr')
        os.makedirs(french_dir)
        masthead_text = """\
Envoyez vos messages pour la liste $display_name à
\t$got_list_email

Pour vous (dés)abonner par courriel, envoyez un message avec « help » dans
le corps ou dans le sujet à
\t$got_request_email

Vous pouvez contacter l'administrateur de la liste à l'adresse
\t$got_owner_email

Si vous répondez, n'oubliez pas de changer l'objet du message afin
qu'il soit plus spécifique que « Re: Contenu du groupe de $display_name...
"""
        masthead_path = os.path.join(french_dir, 'masthead.txt')
        with open(masthead_path, 'w', encoding='utf-8') as masthead_file:
            print(masthead_text, file=masthead_file)
        # Register the per-language masthead template for this list.
        masthead_url = 'file:///{}/$language/masthead.txt'.format(
            workdir.name)
        manager = getUtility(ITemplateManager)
        manager.set(
            'list:member:digest:masthead', self._mlist.list_id, masthead_url)

    def test_multilingual_digest(self):
        # When messages come in with a content-type character set different
        # than that of the list's preferred language, recipients will get an
        # internationalized digest.
        #
        # Subscribe one MIME digest recipient and one plain-text digest
        # recipient.
        mime_member = subscribe(self._mlist, 'Anne')
        mime_member.preferences.delivery_mode = DeliveryMode.mime_digests
        plain_member = subscribe(self._mlist, 'Bart')
        plain_member.preferences.delivery_mode = (
            DeliveryMode.plaintext_digests)
        posting = mfs("""\
From: aperson@example.org
To: test@example.com
Subject: =?iso-2022-jp?b?GyRCMGxIVhsoQg==?=
MIME-Version: 1.0
Content-Type: text/plain; charset=iso-2022-jp
Content-Transfer-Encoding: 7bit

\x1b$B0lHV\x1b(B
""")
        self._process(self._mlist, posting, {})
        self._runner.run()
        # There are two digests in the virgin queue; one is the MIME digest
        # and the other is the RFC 1153 digest.  They can arrive in either
        # order, so sort them out by whether they are multipart.
        items = get_queue_messages('virgin', expected_count=2)
        first, second = items[0].msg, items[1].msg
        mime, rfc1153 = (
            (first, second) if first.is_multipart() else (second, first))
        # The MIME version contains a mix of French and Japanese.  The digest
        # chrome added by Mailman is in French.
        french_subject = 'Groupe Test, Vol 1, Parution 1'
        self.assertEqual(mime['subject'].encode(),
                         '=?iso-8859-1?q?Groupe_Test=2C_Vol_1=2C_Parution_1?=')
        self.assertEqual(str(mime['subject']), french_subject)
        # The first subpart contains the iso-8859-1 masthead.
        masthead_bytes = mime.get_payload(0).get_payload(decode=True)
        masthead = masthead_bytes.decode('iso-8859-1')
        self.assertMultiLineEqual(masthead.splitlines()[0],
                                  'Envoyez vos messages pour la liste Test à')
        # The second subpart contains the utf-8 table of contents.
        toc_part = mime.get_payload(1)
        self.assertEqual(toc_part['content-description'],
                         "Today's Topics (1 messages)")
        toc = toc_part.get_payload(decode=True).decode('utf-8')
        self.assertMultiLineEqual(toc.splitlines()[0], 'Thèmes du jour :')
        # The third subpart is a multipart/digest part and its first subpart
        # contains the posted message in Japanese.
        digest_part = mime.get_payload(2)
        self.assertEqual(digest_part.get_content_type(), 'multipart/digest')
        wrapper = digest_part.get_payload(0)
        self.assertEqual(wrapper.get_content_type(), 'message/rfc822')
        post = wrapper.get_payload(0)
        self.assertEqual(post['subject'], '=?iso-2022-jp?b?GyRCMGxIVhsoQg==?=')
        # Compare the bytes so that this module doesn't contain string
        # literals in multiple incompatible character sets.
        self.assertEqual(post.get_payload(decode=True), b'\x1b$B0lHV\x1b(B\n')
        # The RFC 1153 digest will have the same subject, but its payload will
        # be recast into utf-8.
        self.assertEqual(str(rfc1153['subject']), french_subject)
        self.assertEqual(rfc1153.get_charset(), 'utf-8')
        rfc1153_text = rfc1153.get_payload(decode=True).decode('utf-8')
        lines = rfc1153_text.splitlines()
        self.assertEqual(lines[0], 'Envoyez vos messages pour la liste Test à')