summaryrefslogtreecommitdiff
path: root/src/mailman/model
diff options
context:
space:
mode:
Diffstat (limited to 'src/mailman/model')
-rw-r--r--src/mailman/model/mailinglist.py74
-rw-r--r--src/mailman/model/tests/test_mailinglist.py57
2 files changed, 127 insertions, 4 deletions
diff --git a/src/mailman/model/mailinglist.py b/src/mailman/model/mailinglist.py
index f04c534e1..0a5b20dd8 100644
--- a/src/mailman/model/mailinglist.py
+++ b/src/mailman/model/mailinglist.py
@@ -37,8 +37,9 @@ from mailman.interfaces.digests import DigestFrequency
from mailman.interfaces.domain import IDomainManager
from mailman.interfaces.languages import ILanguageManager
from mailman.interfaces.mailinglist import (
- IAcceptableAlias, IAcceptableAliasSet, IListArchiver, IListArchiverSet,
- IMailingList, Personalization, ReplyToMunging, SubscriptionPolicy)
+ IAcceptableAlias, IAcceptableAliasSet, IHeaderMatch, IHeaderMatchSet,
+ IListArchiver, IListArchiverSet, IMailingList, Personalization,
+ ReplyToMunging, SubscriptionPolicy)
from mailman.interfaces.member import (
AlreadySubscribedError, MemberRole, MissingPreferredAddressError,
SubscriptionEvent)
@@ -57,6 +58,7 @@ from sqlalchemy import (
LargeBinary, PickleType, Unicode)
from sqlalchemy.event import listen
from sqlalchemy.orm import relationship
+from sqlalchemy.orm.exc import NoResultFound
from urllib.parse import urljoin
from zope.component import getUtility
from zope.event import notify
@@ -149,7 +151,6 @@ class MailingList(Model):
gateway_to_mail = Column(Boolean)
gateway_to_news = Column(Boolean)
goodbye_message_uri = Column(Unicode)
- header_matches = Column(PickleType)
header_uri = Column(Unicode)
hold_these_nonmembers = Column(PickleType)
info = Column(Unicode)
@@ -621,3 +622,70 @@ class ListArchiverSet:
return store.query(ListArchiver).filter(
ListArchiver.mailing_list == self._mailing_list,
ListArchiver.name == archiver_name).first()
+
+
+
+@implementer(IHeaderMatch)
+class HeaderMatch(Model):
+ """See `IHeaderMatch`."""
+
+ __tablename__ = 'headermatch'
+
+ id = Column(Integer, primary_key=True)
+
+ mailing_list_id = Column(Integer, ForeignKey('mailinglist.id'))
+ mailing_list = relationship('MailingList', backref='header_matches')
+
+ header = Column(Unicode)
+ pattern = Column(Unicode)
+ chain = Column(Unicode, nullable=True)
+
+
+
+@implementer(IHeaderMatchSet)
+class HeaderMatchSet:
+ """See `IHeaderMatchSet`."""
+
+ def __init__(self, mailing_list):
+ self._mailing_list = mailing_list
+
+ @dbconnection
+ def clear(self, store):
+ """See `IHeaderMatchSet`."""
+ store.query(HeaderMatch).filter(
+ HeaderMatch.mailing_list == self._mailing_list).delete()
+
+ @dbconnection
+ def add(self, store, header, pattern, chain=None):
+ header = header.lower()
+ existing = store.query(HeaderMatch).filter(
+ HeaderMatch.mailing_list == self._mailing_list,
+ HeaderMatch.header == header,
+ HeaderMatch.pattern == pattern).count()
+ if existing > 0:
+ raise ValueError('Pattern already exists')
+ header_match = HeaderMatch(
+ mailing_list=self._mailing_list,
+ header=header, pattern=pattern, chain=chain)
+ store.add(header_match)
+
+ @dbconnection
+ def remove(self, store, header, pattern):
+ header = header.lower()
+ # Don't just filter and use delete(), or the MailingList.header_matches
+ # collection will not be updated:
+ # http://docs.sqlalchemy.org/en/rel_1_0/orm/collections.html#dynamic-relationship-loaders
+ try:
+ existing = store.query(HeaderMatch).filter(
+ HeaderMatch.mailing_list == self._mailing_list,
+ HeaderMatch.header == header,
+ HeaderMatch.pattern == pattern).one()
+ except NoResultFound:
+ raise ValueError('Pattern does not exist')
+ else:
+ self._mailing_list.header_matches.remove(existing)
+
+ @dbconnection
+ def __iter__(self, store):
+ yield from store.query(HeaderMatch).filter(
+ HeaderMatch.mailing_list == self._mailing_list)
diff --git a/src/mailman/model/tests/test_mailinglist.py b/src/mailman/model/tests/test_mailinglist.py
index 745096b4b..8d35a50f6 100644
--- a/src/mailman/model/tests/test_mailinglist.py
+++ b/src/mailman/model/tests/test_mailinglist.py
@@ -32,7 +32,7 @@ from mailman.config import config
from mailman.database.transaction import transaction
from mailman.interfaces.listmanager import IListManager
from mailman.interfaces.mailinglist import (
- IAcceptableAliasSet, IListArchiverSet)
+ IAcceptableAliasSet, IHeaderMatchSet, IListArchiverSet)
from mailman.interfaces.member import (
AlreadySubscribedError, MemberRole, MissingPreferredAddressError)
from mailman.interfaces.usermanager import IUserManager
@@ -163,3 +163,58 @@ class TestAcceptableAliases(unittest.TestCase):
self.assertEqual(['bee@example.com'], list(alias_set.aliases))
getUtility(IListManager).delete(self._mlist)
self.assertEqual(len(list(alias_set.aliases)), 0)
+
+
+
+class TestHeaderMatch(unittest.TestCase):
+ layer = ConfigLayer
+
+ def setUp(self):
+ self._mlist = create_list('ant@example.com')
+
+ def test_lowercase_header(self):
+ header_matches = IHeaderMatchSet(self._mlist)
+ header_matches.add('Header', 'pattern')
+ self.assertEqual(len(self._mlist.header_matches), 1)
+ self.assertEqual(self._mlist.header_matches[0].header, 'header')
+
+ def test_chain_defaults_to_none(self):
+ header_matches = IHeaderMatchSet(self._mlist)
+ header_matches.add('header', 'pattern')
+ self.assertEqual(len(self._mlist.header_matches), 1)
+ self.assertEqual(self._mlist.header_matches[0].chain, None)
+
+ def test_duplicate(self):
+ header_matches = IHeaderMatchSet(self._mlist)
+ header_matches.add('Header', 'pattern')
+ self.assertRaises(
+ ValueError, header_matches.add, 'Header', 'pattern')
+ self.assertEqual(len(self._mlist.header_matches), 1)
+
+ def test_remove_non_existent(self):
+ header_matches = IHeaderMatchSet(self._mlist)
+ self.assertRaises(
+ ValueError, header_matches.remove, 'header', 'pattern')
+
+ def test_add_remove(self):
+ header_matches = IHeaderMatchSet(self._mlist)
+ header_matches.add('header', 'pattern')
+ self.assertEqual(len(self._mlist.header_matches), 1)
+ header_matches.remove('header', 'pattern')
+ self.assertEqual(len(self._mlist.header_matches), 0)
+
+ def test_iterator(self):
+ header_matches = IHeaderMatchSet(self._mlist)
+ header_matches.add('Header', 'pattern')
+ header_matches.add('Subject', 'patt.*')
+ header_matches.add('From', '.*@example.com', 'discard')
+ header_matches.add('From', '.*@example.org', 'accept')
+ matches = sorted((match.header, match.pattern, match.chain)
+ for match in IHeaderMatchSet(self._mlist))
+ self.assertEqual(
+ matches,
+ [('from', '.*@example.com', 'discard'),
+ ('from', '.*@example.org', 'accept'),
+ ('header', 'pattern', None),
+ ('subject', 'patt.*', None),
+ ])