summaryrefslogtreecommitdiff
path: root/src/mailman/rules/tests/test_dmarc.py
diff options
context:
space:
mode:
authorBarry Warsaw2017-01-01 17:25:29 -0500
committerBarry Warsaw2017-01-01 17:28:37 -0500
commite3d6c34b7d02b925cb8c59fa9e1df24741fc46ee (patch)
treee6e27c9dec4557dc46c4b8ffd99bf556da4ed4ed /src/mailman/rules/tests/test_dmarc.py
parentc786a77c3b82932facd63fabe3144f9099f1ea51 (diff)
downloadmailman-e3d6c34b7d02b925cb8c59fa9e1df24741fc46ee.tar.gz
mailman-e3d6c34b7d02b925cb8c59fa9e1df24741fc46ee.tar.zst
mailman-e3d6c34b7d02b925cb8c59fa9e1df24741fc46ee.zip
Diffstat (limited to 'src/mailman/rules/tests/test_dmarc.py')
-rw-r--r--src/mailman/rules/tests/test_dmarc.py214
1 files changed, 170 insertions, 44 deletions
diff --git a/src/mailman/rules/tests/test_dmarc.py b/src/mailman/rules/tests/test_dmarc.py
index 9af3e9347..df5127895 100644
--- a/src/mailman/rules/tests/test_dmarc.py
+++ b/src/mailman/rules/tests/test_dmarc.py
@@ -15,25 +15,31 @@
# You should have received a copy of the GNU General Public License along with
# GNU Mailman. If not, see <http://www.gnu.org/licenses/>.
-"""Provides support for mocking dnspython calls from dmarc rules and some
-organizational domain tests."""
+"""Tests and mocks for DMARC rule."""
+
+import os
+import threading
from contextlib import ExitStack
+from datetime import timedelta
from dns.exception import DNSException
from dns.rdatatype import TXT
from dns.resolver import NXDOMAIN, NoAnswer
+from http.server import BaseHTTPRequestHandler, HTTPServer
+from lazr.config import as_timedelta
from mailman.app.lifecycle import create_list
+from mailman.config import config
from mailman.interfaces.mailinglist import DMARCMitigateAction
from mailman.rules import dmarc
from mailman.testing.helpers import (
- LogFileMark, specialized_message_from_string as mfs)
+ LogFileMark, configuration, specialized_message_from_string as mfs,
+ wait_for_webservice)
from mailman.testing.layers import ConfigLayer
+from mailman.utilities.datetime import now
from pkg_resources import resource_filename
from public import public
from unittest import TestCase
from unittest.mock import patch
-from urllib.error import URLError
-from urllib.request import urlopen
@public
@@ -97,16 +103,11 @@ def get_dns_resolver():
@public
-def get_org_data():
- """Create a mock to load the organizational domain data from our local
- test data.
- """
- def ouropen(url):
- datapath = resource_filename(
- 'mailman.rules.tests.data', 'org_domain.txt')
- org_data_url = 'file:///{}'.format(datapath)
- return urlopen(org_data_url)
- return patch('mailman.rules.dmarc.request.urlopen', ouropen)
+def use_test_organizational_data():
+ # Point the organizational URL to our test data.
+ path = resource_filename('mailman.rules.tests.data', 'org_domain.txt')
+ url = 'file:///{}'.format(path)
+ return configuration('mailman', org_domain_data_url=url)
class TestDMARCRules(TestCase):
@@ -120,42 +121,23 @@ class TestDMARCRules(TestCase):
# Make sure every test has a clean cache.
self.cache = {}
self.resources.enter_context(
- patch('mailman.rules.dmarc.s_dict', self.cache))
-
- def test_no_url(self):
- dmarc._get_suffixes(None)
- self.assertEqual(len(self.cache), 0)
+ patch('mailman.rules.dmarc.suffix_cache', self.cache))
+ self.resources.enter_context(use_test_organizational_data())
def test_no_data_for_domain(self):
- with get_org_data():
- self.assertEqual(
- dmarc._get_org_dom('sub.dom.example.nxtld'),
- 'example.nxtld')
+ self.assertEqual(
+ dmarc._get_org_dom('sub.dom.example.nxtld'),
+ 'example.nxtld')
def test_domain_with_wild_card(self):
- with get_org_data():
- self.assertEqual(
- dmarc._get_org_dom('ssub.sub.foo.kobe.jp'),
- 'sub.foo.kobe.jp')
+ self.assertEqual(
+ dmarc._get_org_dom('ssub.sub.foo.kobe.jp'),
+ 'sub.foo.kobe.jp')
def test_exception_to_wild_card(self):
- with get_org_data():
- self.assertEqual(
- dmarc._get_org_dom('ssub.sub.city.kobe.jp'),
- 'city.kobe.jp')
-
- def test_no_publicsuffix_dot_org(self):
- mark = LogFileMark('mailman.error')
- with patch('mailman.rules.dmarc.request.urlopen',
- side_effect=URLError('no internet')):
- domain = dmarc._get_org_dom('ssub.sub.city.kobe.jp')
- line = mark.readline()
self.assertEqual(
- line[-95:],
- 'Unable to retrieve data from '
- 'https://publicsuffix.org/list/public_suffix_list.dat: '
- 'no internet\n')
- self.assertEqual(domain, 'kobe.jp')
+ dmarc._get_org_dom('ssub.sub.city.kobe.jp'),
+ 'city.kobe.jp')
def test_no_at_sign_in_from_address(self):
# If there's no @ sign in the From: address, the rule can't hit.
@@ -190,3 +172,147 @@ To: ant@example.com
'DNSException: Unable to query DMARC policy for '
'anne@example.info (_dmarc.example.info). '
'Abstract base class shared by all dnspython exceptions.\n')
+
+
+# New in Python 3.5.
+try:
+ from http import HTTPStatus
+except ImportError:
+ class HTTPStatus:
+ FORBIDDEN = 403
+ NOT_FOUND = 404
+ OK = 200
+
+
+# We need a web server to vend non-mailman: urls.
+class TestableHandler(BaseHTTPRequestHandler):
+ # Be quiet.
+ def log_request(*args, **kws):
+ pass
+
+ log_error = log_request
+
+ def do_GET(self):
+ if self.path == '/public_suffix_list.dat':
+ self.send_response(HTTPStatus.OK)
+ self.send_header('Content-Type', 'UTF-8')
+ self.end_headers()
+ self.wfile.write(b'abc')
+ else:
+ self.send_error(HTTPStatus.NOT_FOUND)
+
+
+class HTTPLayer(ConfigLayer):
+ httpd = None
+
+ @classmethod
+ def setUp(cls):
+ assert cls.httpd is None, 'Layer already set up'
+ cls.httpd = HTTPServer(('localhost', 8180), TestableHandler)
+ cls._thread = threading.Thread(target=cls.httpd.serve_forever)
+ cls._thread.daemon = True
+ cls._thread.start()
+ wait_for_webservice('localhost', 8180)
+
+ @classmethod
+ def tearDown(cls):
+ assert cls.httpd is not None, 'Layer not set up'
+ cls.httpd.shutdown()
+ cls.httpd.server_close()
+ cls._thread.join()
+
+
+class TestSuffixList(TestCase):
+ layer = HTTPLayer
+
+ def test_cached_copy_is_good(self):
+ cache_path = os.path.join(config.VAR_DIR, dmarc.LOCAL_FILE_NAME)
+ with open(cache_path, 'w', encoding='utf-8') as fp:
+ print('xyz', end='', file=fp)
+ # The cache expires a day from now.
+ expires = (now() + timedelta(days=1)).timestamp()
+ os.utime(cache_path, (expires, expires))
+ new_path = dmarc.ensure_current_suffix_list()
+ self.assertEqual(cache_path, new_path)
+ with open(cache_path, 'r', encoding='utf-8') as fp:
+ contents = fp.read()
+ self.assertEqual(contents, 'xyz')
+ self.assertEqual(os.stat(new_path).st_mtime, expires)
+
+ @configuration(
+ 'dmarc',
+ org_domain_data_url='http://localhost:8180/public_suffix_list.dat')
+ def test_cached_copy_is_expired(self):
+ cache_path = os.path.join(config.VAR_DIR, dmarc.LOCAL_FILE_NAME)
+ with open(cache_path, 'w', encoding='utf-8') as fp:
+ print('xyz', end='', file=fp)
+ # Expire the cache file. That way the current cached file will be
+ # invalid and a new one will be downloaded.
+ expires = (now() - timedelta(days=1)).timestamp()
+ os.utime(cache_path, (expires, expires))
+ new_path = dmarc.ensure_current_suffix_list()
+ self.assertEqual(cache_path, new_path)
+ with open(cache_path, 'r', encoding='utf-8') as fp:
+ contents = fp.read()
+ self.assertEqual(contents, 'abc')
+ self.assertEqual(
+ os.stat(new_path).st_mtime,
+ (now() + as_timedelta(config.dmarc.cache_lifetime)).timestamp())
+
+ @configuration(
+ 'dmarc',
+ org_domain_data_url='http://localhost:8180/public_suffix_list.dat')
+ def test_cached_copy_is_missing(self):
+ cache_path = os.path.join(config.VAR_DIR, dmarc.LOCAL_FILE_NAME)
+ self.assertFalse(os.path.exists(cache_path))
+ new_path = dmarc.ensure_current_suffix_list()
+ self.assertEqual(cache_path, new_path)
+ with open(cache_path, 'r', encoding='utf-8') as fp:
+ contents = fp.read()
+ self.assertEqual(contents, 'abc')
+ self.assertEqual(
+ os.stat(new_path).st_mtime,
+ (now() + as_timedelta(config.dmarc.cache_lifetime)).timestamp())
+
+ @configuration(
+ 'dmarc',
+ org_domain_data_url='http://localhost:8180/public_suffix_list.err')
+ def test_cached_copy_is_missing_download_404s(self):
+ # There's no cached file and we'll get a 404 with the .err file so
+ # we'll have to fall back to our internal copy.
+ cache_path = os.path.join(config.VAR_DIR, dmarc.LOCAL_FILE_NAME)
+ self.assertFalse(os.path.exists(cache_path))
+ new_path = dmarc.ensure_current_suffix_list()
+ self.assertEqual(cache_path, new_path)
+ with open(cache_path, 'r', encoding='utf-8') as fp:
+ contents = fp.read()
+ # The contents is *not* equal to our dummy test data, but don't tie it
+ # too closely to the in-tree file contents since that might change
+ # when and if we update that.
+ self.assertNotEqual(contents, 'abc')
+ self.assertEqual(
+ os.stat(new_path).st_mtime,
+ (now() + as_timedelta(config.dmarc.cache_lifetime)).timestamp())
+
+ @configuration(
+ 'dmarc',
+ org_domain_data_url='http://localhost:8180/public_suffix_list.err')
+ def test_cached_copy_is_expired_download_404s(self):
+ # Because the cached copy is out of date, we try to download the new
+ # version. But that 404s so we end up continuing to use the cached
+ # copy.
+ cache_path = os.path.join(config.VAR_DIR, dmarc.LOCAL_FILE_NAME)
+ with open(cache_path, 'w', encoding='utf-8') as fp:
+ print('xyz', end='', file=fp)
+ # Expire the cache file. That way the current cached file will be
+ # invalid and a new one will be downloaded.
+ expires = (now() - timedelta(days=1)).timestamp()
+ os.utime(cache_path, (expires, expires))
+ new_path = dmarc.ensure_current_suffix_list()
+ self.assertEqual(cache_path, new_path)
+ with open(cache_path, 'r', encoding='utf-8') as fp:
+ contents = fp.read()
+ # The contents are from the cached file.
+ self.assertEqual(contents, 'xyz')
+ # The cached file timestamp doesn't change.
+ self.assertEqual(os.stat(new_path).st_mtime, expires)