1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
|
# Copyright (C) 2011-2017 by the Free Software Foundation, Inc.
#
# This file is part of GNU Mailman.
#
# GNU Mailman is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option)
# any later version.
#
# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
# more details.
#
# You should have received a copy of the GNU General Public License along with
# GNU Mailman. If not, see <http://www.gnu.org/licenses/>.
"""Test the ListManager."""
import unittest
from mailman.app.lifecycle import create_list
from mailman.app.moderator import hold_message
from mailman.config import config
from mailman.interfaces.address import InvalidEmailAddressError
from mailman.interfaces.autorespond import IAutoResponseSet, Response
from mailman.interfaces.domain import IDomainManager
from mailman.interfaces.listmanager import (
IListManager, ListAlreadyExistsError, ListCreatedEvent, ListCreatingEvent,
ListDeletedEvent, ListDeletingEvent)
from mailman.interfaces.mailinglist import IListArchiverSet
from mailman.interfaces.messages import IMessageStore
from mailman.interfaces.pending import IPendable, IPendings
from mailman.interfaces.requests import IListRequests
from mailman.interfaces.subscriptions import ISubscriptionService
from mailman.interfaces.usermanager import IUserManager
from mailman.model.mime import ContentFilter
from mailman.testing.helpers import (
event_subscribers, specialized_message_from_string)
from mailman.testing.layers import ConfigLayer
from zope.component import getUtility
from zope.interface import implementer
@implementer(IPendable)
class SimplePendable(dict):
    """Minimal dict-based IPendable used by the pendings test below."""

    # Type tag attached to every instance of this pendable.
    PEND_TYPE = 'simple'
