1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
|
# Copyright (C) 2012-2017 by the Free Software Foundation, Inc.
#
# This file is part of GNU Mailman.
#
# GNU Mailman is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option)
# any later version.
#
# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
# more details.
#
# You should have received a copy of the GNU General Public License along with
# GNU Mailman. If not, see <http://www.gnu.org/licenses/>.
"""Test the prototype archiver."""
import os
import shutil
import tempfile
import unittest
import threading
from email import message_from_file
from flufl.lock import Lock
from mailman.app.lifecycle import create_list
from mailman.archiving.prototype import Prototype
from mailman.config import config
from mailman.database.transaction import transaction
from mailman.testing.helpers import (
LogFileMark, specialized_message_from_string as mfs)
from mailman.testing.layers import ConfigLayer
from mailman.utilities.email import add_message_hash
class TestPrototypeArchiver(unittest.TestCase):
    """Test the prototype archiver."""

    layer = ConfigLayer

    def setUp(self):
        # Create a fake mailing list and message object.  The empty line
        # after the last header is the header/body separator; without it the
        # email parser has to guess where the body starts and records a
        # defect on the message.
        self._msg = mfs("""\
To: test@example.com
From: anne@example.com
Subject: Testing the test list
Message-ID: <ant>
Message-ID-Hash: MS6QLWERIJLGCRF44J7USBFDELMNT2BW

Tests are better than no tests
but the water deserves to be swum.
""")
        with transaction():
            self._mlist = create_list('test@example.com')
        # Set up a temporary directory for the prototype archiver so that it's
        # easier to clean up.  Cleanups run in reverse order of registration,
        # so the configuration is popped before the directory is removed.
        self._tempdir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self._tempdir)
        config.push('prototype', """
[paths.testing]
archive_dir: {}
""".format(self._tempdir))
        self.addCleanup(config.pop, 'prototype')
        # Capture the structure of a maildir: the archiver's top-level
        # directory, the per-list directory, and its three subdirectories,
        # all expressed as paths below the archive directory.
        list_dir = os.path.join('prototype', self._mlist.fqdn_listname)
        relative_paths = ['prototype', list_dir]
        relative_paths.extend(
            os.path.join(list_dir, subdir) for subdir in ('cur', 'new', 'tmp'))
        self._expected_dir_structure = {
            os.path.join(config.ARCHIVE_DIR, relative)
            for relative in relative_paths
            }
        # os.walk() also reports the root it was started from, so _find()
        # will include the archive directory itself.
        self._expected_dir_structure.add(config.ARCHIVE_DIR)

    def _find(self, path):
        """Return the set of all directory and file paths at or below `path`.

        :param path: The directory to start walking from.  It is itself
            included in the result.
        :return: A set of path strings; any bytes path components produced by
            the walk are decoded as UTF-8.
        """
        found = set()
        for dirpath, _dirnames, filenames in os.walk(path):
            if isinstance(dirpath, bytes):
                dirpath = dirpath.decode('utf-8')
            found.add(dirpath)
            for filename in filenames:
                if isinstance(filename, bytes):
                    filename = filename.decode('utf-8')
                found.add(os.path.join(dirpath, filename))
        return found

    def test_archive_maildir_created(self):
        # Archiving a message to the prototype archiver should create the
        # expected directory structure.
        Prototype.archive_message(self._mlist, self._msg)
        all_filenames = self._find(config.ARCHIVE_DIR)
        # Check that the directory structure has been created and we have one
        # more file (the archived message) than expected directories.
        archived_messages = all_filenames - self._expected_dir_structure
        self.assertEqual(len(archived_messages), 1)
        # The single extra path must live in the maildir's 'new' directory.
        new_dir = os.path.join(
            config.ARCHIVE_DIR, 'prototype', self._mlist.fqdn_listname, 'new')
        self.assertTrue(archived_messages.pop().startswith(new_dir))

    def test_archive_maildir_existence_does_not_raise(self):
        # Archiving a second message does not cause an EEXIST to be raised
        # even though the maildir already exists from the first message.
        Prototype.archive_message(self._mlist, self._msg)
        maildir = os.path.join(
            config.ARCHIVE_DIR, 'prototype', self._mlist.fqdn_listname)
        for subdir in ('cur', 'new', 'tmp'):
            self.assertTrue(os.path.isdir(os.path.join(maildir, subdir)))
        new_dir = os.path.join(maildir, 'new')
        # There should be one message in the 'new' directory.
        self.assertEqual(len(os.listdir(new_dir)), 1)
        # Archive a second message.  If an exception occurs, let it fail the
        # test.  Afterward, two messages should be in the 'new' directory.
        # The message gets a fresh Message-ID and a matching hash so that it
        # is distinct from the first one.
        del self._msg['message-id']
        del self._msg['message-id-hash']
        self._msg['Message-ID'] = '<bee>'
        add_message_hash(self._msg)
        Prototype.archive_message(self._mlist, self._msg)
        self.assertEqual(len(os.listdir(new_dir)), 2)

    def test_archive_lock_used(self):
        # Test that locking the maildir when adding works as a failure here
        # could mean we lose mail.
        lock_file = os.path.join(
            config.LOCK_DIR, '{0}-maildir.lock'.format(
                self._mlist.fqdn_listname))
        with Lock(lock_file):
            # The archiver lock is now held by this test, so the archiver's
            # own attempt to acquire it is expected to fail and be logged.
            # The archiver is called synchronously in this thread; start
            # watching the error log first so that only its output is read.
            mark = LogFileMark('mailman.error')
            Prototype.archive_message(self._mlist, self._msg)
            # Test that the archiver output the correct error.
            line = mark.readline()
            # XXX 2012-03-15 BAW: we really should remove timestamp prefixes
            # from the loggers when under test.
            expected_suffix = (
                'Unable to acquire prototype archiver lock for {0}, '
                'discarding: {1}\n'.format(
                    self._mlist.fqdn_listname,
                    self._msg.get('message-id')))
            self.assertTrue(line.endswith(expected_suffix))
        # Check that the message didn't get archived: only the bare maildir
        # directories may exist.
        created_files = self._find(config.ARCHIVE_DIR)
        self.assertEqual(self._expected_dir_structure, created_files)

    def test_prototype_archiver_good_path(self):
        # Verify the good path; the message gets archived.
        Prototype.archive_message(self._mlist, self._msg)
        new_path = os.path.join(
            config.ARCHIVE_DIR, 'prototype', self._mlist.fqdn_listname, 'new')
        archived_messages = os.listdir(new_path)
        self.assertEqual(len(archived_messages), 1)
        # Check that the email has been added by parsing the archived file
        # and comparing its flattened form with the original message.
        with open(os.path.join(new_path, archived_messages[0])) as fp:
            archived_message = message_from_file(fp)
        self.assertEqual(self._msg.as_string(), archived_message.as_string())
|