summaryrefslogtreecommitdiff
path: root/src/mailman/chains
diff options
context:
space:
mode:
Diffstat (limited to 'src/mailman/chains')
-rw-r--r--src/mailman/chains/headers.py114
-rw-r--r--src/mailman/chains/tests/test_headers.py126
2 files changed, 181 insertions, 59 deletions
diff --git a/src/mailman/chains/headers.py b/src/mailman/chains/headers.py
index 335c0a156..d9f8356f8 100644
--- a/src/mailman/chains/headers.py
+++ b/src/mailman/chains/headers.py
@@ -27,41 +27,36 @@ __all__ = [
import re
import logging
-import itertools
from zope.interface import implements
from mailman.chains.base import Chain, Link
from mailman.config import config
from mailman.core.i18n import _
-from mailman.interfaces.chain import IChainIterator, LinkAction
+from mailman.interfaces.chain import LinkAction
from mailman.interfaces.rules import IRule
-log = logging.getLogger('mailman.vette')
+log = logging.getLogger('mailman.error')
-def make_link(entry):
+def make_link(header, pattern):
"""Create a Link object.
- :param entry: a 2- or 3-tuple describing a link. If a 2-tuple, it is a
- header and a pattern, and a default chain of 'hold' will be used. If
- a 3-tuple, the third item is the chain name to use.
- :return: an ILink.
+ The link action is always to defer, since at the end of all the header
+ checks, we'll jump to the chain defined in the configuration file, should
+ any of them have matched.
+
+ :param header: The email header name to check, e.g. X-Spam.
+ :type header: string
+ :param pattern: A regular expression for matching the header value.
+ :type pattern: string
+ :return: The link representing this rule check.
+ :rtype: `ILink`
"""
- if len(entry) == 2:
- header, pattern = entry
- chain_name = 'hold'
- elif len(entry) == 3:
- header, pattern, chain_name = entry
- # We don't assert that the chain exists here because the jump
- # chain may not yet have been created.
- else:
- raise AssertionError('Bad link description: {0}'.format(entry))
rule = HeaderMatchRule(header, pattern)
- chain = config.chains[chain_name]
- return Link(rule, LinkAction.jump, chain)
+ return Link(rule, LinkAction.defer)
@@ -73,8 +68,8 @@ class HeaderMatchRule:
_count = 1
def __init__(self, header, pattern):
- self._header = header
- self._pattern = pattern
+ self.header = header
+ self.pattern = pattern
self.name = 'header-match-{0:02}'.format(HeaderMatchRule._count)
HeaderMatchRule._count += 1
self.description = '{0}: {1}'.format(header, pattern)
@@ -85,11 +80,16 @@ class HeaderMatchRule:
# rule name. I suppose we could do the better hit recording in the
# check() method, and set self.record = False.
self.record = True
+ # Register this rule so that other parts of the system can query it.
+ assert self.name not in config.rules, (
+ 'Duplicate HeaderMatchRule: {0} [{1}: {2}]'.format(
+ self.name, self.header, self.pattern))
+ config.rules[self.name] = self
def check(self, mlist, msg, msgdata):
"""See `IRule`."""
- for value in msg.get_all(self._header, []):
- if re.search(self._pattern, value, re.IGNORECASE):
+ for value in msg.get_all(self.header, []):
+ if re.search(self.pattern, value, re.IGNORECASE):
return True
return False
@@ -104,53 +104,49 @@ class HeaderMatchChain(Chain):
def __init__(self):
super(HeaderMatchChain, self).__init__(
'header-match', _('The built-in header matching chain'))
- # The header match rules are not global, so don't register them.
- # These are the only rules that the header match chain can execute.
- self._links = []
- # Initialize header check rules with those from the global
- # HEADER_MATCHES variable.
- for entry in config.header_matches:
- self._links.append(make_link(entry))
- # Keep track of how many global header matching rules we've seen.
- # This is so the flush() method will only delete those that were added
- # via extend() or append_link().
- self._permanent_link_count = len(self._links)
+ # This chain will dynamically calculate the links from the
+ # configuration file, the database, and any explicitly added header
+ # checks (via the .extend() method).
+ self._extended_links = []
- def extend(self, header, pattern, chain_name='hold'):
+ def extend(self, header, pattern):
"""Extend the existing header matches.
:param header: The case-insensitive header field name.
:param pattern: The pattern to match the header's value again. The
match is not anchored and is done case-insensitively.
- :param chain: Option chain to jump to if the pattern matches any of
- the named header values. If not given, the 'hold' chain is used.
"""
- self._links.append(make_link((header, pattern, chain_name)))
+ self._extended_links.append(make_link(header, pattern))
def flush(self):
"""See `IMutableChain`."""
- del self._links[self._permanent_link_count:]
+ # Remove all dynamically created rules. Use the keys so we can mutate
+ # the dictionary inside the loop.
+ for rule_name in config.rules.keys():
+ if rule_name.startswith('header-match-'):
+ del config.rules[rule_name]
+ self._extended_links = []
def get_links(self, mlist, msg, msgdata):
"""See `IChain`."""
- list_iterator = HeaderMatchIterator(mlist)
- return itertools.chain(iter(self._links), iter(list_iterator))
-
- def __iter__(self):
- for link in self._links:
+ # First return all the configuration file links.
+ for line in config.antispam.header_checks.splitlines():
+ if len(line.strip()) == 0:
+ continue
+ parts = line.split(':', 1)
+ if len(parts) != 2:
+ log.error('Configuration error: [antispam]header_checks '
+ 'contains bogus line: {0}'.format(line))
+ continue
+ yield make_link(parts[0], parts[1].lstrip())
+ # Then return all the list-specific header matches.
+ # Python 3.3: Use 'yield from'
+ for entry in mlist.header_matches:
+ yield make_link(*entry)
+ # Then return all the explicitly added links.
+ for link in self._extended_links:
yield link
-
-
-
-class HeaderMatchIterator:
- """An iterator of both the global and list-specific chain links."""
-
- implements(IChainIterator)
-
- def __init__(self, mlist):
- self._mlist = mlist
-
- def __iter__(self):
- """See `IChainIterator`."""
- for entry in self._mlist.header_matches:
- yield make_link(entry)
+ # Finally, if any of the above rules matched, jump to the chain
+ # defined in the configuration file.
+ yield Link(config.rules['any'], LinkAction.jump,
+ config.chains[config.antispam.jump_chain])
diff --git a/src/mailman/chains/tests/test_headers.py b/src/mailman/chains/tests/test_headers.py
new file mode 100644
index 000000000..65a23d891
--- /dev/null
+++ b/src/mailman/chains/tests/test_headers.py
@@ -0,0 +1,126 @@
+# Copyright (C) 2012 by the Free Software Foundation, Inc.
+#
+# This file is part of GNU Mailman.
+#
+# GNU Mailman is free software: you can redistribute it and/or modify it under
+# the terms of the GNU General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or (at your option)
+# any later version.
+#
+# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+# more details.
+#
+# You should have received a copy of the GNU General Public License along with
+# GNU Mailman. If not, see <http://www.gnu.org/licenses/>.
+
+"""Test the header chain."""
+
+from __future__ import absolute_import, print_function, unicode_literals
+
+__metaclass__ = type
+__all__ = [
+ 'TestHeaderChain',
+ ]
+
+
+import unittest
+
+from mailman.app.lifecycle import create_list
+from mailman.chains.headers import HeaderMatchRule
+from mailman.config import config
+from mailman.email.message import Message
+from mailman.interfaces.chain import LinkAction
+from mailman.testing.layers import ConfigLayer
+from mailman.testing.helpers import LogFileMark, configuration
+
+
+
+class TestHeaderChain(unittest.TestCase):
+ """Test the header chain code."""
+
+ layer = ConfigLayer
+
+ def setUp(self):
+ self._mlist = create_list('test@example.com')
+ # Python 2.6 does not have assertListEqual().
+ self._leq = getattr(self, 'assertListEqual', self.assertEqual)
+
+ @configuration('antispam', header_checks="""
+ Foo: a+
+ Bar: bb?
+ """)
+ def test_config_checks(self):
+ # Test that the header-match chain has the header checks from the
+ # configuration file.
+ chain = config.chains['header-match']
+ # The links are created dynamically; the rule names will all start
+ # with the same prefix, but have a variable suffix. The actions will
+ # all be to jump to the named chain. Do these checks now, while we
+ # collect other useful information.
+ post_checks = []
+ saw_any_rule = False
+ for link in chain.get_links(self._mlist, Message(), {}):
+ if link.rule.name == 'any':
+ saw_any_rule = True
+ self.assertEqual(link.action, LinkAction.jump)
+ elif saw_any_rule:
+ raise AssertionError("'any' rule was not last")
+ else:
+ self.assertEqual(link.rule.name[:13], 'header-match-')
+ self.assertEqual(link.action, LinkAction.defer)
+ post_checks.append((link.rule.header, link.rule.pattern))
+ self._leq(post_checks, [
+ ('Foo', 'a+'),
+ ('Bar', 'bb?'),
+ ])
+
+ @configuration('antispam', header_checks="""
+ Foo: foo
+ A-bad-line
+ Bar: bar
+ """)
+ def test_bad_configuration_line(self):
+ # Take a mark on the error log file.
+ mark = LogFileMark('mailman.error')
+ # A bad value in [antispam]header_checks should just get ignored, but
+ # with an error message logged.
+ chain = config.chains['header-match']
+ # The links are created dynamically; the rule names will all start
+ # with the same prefix, but have a variable suffix. The actions will
+ # all be to jump to the named chain. Do these checks now, while we
+ # collect other useful information.
+ post_checks = []
+ saw_any_rule = False
+ for link in chain.get_links(self._mlist, Message(), {}):
+ if link.rule.name == 'any':
+ saw_any_rule = True
+ self.assertEqual(link.action, LinkAction.jump)
+ elif saw_any_rule:
+ raise AssertionError("'any' rule was not last")
+ else:
+ self.assertEqual(link.rule.name[:13], 'header-match-')
+ self.assertEqual(link.action, LinkAction.defer)
+ post_checks.append((link.rule.header, link.rule.pattern))
+ self._leq(post_checks, [
+ ('Foo', 'foo'),
+ ('Bar', 'bar'),
+ ])
+ # Check the error log.
+ self.assertEqual(mark.readline()[-77:-1],
+ 'Configuration error: [antispam]header_checks '
+ 'contains bogus line: A-bad-line')
+
+ def test_duplicate_header_match_rule(self):
+ # 100% coverage: test an assertion in a corner case.
+ #
+ # Save the existing rules so they can be restored later.
+ saved_rules = config.rules.copy()
+ next_rule_name = 'header-match-{0:02}'.format(HeaderMatchRule._count)
+ config.rules[next_rule_name] = object()
+ try:
+ self.assertRaises(AssertionError,
+ HeaderMatchRule, 'x-spam-score', '.*')
+ finally:
+ config.rules = saved_rules