# Copyright (C) 2013-2017 by the Free Software Foundation, Inc.
#
# This file is part of GNU Mailman.
#
# GNU Mailman is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option)
# any later version.
#
# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
# more details.
#
# You should have received a copy of the GNU General Public License along with
# GNU Mailman. If not, see .
"""Test MailingLists and related model objects.."""
import unittest
from mailman.app.lifecycle import create_list
from mailman.config import config
from mailman.database.transaction import transaction
from mailman.interfaces.listmanager import IListManager
from mailman.interfaces.mailinglist import (
IAcceptableAliasSet, IHeaderMatchList, IListArchiverSet)
from mailman.interfaces.member import (
AlreadySubscribedError, MemberRole, MissingPreferredAddressError)
from mailman.interfaces.usermanager import IUserManager
from mailman.testing.helpers import (
configuration, get_queue_messages, set_preferred)
from mailman.testing.layers import ConfigLayer
from mailman.utilities.datetime import now
from zope.component import getUtility
class TestMailingList(unittest.TestCase):
layer = ConfigLayer
def setUp(self):
self._mlist = create_list('ant@example.com')
def test_no_duplicate_subscriptions(self):
# A user is not allowed to subscribe more than once to the mailing
# list with the same role.
anne = getUtility(IUserManager).create_user('anne@example.com')
# Give the user a preferred address.
preferred = set_preferred(anne)
# Subscribe Anne to the mailing list as a regular member.
member = self._mlist.subscribe(anne)
self.assertEqual(member.address, preferred)
self.assertEqual(member.role, MemberRole.member)
# A second subscription with the same role will fail.
with self.assertRaises(AlreadySubscribedError) as cm:
self._mlist.subscribe(anne)
self.assertEqual(cm.exception.fqdn_listname, 'ant@example.com')
self.assertEqual(cm.exception.email, 'anne@example.com')
self.assertEqual(cm.exception.role, MemberRole.member)
def test_subscribing_user_must_have_preferred_address(self):
# A user object cannot be subscribed to a mailing list without a
# preferred address.
anne = getUtility(IUserManager).create_user('anne@example.com')
self.assertRaises(MissingPreferredAddressError,
self._mlist.subscribe, anne)
def test_pass_extensions(self):
self._mlist.pass_extensions = ('foo', 'bar', 'baz')
self.assertEqual(list(self._mlist.pass_extensions),
['foo', 'bar', 'baz'])
def test_get_roster_argument(self):
self.assertRaises(ValueError, self._mlist.get_roster, 'members')
def test_subscribe_argument(self):
self.assertRaises(ValueError, self._mlist.subscribe, 'anne')
def test_subscribe_by_user_admin_notification(self):
# A notification is sent to the administrator with the user's email
# address when a user is subscribed instead of an explicit address.
self._mlist.send_welcome_message = False
self._mlist.admin_notify_mchanges = True
manager = getUtility(IUserManager)
user = manager.make_user('anne@example.com', 'Anne Person')
address = manager.create_address('aperson@example.com', 'A. Person')
address.verified_on = now()
user.preferred_address = address
self._mlist.subscribe(user)
# The welcome message was sent to the preferred address.
items = get_queue_messages('virgin', expected_count=1)
self.assertIn('Anne Person ',
items[0].msg.get_payload())
def test_is_subscribed(self):
manager = getUtility(IUserManager)
user = manager.create_user('anne@example.com', 'Anne Person')
set_preferred(user)
self.assertEqual(False, self._mlist.is_subscribed(user))
self._mlist.subscribe(user)
self.assertEqual(True, self._mlist.is_subscribed(user))
address = manager.create_address('anne2@example.com', 'Anne Person')
address.verfied_on = now()
self.assertEqual(False, self._mlist.is_subscribed(address))
self._mlist.subscribe(address)
self.assertEqual(True, self._mlist.is_subscribed(address))
class TestListArchiver(unittest.TestCase):
layer = ConfigLayer
def setUp(self):
self._mlist = create_list('ant@example.com')
self._set = IListArchiverSet(self._mlist)
def test_list_archivers(self):
# Find the set of archivers registered for this mailing list.
self.assertEqual(
['mail-archive', 'mhonarc', 'prototype'],
sorted(archiver.name for archiver in self._set.archivers))
def test_get_archiver(self):
# Use .get() to see if a mailing list has an archiver.
archiver = self._set.get('mhonarc')
self.assertEqual(archiver.name, 'mhonarc')
self.assertTrue(archiver.is_enabled)
self.assertEqual(archiver.mailing_list, self._mlist)
self.assertEqual(archiver.system_archiver.name, 'mhonarc')
def test_get_archiver_no_such(self):
# Using .get() on a non-existing name returns None.
self.assertIsNone(self._set.get('no-such-archiver'))
def test_site_disabled(self):
# Here the system configuration enables all the archivers in time for
# the archive set to be created with all list archivers enabled. But
# then the site-wide archiver gets disabled, so the list specific
# archiver will also be disabled.
archiver_set = IListArchiverSet(self._mlist)
archiver = archiver_set.get('mhonarc')
self.assertTrue(archiver.is_enabled)
# Disable the site-wide archiver.
config.push('enable mhonarc', """\
[archiver.mhonarc]
enable: no
""")
self.assertFalse(archiver.is_enabled)
config.pop('enable mhonarc')
class TestDisabledListArchiver(unittest.TestCase):
layer = ConfigLayer
def setUp(self):
self._mlist = create_list('ant@example.com')
@configuration('archiver.prototype', enable='no')
def test_enable_list_archiver(self):
# When the system configuration file disables an archiver site-wide,
# the list-specific mailing list will get initialized as not enabled.
# Create the archiver set on the fly so that it doesn't get
# initialized with a configuration that enables the prototype archiver.
archiver_set = IListArchiverSet(self._mlist)
archiver = archiver_set.get('prototype')
self.assertFalse(archiver.is_enabled)
# Enable both the list archiver and the system archiver.
archiver.is_enabled = True
config.push('enable prototype', """\
[archiver.prototype]
enable: yes
""")
# Get the IListArchiver again.
archiver = archiver_set.get('prototype')
self.assertTrue(archiver.is_enabled)
config.pop('enable prototype')
class TestAcceptableAliases(unittest.TestCase):
layer = ConfigLayer
def setUp(self):
self._mlist = create_list('ant@example.com')
def test_delete_list_with_acceptable_aliases(self):
# LP: #1432239 - deleting a mailing list with acceptable aliases
# causes a SQLAlchemy error. The aliases must be deleted first.
with transaction():
alias_set = IAcceptableAliasSet(self._mlist)
alias_set.add('bee@example.com')
self.assertEqual(['bee@example.com'], list(alias_set.aliases))
getUtility(IListManager).delete(self._mlist)
self.assertEqual(len(list(alias_set.aliases)), 0)
class TestHeaderMatch(unittest.TestCase):
layer = ConfigLayer
def setUp(self):
self._mlist = create_list('ant@example.com')
def test_lowercase_header(self):
header_matches = IHeaderMatchList(self._mlist)
header_matches.append('Header', 'pattern')
self.assertEqual(len(self._mlist.header_matches), 1)
self.assertEqual(self._mlist.header_matches[0].header, 'header')
def test_chain_defaults_to_none(self):
header_matches = IHeaderMatchList(self._mlist)
header_matches.append('header', 'pattern')
self.assertEqual(len(self._mlist.header_matches), 1)
self.assertIsNone(self._mlist.header_matches[0].chain)
def test_duplicate(self):
header_matches = IHeaderMatchList(self._mlist)
header_matches.append('Header', 'pattern')
self.assertRaises(
ValueError, header_matches.append, 'Header', 'pattern')
self.assertEqual(len(self._mlist.header_matches), 1)
def test_remove_non_existent(self):
header_matches = IHeaderMatchList(self._mlist)
self.assertRaises(
ValueError, header_matches.remove, 'header', 'pattern')
def test_add_remove(self):
header_matches = IHeaderMatchList(self._mlist)
header_matches.append('header1', 'pattern')
header_matches.append('header2', 'pattern')
self.assertEqual(len(self._mlist.header_matches), 2)
self.assertEqual(len(header_matches), 2)
header_matches.remove('header1', 'pattern')
self.assertEqual(len(self._mlist.header_matches), 1)
self.assertEqual(len(header_matches), 1)
del header_matches[0]
self.assertEqual(len(self._mlist.header_matches), 0)
self.assertEqual(len(header_matches), 0)
def test_iterator(self):
header_matches = IHeaderMatchList(self._mlist)
header_matches.append('Header', 'pattern')
header_matches.append('Subject', 'patt.*')
header_matches.append('From', '.*@example.com', 'discard')
header_matches.append('From', '.*@example.org', 'accept')
matches = [(match.header, match.pattern, match.chain)
for match in IHeaderMatchList(self._mlist)]
self.assertEqual(
matches, [
('header', 'pattern', None),
('subject', 'patt.*', None),
('from', '.*@example.com', 'discard'),
('from', '.*@example.org', 'accept'),
])
def test_clear(self):
header_matches = IHeaderMatchList(self._mlist)
header_matches.append('Header', 'pattern')
self.assertEqual(len(self._mlist.header_matches), 1)
with transaction():
header_matches.clear()
self.assertEqual(len(self._mlist.header_matches), 0)
def test_get_by_index(self):
header_matches = IHeaderMatchList(self._mlist)
header_matches.append('header-1', 'pattern-1')
header_matches.append('header-2', 'pattern-2')
header_matches.append('header-3', 'pattern-3')
match = header_matches[1]
self.assertEqual(match.header, 'header-2')
self.assertEqual(match.pattern, 'pattern-2')
def test_get_by_negative_index(self):
header_matches = IHeaderMatchList(self._mlist)
header_matches.append('header-1', 'pattern-1')
header_matches.append('header-2', 'pattern-2')
header_matches.append('header-3', 'pattern-3')
match = header_matches[-1]
self.assertEqual(match.header, 'header-3')
self.assertEqual(match.pattern, 'pattern-3')
def test_get_non_existent_by_index(self):
header_matches = IHeaderMatchList(self._mlist)
with self.assertRaises(IndexError):
header_matches[0]
def test_move_up(self):
header_matches = IHeaderMatchList(self._mlist)
header_matches.append('header-0', 'pattern')
header_matches.append('header-1', 'pattern')
header_matches.append('header-2', 'pattern')
header_matches.append('header-3', 'pattern')
self.assertEqual(
[(match.header, match.position) for match in header_matches], [
('header-0', 0),
('header-1', 1),
('header-2', 2),
('header-3', 3),
])
header_match_2 = self._mlist.header_matches[2]
self.assertEqual(header_match_2.position, 2)
header_match_2.position = 1
self.assertEqual(
[(match.header, match.position) for match in header_matches], [
('header-0', 0),
('header-2', 1),
('header-1', 2),
('header-3', 3),
])
def test_move_down(self):
header_matches = IHeaderMatchList(self._mlist)
header_matches.append('header-0', 'pattern')
header_matches.append('header-1', 'pattern')
header_matches.append('header-2', 'pattern')
header_matches.append('header-3', 'pattern')
self.assertEqual(
[(match.header, match.position) for match in header_matches], [
('header-0', 0),
('header-1', 1),
('header-2', 2),
('header-3', 3),
])
header_match_1 = self._mlist.header_matches[1]
self.assertEqual(header_match_1.position, 1)
header_match_1.position = 2
self.assertEqual(
[(match.header, match.position) for match in header_matches], [
('header-0', 0),
('header-2', 1),
('header-1', 2),
('header-3', 3),
])
def test_move_identical(self):
header_matches = IHeaderMatchList(self._mlist)
header_matches.append('header-0', 'pattern')
header_matches.append('header-1', 'pattern')
header_matches.append('header-2', 'pattern')
self.assertEqual(
[(match.header, match.position) for match in header_matches], [
('header-0', 0),
('header-1', 1),
('header-2', 2),
])
header_match_1 = self._mlist.header_matches[1]
self.assertEqual(header_match_1.position, 1)
header_match_1.position = 1
self.assertEqual(
[(match.header, match.position) for match in header_matches], [
('header-0', 0),
('header-1', 1),
('header-2', 2),
])
def test_move_negative(self):
header_matches = IHeaderMatchList(self._mlist)
header_matches.append('header', 'pattern')
header_match = self._mlist.header_matches[0]
with self.assertRaises(ValueError):
header_match.position = -1
def test_move_invalid(self):
header_matches = IHeaderMatchList(self._mlist)
header_matches.append('header', 'pattern')
header_match = self._mlist.header_matches[0]
with self.assertRaises(ValueError):
header_match.position = 2
def test_insert(self):
header_matches = IHeaderMatchList(self._mlist)
header_matches.append('header-0', 'pattern')
header_matches.append('header-1', 'pattern')
self.assertEqual(
[(match.header, match.position) for match in header_matches], [
('header-0', 0),
('header-1', 1),
])
header_matches.insert(1, 'header-2', 'pattern')
self.assertEqual(
[(match.header, match.position) for match in header_matches], [
('header-0', 0),
('header-2', 1),
('header-1', 2),
])
def test_rebuild_sequence_after_remove(self):
header_matches = IHeaderMatchList(self._mlist)
header_matches.append('header-0', 'pattern')
header_matches.append('header-1', 'pattern')
header_matches.append('header-2', 'pattern')
self.assertEqual(
[(match.header, match.position) for match in header_matches], [
('header-0', 0),
('header-1', 1),
('header-2', 2),
])
del header_matches[0]
self.assertEqual(
[(match.header, match.position) for match in header_matches], [
('header-1', 0),
('header-2', 1),
])
header_matches.remove('header-1', 'pattern')
self.assertEqual(
[(match.header, match.position) for match in header_matches],
[('header-2', 0)])
def test_remove_non_existent_by_index(self):
header_matches = IHeaderMatchList(self._mlist)
with self.assertRaises(IndexError):
del header_matches[0]