class TestListManager(unittest.TestCase):
    """Exercise the IListManager utility: events, deletion and lookups."""

    layer = ConfigLayer

    def setUp(self):
        # Events collected by _record_event() while it is subscribed.
        self._events = []

    def _record_event(self, event):
        """Store `event` so the running test can inspect it later."""
        self._events.append(event)

    def test_create_list_event(self):
        # Creating a list through the list manager must publish a
        # "creating" event followed by a "created" event.
        with event_subscribers(self._record_event):
            manager = getUtility(IListManager)
            mlist = manager.create('test@example.com')
        self.assertEqual(len(self._events), 2)
        creating, created = self._events
        self.assertIsInstance(creating, ListCreatingEvent)
        self.assertEqual(creating.fqdn_listname, 'test@example.com')
        self.assertIsInstance(created, ListCreatedEvent)
        self.assertEqual(created.mailing_list, mlist)

    def test_delete_list_event(self):
        # Deleting a list through the list manager must publish a
        # "deleting" event followed by a "deleted" event.
        mlist = create_list('another@example.com')
        with event_subscribers(self._record_event):
            manager = getUtility(IListManager)
            manager.delete(mlist)
        self.assertEqual(len(self._events), 2)
        deleting, deleted = self._events
        self.assertIsInstance(deleting, ListDeletingEvent)
        self.assertEqual(deleting.mailing_list, mlist)
        self.assertIsInstance(deleted, ListDeletedEvent)
        self.assertEqual(deleted.fqdn_listname, 'another@example.com')

    def test_list_manager_list_ids(self):
        # The list manager exposes the list-id of every existing list.
        for fqdn_listname in ('ant@example.com',
                              'bee@example.com',
                              'cat@example.com'):
            create_list(fqdn_listname)
        list_ids = sorted(getUtility(IListManager).list_ids)
        self.assertEqual(
            list_ids,
            ['ant.example.com', 'bee.example.com', 'cat.example.com'])

    def test_delete_list_with_list_archiver_set(self):
        # A mailing list that has an archiver set must still be deletable.
        # In issue #115, this fails under PostgreSQL, but not SQLite.
        mlist = create_list('ant@example.com')
        # The adapter is invoked only for its effect; the result is not
        # bound to a name because an unused variable upsets pyflakes, and
        # binding it does not change the outcome.
        IListArchiverSet(mlist)
        manager = getUtility(IListManager)
        manager.delete(mlist)
        self.assertIsNone(manager.get('ant@example.com'))

    def test_delete_list_with_autoresponse_record(self):
        # A mailing list that has an auto-response record must still be
        # deletable.  In issue #115, this fails under PostgreSQL, but not
        # SQLite.
        manager = getUtility(IListManager)
        users = getUtility(IUserManager)
        mlist = create_list('ant@example.com')
        address = users.create_address('aperson@example.com')
        IAutoResponseSet(mlist).response_sent(address, Response.hold)
        manager.delete(mlist)
        self.assertIsNone(manager.get('ant@example.com'))

    def test_find_advertised_lists(self):
        # Only the list that is still advertised is returned by find().
        ant = create_list('ant@example.com')
        bee = create_list('bee@example.com')
        self.assertTrue(bee.advertised)
        ant.advertised = False
        matches = getUtility(IListManager).find(advertised=True)
        self.assertEqual(len(matches), 1)
        self.assertEqual(matches[0], bee)

    def test_find_by_mail_host_and_advertised(self):
        # find() combines the mail_host and advertised criteria: of the
        # two example.org lists only the advertised one matches.
        ant = create_list('ant@example.com')
        bee = create_list('bee@example.com')
        getUtility(IDomainManager).add('example.org')
        cat = create_list('cat@example.org')
        dog = create_list('dog@example.org')
        self.assertTrue(bee.advertised)
        self.assertTrue(cat.advertised)
        ant.advertised = False
        dog.advertised = False
        matches = getUtility(IListManager).find(
            mail_host='example.org', advertised=True)
        self.assertEqual(len(matches), 1)
        self.assertEqual(matches[0], cat)

    def test_find_by_list_spec(self):
        # get() accepts both the posting address and the list-id.
        ant = create_list('ant@example.com')
        manager = getUtility(IListManager)
        for list_spec in ('ant@example.com', 'ant.example.com'):
            self.assertEqual(manager.get(list_spec), ant)

    def test_find_by_list_id(self):
        # get_by_list_id() matches the list-id but not the posting address.
        ant = create_list('ant@example.com')
        manager = getUtility(IListManager)
        self.assertEqual(manager.get_by_list_id('ant.example.com'), ant)
        self.assertIsNone(manager.get_by_list_id('ant@example.com'))

    def test_find_by_fqdn(self):
        # get_by_fqdn() matches the posting address but not the list-id.
        ant = create_list('ant@example.com')
        manager = getUtility(IListManager)
        self.assertEqual(manager.get_by_fqdn('ant@example.com'), ant)
        self.assertIsNone(manager.get_by_fqdn('ant.example.com'))
