diff options
Diffstat (limited to 'src/mailman/chains')
| -rw-r--r-- | src/mailman/chains/headers.py | 114 | ||||
| -rw-r--r-- | src/mailman/chains/tests/test_headers.py | 126 |
2 files changed, 181 insertions, 59 deletions
diff --git a/src/mailman/chains/headers.py b/src/mailman/chains/headers.py index 335c0a156..d9f8356f8 100644 --- a/src/mailman/chains/headers.py +++ b/src/mailman/chains/headers.py @@ -27,41 +27,36 @@ __all__ = [ import re import logging -import itertools from zope.interface import implements from mailman.chains.base import Chain, Link from mailman.config import config from mailman.core.i18n import _ -from mailman.interfaces.chain import IChainIterator, LinkAction +from mailman.interfaces.chain import LinkAction from mailman.interfaces.rules import IRule -log = logging.getLogger('mailman.vette') +log = logging.getLogger('mailman.error') -def make_link(entry): +def make_link(header, pattern): """Create a Link object. - :param entry: a 2- or 3-tuple describing a link. If a 2-tuple, it is a - header and a pattern, and a default chain of 'hold' will be used. If - a 3-tuple, the third item is the chain name to use. - :return: an ILink. + The link action is always to defer, since at the end of all the header + checks, we'll jump to the chain defined in the configuration file, should + any of them have matched. + + :param header: The email header name to check, e.g. X-Spam. + :type header: string + :param pattern: A regular expression for matching the header value. + :type pattern: string + :return: The link representing this rule check. + :rtype: `ILink` """ - if len(entry) == 2: - header, pattern = entry - chain_name = 'hold' - elif len(entry) == 3: - header, pattern, chain_name = entry - # We don't assert that the chain exists here because the jump - # chain may not yet have been created. - else: - raise AssertionError('Bad link description: {0}'.format(entry)) rule = HeaderMatchRule(header, pattern) - chain = config.chains[chain_name] - return Link(rule, LinkAction.jump, chain) + return Link(rule, LinkAction.defer) @@ -73,8 +68,8 @@ class HeaderMatchRule: _count = 1 def __init__(self, header, pattern): - self._header = header - self._pattern = pattern + self.header = header + self.pattern = pattern self.name = 'header-match-{0:02}'.format(HeaderMatchRule._count) HeaderMatchRule._count += 1 self.description = '{0}: {1}'.format(header, pattern) @@ -85,11 +80,16 @@ class HeaderMatchRule: # rule name. I suppose we could do the better hit recording in the # check() method, and set self.record = False. self.record = True + # Register this rule so that other parts of the system can query it. + assert self.name not in config.rules, ( + 'Duplicate HeaderMatchRule: {0} [{1}: {2}]'.format( + self.name, self.header, self.pattern)) + config.rules[self.name] = self def check(self, mlist, msg, msgdata): """See `IRule`.""" - for value in msg.get_all(self._header, []): - if re.search(self._pattern, value, re.IGNORECASE): + for value in msg.get_all(self.header, []): + if re.search(self.pattern, value, re.IGNORECASE): return True return False @@ -104,53 +104,49 @@ class HeaderMatchChain(Chain): def __init__(self): super(HeaderMatchChain, self).__init__( 'header-match', _('The built-in header matching chain')) - # The header match rules are not global, so don't register them. - # These are the only rules that the header match chain can execute. - self._links = [] - # Initialize header check rules with those from the global - # HEADER_MATCHES variable. - for entry in config.header_matches: - self._links.append(make_link(entry)) - # Keep track of how many global header matching rules we've seen. - # This is so the flush() method will only delete those that were added - # via extend() or append_link(). - self._permanent_link_count = len(self._links) + # This chain will dynamically calculate the links from the + # configuration file, the database, and any explicitly added header + # checks (via the .extend() method). + self._extended_links = [] - def extend(self, header, pattern, chain_name='hold'): + def extend(self, header, pattern): """Extend the existing header matches. :param header: The case-insensitive header field name. :param pattern: The pattern to match the header's value again. The match is not anchored and is done case-insensitively. - :param chain: Option chain to jump to if the pattern matches any of - the named header values. If not given, the 'hold' chain is used. """ - self._links.append(make_link((header, pattern, chain_name))) + self._extended_links.append(make_link(header, pattern)) def flush(self): """See `IMutableChain`.""" - del self._links[self._permanent_link_count:] + # Remove all dynamically created rules. Use the keys so we can mutate + # the dictionary inside the loop. + for rule_name in config.rules.keys(): + if rule_name.startswith('header-match-'): + del config.rules[rule_name] + self._extended_links = [] def get_links(self, mlist, msg, msgdata): """See `IChain`.""" - list_iterator = HeaderMatchIterator(mlist) - return itertools.chain(iter(self._links), iter(list_iterator)) - - def __iter__(self): - for link in self._links: + # First return all the configuration file links. + for line in config.antispam.header_checks.splitlines(): + if len(line.strip()) == 0: + continue + parts = line.split(':', 1) + if len(parts) != 2: + log.error('Configuration error: [antispam]header_checks ' + 'contains bogus line: {0}'.format(line)) + continue + yield make_link(parts[0], parts[1].lstrip()) + # Then return all the list-specific header matches. + # Python 3.3: Use 'yield from' + for entry in mlist.header_matches: + yield make_link(*entry) + # Then return all the explicitly added links. + for link in self._extended_links: yield link - - - -class HeaderMatchIterator: - """An iterator of both the global and list-specific chain links.""" - - implements(IChainIterator) - - def __init__(self, mlist): - self._mlist = mlist - - def __iter__(self): - """See `IChainIterator`.""" - for entry in self._mlist.header_matches: - yield make_link(entry) + # Finally, if any of the above rules matched, jump to the chain + # defined in the configuration file. + yield Link(config.rules['any'], LinkAction.jump, + config.chains[config.antispam.jump_chain]) diff --git a/src/mailman/chains/tests/test_headers.py b/src/mailman/chains/tests/test_headers.py new file mode 100644 index 000000000..65a23d891 --- /dev/null +++ b/src/mailman/chains/tests/test_headers.py @@ -0,0 +1,126 @@ +# Copyright (C) 2012 by the Free Software Foundation, Inc. +# +# This file is part of GNU Mailman. +# +# GNU Mailman is free software: you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) +# any later version. +# +# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. +# +# You should have received a copy of the GNU General Public License along with +# GNU Mailman. If not, see <http://www.gnu.org/licenses/>. + +"""Test the header chain.""" + +from __future__ import absolute_import, print_function, unicode_literals + +__metaclass__ = type +__all__ = [ + 'TestHeaderChain', + ] + + +import unittest + +from mailman.app.lifecycle import create_list +from mailman.chains.headers import HeaderMatchRule +from mailman.config import config +from mailman.email.message import Message +from mailman.interfaces.chain import LinkAction +from mailman.testing.layers import ConfigLayer +from mailman.testing.helpers import LogFileMark, configuration + + + +class TestHeaderChain(unittest.TestCase): + """Test the header chain code.""" + + layer = ConfigLayer + + def setUp(self): + self._mlist = create_list('test@example.com') + # Python 2.6 does not have assertListEqual(). + self._leq = getattr(self, 'assertListEqual', self.assertEqual) + + @configuration('antispam', header_checks=""" + Foo: a+ + Bar: bb? + """) + def test_config_checks(self): + # Test that the header-match chain has the header checks from the + # configuration file. + chain = config.chains['header-match'] + # The links are created dynamically; the rule names will all start + # with the same prefix, but have a variable suffix. The actions will + # all be to jump to the named chain. Do these checks now, while we + # collect other useful information. + post_checks = [] + saw_any_rule = False + for link in chain.get_links(self._mlist, Message(), {}): + if link.rule.name == 'any': + saw_any_rule = True + self.assertEqual(link.action, LinkAction.jump) + elif saw_any_rule: + raise AssertionError("'any' rule was not last") + else: + self.assertEqual(link.rule.name[:13], 'header-match-') + self.assertEqual(link.action, LinkAction.defer) + post_checks.append((link.rule.header, link.rule.pattern)) + self._leq(post_checks, [ + ('Foo', 'a+'), + ('Bar', 'bb?'), + ]) + + @configuration('antispam', header_checks=""" + Foo: foo + A-bad-line + Bar: bar + """) + def test_bad_configuration_line(self): + # Take a mark on the error log file. + mark = LogFileMark('mailman.error') + # A bad value in [antispam]header_checks should just get ignored, but + # with an error message logged. + chain = config.chains['header-match'] + # The links are created dynamically; the rule names will all start + # with the same prefix, but have a variable suffix. The actions will + # all be to jump to the named chain. Do these checks now, while we + # collect other useful information. + post_checks = [] + saw_any_rule = False + for link in chain.get_links(self._mlist, Message(), {}): + if link.rule.name == 'any': + saw_any_rule = True + self.assertEqual(link.action, LinkAction.jump) + elif saw_any_rule: + raise AssertionError("'any' rule was not last") + else: + self.assertEqual(link.rule.name[:13], 'header-match-') + self.assertEqual(link.action, LinkAction.defer) + post_checks.append((link.rule.header, link.rule.pattern)) + self._leq(post_checks, [ + ('Foo', 'foo'), + ('Bar', 'bar'), + ]) + # Check the error log. + self.assertEqual(mark.readline()[-77:-1], + 'Configuration error: [antispam]header_checks ' + 'contains bogus line: A-bad-line') + + def test_duplicate_header_match_rule(self): + # 100% coverage: test an assertion in a corner case. + # + # Save the existing rules so they can be restored later. + saved_rules = config.rules.copy() + next_rule_name = 'header-match-{0:02}'.format(HeaderMatchRule._count) + config.rules[next_rule_name] = object() + try: + self.assertRaises(AssertionError, + HeaderMatchRule, 'x-spam-score', '.*') + finally: + config.rules = saved_rules |
