diff options
Diffstat (limited to 'src/mailman/rules/tests/test_dmarc.py')
| -rw-r--r-- | src/mailman/rules/tests/test_dmarc.py | 184 |
1 files changed, 172 insertions, 12 deletions
diff --git a/src/mailman/rules/tests/test_dmarc.py b/src/mailman/rules/tests/test_dmarc.py index 7748343eb..d2ce28806 100644 --- a/src/mailman/rules/tests/test_dmarc.py +++ b/src/mailman/rules/tests/test_dmarc.py @@ -23,7 +23,7 @@ import threading from contextlib import ExitStack from datetime import timedelta from dns.exception import DNSException -from dns.rdatatype import TXT +from dns.rdatatype import CNAME, TXT from dns.resolver import NXDOMAIN, NoAnswer from http.server import BaseHTTPRequestHandler, HTTPServer from lazr.config import as_timedelta @@ -43,7 +43,12 @@ from unittest.mock import patch @public -def get_dns_resolver(): +def get_dns_resolver( + rtype=TXT, + rdata=b'v=DMARC1; p=reject;', + rmult=False, + cmult=False, + cloop=False): """Create a dns.resolver.Resolver mock. This is used to return a predictable response to a _dmarc query. It @@ -54,28 +59,64 @@ def get_dns_resolver(): """ class Name: # mock answer.name - def __init__(self): - pass + def __init__(self, name='_dmarc.example.biz.'): + self.name = name def to_text(self): - return '_dmarc.example.biz.' + return self.name class Item: # mock answer.items - def __init__(self): - self.strings = [b'v=DMARC1; p=reject;'] + def __init__(self, d=rdata, n='_dmarc.example.com.'): + self.strings = [d] + # for CNAMES + self.target = Name(n) class Ans_e: # mock answer element - def __init__(self): - self.rdtype = TXT - self.items = [Item()] - self.name = Name() + def __init__( + self, + typ=rtype, + d=rdata, + t='_dmarc.example.com.', + n='_dmarc.example.biz.'): + self.rdtype = typ + self.items = [Item(d, t)] + self.name = Name(n) class Answer: # mock answer def __init__(self): - self.answer = [Ans_e()] + if cloop: + self.answer = [ + Ans_e( + typ=CNAME, + n='_dmarc.example.biz.', + t='_dmarc.example.org.' + ), + Ans_e( + typ=CNAME, + n='_dmarc.example.org.', + t='_dmarc.example.biz.' + ), + ] + elif cmult: + self.answer = [ + Ans_e( + typ=CNAME, + n='_dmarc.example.biz.', + t='_dmarc.example.net.' + ), + Ans_e( + typ=CNAME, + n='_dmarc.example.net.', + t='_dmarc.example.com.' + ), + ] + elif rmult: + self.answer = [Ans_e(), Ans_e(d=b'v=DMARC1; p=none;')] + else: + self.answer = [Ans_e()] class Resolver: # mock dns.resolver.Resolver class. @@ -173,6 +214,125 @@ To: ant@example.com 'anne@example.info (_dmarc.example.info). ' 'Abstract base class shared by all dnspython exceptions.\n') + def test_cname_return(self): + mlist = create_list('ant@example.com') + # Use action reject. The rule only hits on reject and discard. + mlist.dmarc_mitigate_action = DMARCMitigateAction.reject + msg = mfs("""\ +From: anne@example.biz +To: ant@example.com + +""") + mark = LogFileMark('mailman.error') + rule = dmarc.DMARCMitigation() + with get_dns_resolver(rtype=CNAME): + self.assertFalse(rule.check(mlist, msg, {})) + line = mark.readline() + self.assertEqual(line, '') + + def test_domain_with_subdomain_policy(self): + mlist = create_list('ant@example.com') + # Use action reject. The rule only hits on reject and discard. + mlist.dmarc_mitigate_action = DMARCMitigateAction.reject + msg = mfs("""\ +From: anne@example.biz +To: ant@example.com + +""") + rule = dmarc.DMARCMitigation() + with get_dns_resolver(rdata=b'v=DMARC1; sp=quarantine;'): + self.assertFalse(rule.check(mlist, msg, {})) + + def test_org_domain_with_subdomain_policy(self): + mlist = create_list('ant@example.com') + # Use action reject. The rule only hits on reject and discard. + mlist.dmarc_mitigate_action = DMARCMitigateAction.reject + msg = mfs("""\ +From: anne@sub.domain.example.biz +To: ant@example.com + +""") + rule = dmarc.DMARCMitigation() + with get_dns_resolver(rdata=b'v=DMARC1; sp=quarantine;'): + self.assertTrue(rule.check(mlist, msg, {})) + + def test_wrong_dmarc_version(self): + mlist = create_list('ant@example.com') + # Use action reject. The rule only hits on reject and discard. + mlist.dmarc_mitigate_action = DMARCMitigateAction.reject + msg = mfs("""\ +From: anne@example.biz +To: ant@example.com + +""") + rule = dmarc.DMARCMitigation() + with get_dns_resolver(rdata=b'v=DMARC01; p=reject;'): + self.assertFalse(rule.check(mlist, msg, {})) + + def test_multiple_records(self): + mlist = create_list('ant@example.com') + # Use action reject. The rule only hits on reject and discard. + mlist.dmarc_mitigate_action = DMARCMitigateAction.reject + msg = mfs("""\ +From: anne@example.biz +To: ant@example.com + +""") + mark = LogFileMark('mailman.error') + rule = dmarc.DMARCMitigation() + with get_dns_resolver(rmult=True): + self.assertTrue(rule.check(mlist, msg, {})) + line = mark.readline() + self.assertEqual( + line[-68:], + 'RRset of TXT records for _dmarc.example.biz has 2 v=DMARC1 ' + 'entries;\n') + + def test_multiple_cnames(self): + mlist = create_list('ant@example.com') + # Use action reject. The rule only hits on reject and discard. + mlist.dmarc_mitigate_action = DMARCMitigateAction.reject + msg = mfs("""\ +From: anne@example.biz +To: ant@example.com + +""") + mark = LogFileMark('mailman.error') + rule = dmarc.DMARCMitigation() + with get_dns_resolver(cmult=True): + self.assertFalse(rule.check(mlist, msg, {})) + line = mark.readline() + self.assertEqual(line, '') + + def test_looping_cnames(self): + mlist = create_list('ant@example.com') + # Use action reject. The rule only hits on reject and discard. + mlist.dmarc_mitigate_action = DMARCMitigateAction.reject + msg = mfs("""\ +From: anne@example.biz +To: ant@example.com + +""") + mark = LogFileMark('mailman.error') + rule = dmarc.DMARCMitigation() + with get_dns_resolver(cloop=True): + self.assertFalse(rule.check(mlist, msg, {})) + line = mark.readline() + self.assertEqual(line, '') + + def test_no_policy(self): + mlist = create_list('ant@example.com') + # Use action reject. The rule only hits on reject and discard. + mlist.dmarc_mitigate_action = DMARCMitigateAction.reject + msg = mfs("""\ +From: anne@example.biz +To: ant@example.com + +""") + rule = dmarc.DMARCMitigation() + with get_dns_resolver(rdata=b'v=DMARC1; pct=100;'): + self.assertFalse(rule.check(mlist, msg, {})) + def test_parser(self): data_file = resource_filename( 'mailman.rules.tests.data', 'org_domain.txt') |
