# Copyright (C) 2012-2016 by the Free Software Foundation, Inc.
#
# This file is part of GNU Mailman.
#
# GNU Mailman is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option)
# any later version.
#
# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
# more details.
#
# You should have received a copy of the GNU General Public License along with
# GNU Mailman. If not, see .
"""Test the header chain."""
import unittest
from mailman.app.lifecycle import create_list
from mailman.chains.headers import HeaderMatchRule, make_link
from mailman.config import config
from mailman.core.chains import process
from mailman.email.message import Message
from mailman.interfaces.chain import DiscardEvent, HoldEvent, LinkAction
from mailman.interfaces.mailinglist import IHeaderMatchList
from mailman.testing.helpers import (
LogFileMark, configuration, event_subscribers,
specialized_message_from_string as mfs)
from mailman.testing.layers import ConfigLayer
class TestHeaderChain(unittest.TestCase):
"""Test the header chain code."""
layer = ConfigLayer
def setUp(self):
self._mlist = create_list('test@example.com')
def test_make_link(self):
# Test that make_link() with no given chain creates a Link with a
# deferred link action.
link = make_link('Subject', '[tT]esting')
self.assertEqual(link.rule.header, 'Subject')
self.assertEqual(link.rule.pattern, '[tT]esting')
self.assertEqual(link.action, LinkAction.defer)
self.assertIsNone(link.chain)
def test_make_link_with_chain(self):
# Test that make_link() with a given chain creates a Link with a jump
# action to the chain.
link = make_link('Subject', '[tT]esting', 'accept')
self.assertEqual(link.rule.header, 'Subject')
self.assertEqual(link.rule.pattern, '[tT]esting')
self.assertEqual(link.action, LinkAction.jump)
self.assertEqual(link.chain, config.chains['accept'])
@configuration('antispam', header_checks="""
Foo: a+
Bar: bb?
""")
def test_config_checks(self):
# Test that the header-match chain has the header checks from the
# configuration file.
chain = config.chains['header-match']
# The links are created dynamically; the rule names will all start
# with the same prefix, but have a variable suffix. The actions will
# all be to jump to the named chain. Do these checks now, while we
# collect other useful information.
post_checks = []
saw_any_rule = False
for link in chain.get_links(self._mlist, Message(), {}):
if link.rule.name == 'any':
saw_any_rule = True
self.assertEqual(link.action, LinkAction.jump)
elif saw_any_rule:
raise AssertionError("'any' rule was not last")
else:
self.assertEqual(link.rule.name[:13], 'header-match-')
self.assertEqual(link.action, LinkAction.defer)
post_checks.append((link.rule.header, link.rule.pattern))
self.assertListEqual(post_checks, [
('Foo', 'a+'),
('Bar', 'bb?'),
])
@configuration('antispam', header_checks="""
Foo: foo
A-bad-line
Bar: bar
""")
def test_bad_configuration_line(self):
# Take a mark on the error log file.
mark = LogFileMark('mailman.error')
# A bad value in [antispam]header_checks should just get ignored, but
# with an error message logged.
chain = config.chains['header-match']
# The links are created dynamically; the rule names will all start
# with the same prefix, but have a variable suffix. The actions will
# all be to jump to the named chain. Do these checks now, while we
# collect other useful information.
post_checks = []
saw_any_rule = False
for link in chain.get_links(self._mlist, Message(), {}):
if link.rule.name == 'any':
saw_any_rule = True
self.assertEqual(link.action, LinkAction.jump)
elif saw_any_rule:
raise AssertionError("'any' rule was not last")
else:
self.assertEqual(link.rule.name[:13], 'header-match-')
self.assertEqual(link.action, LinkAction.defer)
post_checks.append((link.rule.header, link.rule.pattern))
self.assertListEqual(post_checks, [
('Foo', 'foo'),
('Bar', 'bar'),
])
# Check the error log.
self.assertEqual(mark.readline()[-77:-1],
'Configuration error: [antispam]header_checks '
'contains bogus line: A-bad-line')
def test_duplicate_header_match_rule(self):
# 100% coverage: test an assertion in a corner case.
#
# Save the existing rules so they can be restored later.
saved_rules = config.rules.copy()
self.addCleanup(setattr, config, 'rules', saved_rules)
HeaderMatchRule('x-spam-score', '*', suffix='100')
self.assertRaises(AssertionError,
HeaderMatchRule, 'x-spam-score', '.*', suffix='100')
def test_list_rule(self):
# Test that the header-match chain has the header checks from the
# mailing-list configuration.
chain = config.chains['header-match']
header_matches = IHeaderMatchList(self._mlist)
header_matches.append('Foo', 'a+')
links = [link for link in chain.get_links(self._mlist, Message(), {})
if link.rule.name != 'any']
self.assertEqual(len(links), 1)
self.assertEqual(links[0].action, LinkAction.jump)
self.assertEqual(links[0].chain.name, config.antispam.jump_chain)
self.assertEqual(links[0].rule.header, 'foo')
self.assertEqual(links[0].rule.pattern, 'a+')
self.assertTrue(links[0].rule.name.startswith(
'header-match-test.example.com-'))
def test_list_complex_rule(self):
# Test that the mailing-list header-match complex rules are read
# properly.
chain = config.chains['header-match']
header_matches = IHeaderMatchList(self._mlist)
header_matches.append('Foo', 'a+', 'reject')
header_matches.append('Bar', 'b+', 'discard')
header_matches.append('Baz', 'z+', 'accept')
links = [link for link in chain.get_links(self._mlist, Message(), {})
if link.rule.name != 'any']
self.assertEqual(len(links), 3)
self.assertEqual([
(link.rule.header, link.rule.pattern, link.action, link.chain.name)
for link in links
],
[('foo', 'a+', LinkAction.jump, 'reject'),
('bar', 'b+', LinkAction.jump, 'discard'),
('baz', 'z+', LinkAction.jump, 'accept'),
]) # noqa: E124
@configuration('antispam', header_checks="""
Foo: foo
""", jump_chain='hold')
def test_priority_site_over_list(self):
# Test that the site-wide checks take precedence over the list-specific
# checks.
msg = mfs("""\
From: anne@example.com
To: test@example.com
Subject: A message
Message-ID:
Foo: foo
MIME-Version: 1.0
A message body.
""")
msgdata = {}
header_matches = IHeaderMatchList(self._mlist)
header_matches.append('Foo', 'foo', 'accept')
# This event subscriber records the event that occurs when the message
# is processed by the owner chain.
events = []
with event_subscribers(events.append):
process(self._mlist, msg, msgdata, start_chain='header-match')
self.assertEqual(len(events), 1)
event = events[0]
# Site-wide wants to hold the message, the list wants to accept it.
self.assertIsInstance(event, HoldEvent)
self.assertEqual(event.chain, config.chains['hold'])
def test_no_action_defaults_to_site_wide_action(self):
# If the list-specific header check matches, but there is no defined
# action, the site-wide antispam action is used.
msg = mfs("""\
From: anne@example.com
To: test@example.com
Subject: A message
Message-ID:
Foo: foo
MIME-Version: 1.0
A message body.
""")
header_matches = IHeaderMatchList(self._mlist)
header_matches.append('Foo', 'foo')
# This event subscriber records the event that occurs when the message
# is processed by the owner chain, which holds its for approval.
events = []
def record_holds(event): # noqa: E301
if not isinstance(event, HoldEvent):
return
events.append(event)
with event_subscribers(record_holds):
# Set the site-wide antispam action to hold the message.
with configuration('antispam', header_checks="""
Spam: [*]{3,}
""", jump_chain='hold'): # noqa: E125
process(self._mlist, msg, {}, start_chain='header-match')
self.assertEqual(len(events), 1)
event = events[0]
self.assertIsInstance(event, HoldEvent)
self.assertEqual(event.chain, config.chains['hold'])
self.assertEqual(event.mlist, self._mlist)
self.assertEqual(event.msg, msg)
events = []
def record_discards(event): # noqa: E301
if not isinstance(event, DiscardEvent):
return
events.append(event)
with event_subscribers(record_discards):
# Set the site-wide default to discard the message.
msg.replace_header('Message-Id', '')
with configuration('antispam', header_checks="""
Spam: [*]{3,}
""", jump_chain='discard'): # noqa: E125
process(self._mlist, msg, {}, start_chain='header-match')
self.assertEqual(len(events), 1)
event = events[0]
self.assertIsInstance(event, DiscardEvent)
self.assertEqual(event.chain, config.chains['discard'])
self.assertEqual(event.mlist, self._mlist)
self.assertEqual(event.msg, msg)
@configuration('antispam', header_checks="""
Header1: a+
""", jump_chain='hold')
def test_reuse_rules(self):
# Test that existing header-match rules are used instead of creating
# new ones.
chain = config.chains['header-match']
header_matches = IHeaderMatchList(self._mlist)
header_matches.append('Header2', 'b+')
header_matches.append('Header3', 'c+')
def get_links(): # noqa: E301
return [
link for link in chain.get_links(self._mlist, Message(), {})
if link.rule.name != 'any'
]
links_1 = get_links()
self.assertEqual(len(links_1), 3)
links_2 = get_links()
# The link rules both have the same name...
self.assertEqual(
[l.rule.name for l in links_1],
[l.rule.name for l in links_2],
)
# ...and are actually the identical objects.
for link1, link2 in zip(links_1, links_2):
self.assertIs(link1.rule, link2.rule)