diff options
Diffstat (limited to 'src/mailman/rules/tests/test_dmarc.py')
| -rw-r--r-- | src/mailman/rules/tests/test_dmarc.py | 214 |
1 files changed, 170 insertions, 44 deletions
diff --git a/src/mailman/rules/tests/test_dmarc.py b/src/mailman/rules/tests/test_dmarc.py index 9af3e9347..df5127895 100644 --- a/src/mailman/rules/tests/test_dmarc.py +++ b/src/mailman/rules/tests/test_dmarc.py @@ -15,25 +15,31 @@ # You should have received a copy of the GNU General Public License along with # GNU Mailman. If not, see <http://www.gnu.org/licenses/>. -"""Provides support for mocking dnspython calls from dmarc rules and some -organizational domain tests.""" +"""Tests and mocks for DMARC rule.""" + +import os +import threading from contextlib import ExitStack +from datetime import timedelta from dns.exception import DNSException from dns.rdatatype import TXT from dns.resolver import NXDOMAIN, NoAnswer +from http.server import BaseHTTPRequestHandler, HTTPServer +from lazr.config import as_timedelta from mailman.app.lifecycle import create_list +from mailman.config import config from mailman.interfaces.mailinglist import DMARCMitigateAction from mailman.rules import dmarc from mailman.testing.helpers import ( - LogFileMark, specialized_message_from_string as mfs) + LogFileMark, configuration, specialized_message_from_string as mfs, + wait_for_webservice) from mailman.testing.layers import ConfigLayer +from mailman.utilities.datetime import now from pkg_resources import resource_filename from public import public from unittest import TestCase from unittest.mock import patch -from urllib.error import URLError -from urllib.request import urlopen @public @@ -97,16 +103,11 @@ def get_dns_resolver(): @public -def get_org_data(): - """Create a mock to load the organizational domain data from our local - test data. - """ - def ouropen(url): - datapath = resource_filename( - 'mailman.rules.tests.data', 'org_domain.txt') - org_data_url = 'file:///{}'.format(datapath) - return urlopen(org_data_url) - return patch('mailman.rules.dmarc.request.urlopen', ouropen) +def use_test_organizational_data(): + # Point the organizational URL to our test data. + path = resource_filename('mailman.rules.tests.data', 'org_domain.txt') + url = 'file:///{}'.format(path) + return configuration('mailman', org_domain_data_url=url) class TestDMARCRules(TestCase): @@ -120,42 +121,23 @@ class TestDMARCRules(TestCase): # Make sure every test has a clean cache. self.cache = {} self.resources.enter_context( - patch('mailman.rules.dmarc.s_dict', self.cache)) - - def test_no_url(self): - dmarc._get_suffixes(None) - self.assertEqual(len(self.cache), 0) + patch('mailman.rules.dmarc.suffix_cache', self.cache)) + self.resources.enter_context(use_test_organizational_data()) def test_no_data_for_domain(self): - with get_org_data(): - self.assertEqual( - dmarc._get_org_dom('sub.dom.example.nxtld'), - 'example.nxtld') + self.assertEqual( + dmarc._get_org_dom('sub.dom.example.nxtld'), + 'example.nxtld') def test_domain_with_wild_card(self): - with get_org_data(): - self.assertEqual( - dmarc._get_org_dom('ssub.sub.foo.kobe.jp'), - 'sub.foo.kobe.jp') + self.assertEqual( + dmarc._get_org_dom('ssub.sub.foo.kobe.jp'), + 'sub.foo.kobe.jp') def test_exception_to_wild_card(self): - with get_org_data(): - self.assertEqual( - dmarc._get_org_dom('ssub.sub.city.kobe.jp'), - 'city.kobe.jp') - - def test_no_publicsuffix_dot_org(self): - mark = LogFileMark('mailman.error') - with patch('mailman.rules.dmarc.request.urlopen', - side_effect=URLError('no internet')): - domain = dmarc._get_org_dom('ssub.sub.city.kobe.jp') - line = mark.readline() self.assertEqual( - line[-95:], - 'Unable to retrieve data from ' - 'https://publicsuffix.org/list/public_suffix_list.dat: ' - 'no internet\n') - self.assertEqual(domain, 'kobe.jp') + dmarc._get_org_dom('ssub.sub.city.kobe.jp'), + 'city.kobe.jp') def test_no_at_sign_in_from_address(self): # If there's no @ sign in the From: address, the rule can't hit. @@ -190,3 +172,147 @@ To: ant@example.com 'DNSException: Unable to query DMARC policy for ' 'anne@example.info (_dmarc.example.info). ' 'Abstract base class shared by all dnspython exceptions.\n') + + +# New in Python 3.5. +try: + from http import HTTPStatus +except ImportError: + class HTTPStatus: + FORBIDDEN = 403 + NOT_FOUND = 404 + OK = 200 + + +# We need a web server to vend non-mailman: urls. +class TestableHandler(BaseHTTPRequestHandler): + # Be quiet. + def log_request(*args, **kws): + pass + + log_error = log_request + + def do_GET(self): + if self.path == '/public_suffix_list.dat': + self.send_response(HTTPStatus.OK) + self.send_header('Content-Type', 'UTF-8') + self.end_headers() + self.wfile.write(b'abc') + else: + self.send_error(HTTPStatus.NOT_FOUND) + + +class HTTPLayer(ConfigLayer): + httpd = None + + @classmethod + def setUp(cls): + assert cls.httpd is None, 'Layer already set up' + cls.httpd = HTTPServer(('localhost', 8180), TestableHandler) + cls._thread = threading.Thread(target=cls.httpd.serve_forever) + cls._thread.daemon = True + cls._thread.start() + wait_for_webservice('localhost', 8180) + + @classmethod + def tearDown(cls): + assert cls.httpd is not None, 'Layer not set up' + cls.httpd.shutdown() + cls.httpd.server_close() + cls._thread.join() + + +class TestSuffixList(TestCase): + layer = HTTPLayer + + def test_cached_copy_is_good(self): + cache_path = os.path.join(config.VAR_DIR, dmarc.LOCAL_FILE_NAME) + with open(cache_path, 'w', encoding='utf-8') as fp: + print('xyz', end='', file=fp) + # The cache expires a day from now. + expires = (now() + timedelta(days=1)).timestamp() + os.utime(cache_path, (expires, expires)) + new_path = dmarc.ensure_current_suffix_list() + self.assertEqual(cache_path, new_path) + with open(cache_path, 'r', encoding='utf-8') as fp: + contents = fp.read() + self.assertEqual(contents, 'xyz') + self.assertEqual(os.stat(new_path).st_mtime, expires) + + @configuration( + 'dmarc', + org_domain_data_url='http://localhost:8180/public_suffix_list.dat') + def test_cached_copy_is_expired(self): + cache_path = os.path.join(config.VAR_DIR, dmarc.LOCAL_FILE_NAME) + with open(cache_path, 'w', encoding='utf-8') as fp: + print('xyz', end='', file=fp) + # Expire the cache file. That way the current cached file will be + # invalid and a new one will be downloaded. + expires = (now() - timedelta(days=1)).timestamp() + os.utime(cache_path, (expires, expires)) + new_path = dmarc.ensure_current_suffix_list() + self.assertEqual(cache_path, new_path) + with open(cache_path, 'r', encoding='utf-8') as fp: + contents = fp.read() + self.assertEqual(contents, 'abc') + self.assertEqual( + os.stat(new_path).st_mtime, + (now() + as_timedelta(config.dmarc.cache_lifetime)).timestamp()) + + @configuration( + 'dmarc', + org_domain_data_url='http://localhost:8180/public_suffix_list.dat') + def test_cached_copy_is_missing(self): + cache_path = os.path.join(config.VAR_DIR, dmarc.LOCAL_FILE_NAME) + self.assertFalse(os.path.exists(cache_path)) + new_path = dmarc.ensure_current_suffix_list() + self.assertEqual(cache_path, new_path) + with open(cache_path, 'r', encoding='utf-8') as fp: + contents = fp.read() + self.assertEqual(contents, 'abc') + self.assertEqual( + os.stat(new_path).st_mtime, + (now() + as_timedelta(config.dmarc.cache_lifetime)).timestamp()) + + @configuration( + 'dmarc', + org_domain_data_url='http://localhost:8180/public_suffix_list.err') + def test_cached_copy_is_missing_download_404s(self): + # There's no cached file and we'll get a 404 with the .err file so + # we'll have to fall back to our internal copy. + cache_path = os.path.join(config.VAR_DIR, dmarc.LOCAL_FILE_NAME) + self.assertFalse(os.path.exists(cache_path)) + new_path = dmarc.ensure_current_suffix_list() + self.assertEqual(cache_path, new_path) + with open(cache_path, 'r', encoding='utf-8') as fp: + contents = fp.read() + # The contents is *not* equal to our dummy test data, but don't tie it + # too closely to the in-tree file contents since that might change + # when and if we update that. + self.assertNotEqual(contents, 'abc') + self.assertEqual( + os.stat(new_path).st_mtime, + (now() + as_timedelta(config.dmarc.cache_lifetime)).timestamp()) + + @configuration( + 'dmarc', + org_domain_data_url='http://localhost:8180/public_suffix_list.err') + def test_cached_copy_is_expired_download_404s(self): + # Because the cached copy is out of date, we try to download the new + # version. But that 404s so we end up continuing to use the cached + # copy. + cache_path = os.path.join(config.VAR_DIR, dmarc.LOCAL_FILE_NAME) + with open(cache_path, 'w', encoding='utf-8') as fp: + print('xyz', end='', file=fp) + # Expire the cache file. That way the current cached file will be + # invalid and a new one will be downloaded. + expires = (now() - timedelta(days=1)).timestamp() + os.utime(cache_path, (expires, expires)) + new_path = dmarc.ensure_current_suffix_list() + self.assertEqual(cache_path, new_path) + with open(cache_path, 'r', encoding='utf-8') as fp: + contents = fp.read() + # The contents are from the cached file. + self.assertEqual(contents, 'xyz') + # The cached file timestamp doesn't change. + self.assertEqual(os.stat(new_path).st_mtime, expires) |
