diff options
Diffstat (limited to 'src/mailman/model')
| -rw-r--r-- | src/mailman/model/mailinglist.py | 74 | ||||
| -rw-r--r-- | src/mailman/model/tests/test_mailinglist.py | 57 |
2 files changed, 127 insertions, 4 deletions
diff --git a/src/mailman/model/mailinglist.py b/src/mailman/model/mailinglist.py index f04c534e1..0a5b20dd8 100644 --- a/src/mailman/model/mailinglist.py +++ b/src/mailman/model/mailinglist.py @@ -37,8 +37,9 @@ from mailman.interfaces.digests import DigestFrequency from mailman.interfaces.domain import IDomainManager from mailman.interfaces.languages import ILanguageManager from mailman.interfaces.mailinglist import ( - IAcceptableAlias, IAcceptableAliasSet, IListArchiver, IListArchiverSet, - IMailingList, Personalization, ReplyToMunging, SubscriptionPolicy) + IAcceptableAlias, IAcceptableAliasSet, IHeaderMatch, IHeaderMatchSet, + IListArchiver, IListArchiverSet, IMailingList, Personalization, + ReplyToMunging, SubscriptionPolicy) from mailman.interfaces.member import ( AlreadySubscribedError, MemberRole, MissingPreferredAddressError, SubscriptionEvent) @@ -57,6 +58,7 @@ from sqlalchemy import ( LargeBinary, PickleType, Unicode) from sqlalchemy.event import listen from sqlalchemy.orm import relationship +from sqlalchemy.orm.exc import NoResultFound from urllib.parse import urljoin from zope.component import getUtility from zope.event import notify @@ -149,7 +151,6 @@ class MailingList(Model): gateway_to_mail = Column(Boolean) gateway_to_news = Column(Boolean) goodbye_message_uri = Column(Unicode) - header_matches = Column(PickleType) header_uri = Column(Unicode) hold_these_nonmembers = Column(PickleType) info = Column(Unicode) @@ -621,3 +622,70 @@ class ListArchiverSet: return store.query(ListArchiver).filter( ListArchiver.mailing_list == self._mailing_list, ListArchiver.name == archiver_name).first() + + + +@implementer(IHeaderMatch) +class HeaderMatch(Model): + """See `IHeaderMatch`.""" + + __tablename__ = 'headermatch' + + id = Column(Integer, primary_key=True) + + mailing_list_id = Column(Integer, ForeignKey('mailinglist.id')) + mailing_list = relationship('MailingList', backref='header_matches') + + header = Column(Unicode) + pattern = Column(Unicode) + chain = Column(Unicode, nullable=True) + + + +@implementer(IHeaderMatchSet) +class HeaderMatchSet: + """See `IHeaderMatchSet`.""" + + def __init__(self, mailing_list): + self._mailing_list = mailing_list + + @dbconnection + def clear(self, store): + """See `IHeaderMatchSet`.""" + store.query(HeaderMatch).filter( + HeaderMatch.mailing_list == self._mailing_list).delete() + + @dbconnection + def add(self, store, header, pattern, chain=None): + header = header.lower() + existing = store.query(HeaderMatch).filter( + HeaderMatch.mailing_list == self._mailing_list, + HeaderMatch.header == header, + HeaderMatch.pattern == pattern).count() + if existing > 0: + raise ValueError('Pattern already exists') + header_match = HeaderMatch( + mailing_list=self._mailing_list, + header=header, pattern=pattern, chain=chain) + store.add(header_match) + + @dbconnection + def remove(self, store, header, pattern): + header = header.lower() + # Don't just filter and use delete(), or the MailingList.header_matches + # collection will not be updated: + # http://docs.sqlalchemy.org/en/rel_1_0/orm/collections.html#dynamic-relationship-loaders + try: + existing = store.query(HeaderMatch).filter( + HeaderMatch.mailing_list == self._mailing_list, + HeaderMatch.header == header, + HeaderMatch.pattern == pattern).one() + except NoResultFound: + raise ValueError('Pattern does not exist') + else: + self._mailing_list.header_matches.remove(existing) + + @dbconnection + def __iter__(self, store): + yield from store.query(HeaderMatch).filter( + HeaderMatch.mailing_list == self._mailing_list) diff --git a/src/mailman/model/tests/test_mailinglist.py b/src/mailman/model/tests/test_mailinglist.py index 745096b4b..8d35a50f6 100644 --- a/src/mailman/model/tests/test_mailinglist.py +++ b/src/mailman/model/tests/test_mailinglist.py @@ -32,7 +32,7 @@ from mailman.config import config from mailman.database.transaction import transaction from mailman.interfaces.listmanager import IListManager from mailman.interfaces.mailinglist import ( - IAcceptableAliasSet, IListArchiverSet) + IAcceptableAliasSet, IHeaderMatchSet, IListArchiverSet) from mailman.interfaces.member import ( AlreadySubscribedError, MemberRole, MissingPreferredAddressError) from mailman.interfaces.usermanager import IUserManager @@ -163,3 +163,58 @@ class TestAcceptableAliases(unittest.TestCase): self.assertEqual(['bee@example.com'], list(alias_set.aliases)) getUtility(IListManager).delete(self._mlist) self.assertEqual(len(list(alias_set.aliases)), 0) + + + +class TestHeaderMatch(unittest.TestCase): + layer = ConfigLayer + + def setUp(self): + self._mlist = create_list('ant@example.com') + + def test_lowercase_header(self): + header_matches = IHeaderMatchSet(self._mlist) + header_matches.add('Header', 'pattern') + self.assertEqual(len(self._mlist.header_matches), 1) + self.assertEqual(self._mlist.header_matches[0].header, 'header') + + def test_chain_defaults_to_none(self): + header_matches = IHeaderMatchSet(self._mlist) + header_matches.add('header', 'pattern') + self.assertEqual(len(self._mlist.header_matches), 1) + self.assertEqual(self._mlist.header_matches[0].chain, None) + + def test_duplicate(self): + header_matches = IHeaderMatchSet(self._mlist) + header_matches.add('Header', 'pattern') + self.assertRaises( + ValueError, header_matches.add, 'Header', 'pattern') + self.assertEqual(len(self._mlist.header_matches), 1) + + def test_remove_non_existent(self): + header_matches = IHeaderMatchSet(self._mlist) + self.assertRaises( + ValueError, header_matches.remove, 'header', 'pattern') + + def test_add_remove(self): + header_matches = IHeaderMatchSet(self._mlist) + header_matches.add('header', 'pattern') + self.assertEqual(len(self._mlist.header_matches), 1) + header_matches.remove('header', 'pattern') + self.assertEqual(len(self._mlist.header_matches), 0) + + def test_iterator(self): + header_matches = IHeaderMatchSet(self._mlist) + header_matches.add('Header', 'pattern') + header_matches.add('Subject', 'patt.*') + header_matches.add('From', '.*@example.com', 'discard') + header_matches.add('From', '.*@example.org', 'accept') + matches = sorted((match.header, match.pattern, match.chain) + for match in IHeaderMatchSet(self._mlist)) + self.assertEqual( + matches, + [('from', '.*@example.com', 'discard'), + ('from', '.*@example.org', 'accept'), + ('header', 'pattern', None), + ('subject', 'patt.*', None), + ]) |