class TestListLifecycleEvents(unittest.TestCase):
    """Check that data tied to a mailing list goes away with the list."""

    layer = ConfigLayer

    def setUp(self):
        # Two lists, so each test can verify that deleting one of them
        # leaves the other's data alone.
        self._ant = create_list('ant@example.com')
        self._bee = create_list('bee@example.com')
        self._usermanager = getUtility(IUserManager)

    def test_members_are_deleted_when_mailing_list_is_deleted(self):
        # When a mailing list with members is deleted, all the Member records
        # are also deleted.
        anne = self._usermanager.create_address('anne@example.com')
        bart = self._usermanager.create_address('bart@example.com')
        anne_ant = self._ant.subscribe(anne)
        anne_bee = self._bee.subscribe(anne)
        bart_ant = self._ant.subscribe(bart)
        # Capture the member ids before the deletion so the lookups below
        # do not depend on the member objects afterwards.
        anne_ant_id = anne_ant.member_id
        anne_bee_id = anne_bee.member_id
        bart_ant_id = bart_ant.member_id
        getUtility(IListManager).delete(self._ant)
        service = getUtility(ISubscriptionService)
        # We deleted the ant@example.com mailing list.  Anne's and Bart's
        # membership in this list should now be removed, but Anne's membership
        # in bee@example.com should still exist.
        self.assertIsNone(service.get_member(anne_ant_id))
        self.assertIsNone(service.get_member(bart_ant_id))
        self.assertEqual(service.get_member(anne_bee_id), anne_bee)

    def test_requests_are_deleted_when_mailing_list_is_deleted(self):
        # When a mailing list is deleted, its requests database is deleted
        # too, e.g. all its message hold requests (but not the messages
        # themselves).
        msg = specialized_message_from_string("""\
From: anne@example.com
To: ant@example.com
Subject: Hold me
Message-ID: <argon>
""")
        request_id = hold_message(self._ant, msg)
        getUtility(IListManager).delete(self._ant)
        # This is a hack.  ListRequests don't access self._mailinglist in
        # their get_request() method, so the surviving bee list can be used
        # to look up a request id that belonged to the deleted ant list.
        requestsdb = IListRequests(self._bee)
        request = requestsdb.get_request(request_id)
        self.assertIsNone(request)
        # The held message itself must still be in the message store.
        saved_message = getUtility(IMessageStore).get_message_by_id('<argon>')
        self.assertEqual(saved_message.as_string(), msg.as_string())

    def test_content_filters_are_deleted_when_mailing_list_is_deleted(self):
        # When a mailing list with content filters is deleted, the filters
        # must be deleted first or an IntegrityError will be raised.
        filter_names = ('filter_types', 'pass_types',
                        'filter_extensions', 'pass_extensions')
        for name in filter_names:
            setattr(self._ant, name, ['test-filter-1', 'test-filter-2'])
        getUtility(IListManager).delete(self._ant)
        filters = config.db.store.query(ContentFilter).filter_by(
            mailing_list=self._ant)
        self.assertEqual(filters.count(), 0)

    def test_pendings_are_deleted_when_mailing_list_is_deleted(self):
        # Add one pendable per list, keyed by list_id, and check that the
        # pending count drops by one each time a list is deleted.
        pendingdb = getUtility(IPendings)
        pendable_1 = SimplePendable(
            type='subscription',
            list_id='ant.example.com')
        pendingdb.add(pendable_1)
        pendable_2 = SimplePendable(
            type='subscription',
            list_id='bee.example.com')
        pendingdb.add(pendable_2)
        self.assertEqual(pendingdb.count, 2)
        list_manager = getUtility(IListManager)
        list_manager.delete(self._ant)
        self.assertEqual(pendingdb.count, 1)
        list_manager.delete(self._bee)
        self.assertEqual(pendingdb.count, 0)
class TestListCreation(unittest.TestCase):
    """Check how the list manager handles list names at creation time."""

    layer = ConfigLayer

    def setUp(self):
        self._manager = getUtility(IListManager)

    def test_create_list_case_folding(self):
        # LP: #1117176 reports that lists created with upper-case names
        # cannot be used by the LMTP server.  MySQL changes the case of the
        # arguments automatically, so under MySQL this test always fails.
        self._manager.create('my-LIST@example.com')
        mixed_case = self._manager.get('my-LIST@example.com')
        self.assertIsNone(mixed_case)
        folded = self._manager.get('my-list@example.com')
        self.assertEqual(folded.list_id, 'my-list.example.com')

    def test_cannot_create_a_list_twice(self):
        # A second create() with the same address is rejected.
        self._manager.create('ant@example.com')
        with self.assertRaises(ListAlreadyExistsError):
            self._manager.create('ant@example.com')

    def test_list_name_must_be_fully_qualified(self):
        # A name without a domain part is refused, and the exception
        # carries the offending name.
        with self.assertRaises(InvalidEmailAddressError) as caught:
            self._manager.create('foo')
        self.assertEqual(caught.exception.email, 'foo')
|