summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rw-r--r--tests/.cvsignore1
-rw-r--r--tests/Makefile.in75
-rw-r--r--tests/TestBase.py49
-rw-r--r--tests/test_bounces.py17
-rw-r--r--tests/test_handlers.py1648
-rw-r--r--tests/test_membership.py331
-rw-r--r--tests/testall.py2
7 files changed, 2114 insertions, 9 deletions
diff --git a/tests/.cvsignore b/tests/.cvsignore
new file mode 100644
index 000000000..f3c7a7c5d
--- /dev/null
+++ b/tests/.cvsignore
@@ -0,0 +1 @@
+Makefile
diff --git a/tests/Makefile.in b/tests/Makefile.in
new file mode 100644
index 000000000..92e8b252c
--- /dev/null
+++ b/tests/Makefile.in
@@ -0,0 +1,75 @@
+# Copyright (C) 2001 by the Free Software Foundation, Inc.
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+
+# NOTE: Makefile.in is converted into Makefile by the configure script
+# in the parent directory. Once configure has run, you can recreate
+# the Makefile by running just config.status.
+
+# Variables set by configure
+
+VPATH= @srcdir@
+srcdir= @srcdir@
+bindir= @bindir@
+prefix= @prefix@
+exec_prefix= @exec_prefix@
+
+CC= @CC@
+CHMOD= @CHMOD@
+INSTALL= @INSTALL@
+
+DEFS= @DEFS@
+
+# Customizable but not set by configure
+
+OPT= @OPT@
+CFLAGS= $(OPT) $(DEFS)
+TESTDIR= $(prefix)/tests
+SHELL= /bin/sh
+
+TEST_MODULES= *.py
+
+
+# Modes for directories and executables created by the install
+# process. Default to group-writable directories but
+# user-only-writable for executables.
+DIRMODE= 775
+EXEMODE= 755
+FILEMODE= 644
+INSTALL_PROGRAM=$(INSTALL) -m $(EXEMODE)
+
+# Directories make should decend into
+SUBDIRS= bounces msgs
+
+# Rules
+all:
+
+install:
+ for f in $(TEST_MODULES); \
+ do \
+ $(INSTALL) -m $(FILEMODE) $$f $(TESTDIR); \
+ done
+ for d in $(SUBDIRS); \
+ do \
+ (cd $$d; $(MAKE) install); \
+ done
+
+finish:
+
+clean:
+
+distclean:
+ -rm *.pyc
+ -rm Makefile
diff --git a/tests/TestBase.py b/tests/TestBase.py
new file mode 100644
index 000000000..2a3950b52
--- /dev/null
+++ b/tests/TestBase.py
@@ -0,0 +1,49 @@
+# Copyright (C) 2001 by the Free Software Foundation, Inc.
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+
+"""Test base class which handles creating and deleting a test list.
+"""
+
+import os
+import unittest
+
+from Mailman import MailList
+from Mailman import Utils
+from Mailman import mm_cfg
+
+
+
+class TestBase(unittest.TestCase):
+ def setUp(self):
+ mlist = MailList.MailList()
+ mlist.Create('_xtest', 'test@dom.ain', 'xxxxx')
+ # This leaves the list in a locked state
+ self._mlist = mlist
+
+ def tearDown(self):
+ self._mlist.Unlock()
+ listname = self._mlist.internal_name()
+ for dirtmpl in ['lists/%s',
+ 'archives/private/%s',
+ 'archives/private/%s.mbox',
+ 'archives/public/%s',
+ 'archives/public/%s.mbox',
+ ]:
+ dir = os.path.join(mm_cfg.VAR_PREFIX, dirtmpl % listname)
+ if os.path.islink(dir):
+ os.unlink(dir)
+ elif os.path.isdir(dir):
+ Utils.rmdirhier(dir)
diff --git a/tests/test_bounces.py b/tests/test_bounces.py
index 51fc391cd..00e2d6c44 100644
--- a/tests/test_bounces.py
+++ b/tests/test_bounces.py
@@ -19,8 +19,7 @@
import sys
import os
import unittest
-
-from mimelib.Parser import Parser
+import email
@@ -64,26 +63,29 @@ class BounceTest(unittest.TestCase):
# Done
)
- def checkBounce(self):
+ def test_bounce(self):
for modname, file, addrs in self.DATA:
module = 'Mailman.Bouncers.' + modname
__import__(module)
fp = open(os.path.join('tests', 'bounces', file))
try:
- msg = Parser().parse(fp)
+ msg = email.message_from_file(fp)
finally:
fp.close()
foundaddrs = sys.modules[module].process(msg)
+ # Some modules return None instead of [] for failure
+ if foundaddrs is None:
+ foundaddrs = []
addrs.sort()
foundaddrs.sort()
self.assertEqual(addrs, foundaddrs)
- def checkSMTP32Failure(self):
+ def test_SMTP32_failure(self):
from Mailman.Bouncers import SMTP32
# This file has no X-Mailer: header
fp = open(os.path.join('tests', 'bounces', 'postfix_01.txt'))
try:
- msg = Parser().parse(fp)
+ msg = email.message_from_file(fp)
finally:
fp.close()
self.failIf(msg['x-mailer'] is not None)
@@ -93,11 +95,10 @@ class BounceTest(unittest.TestCase):
def suite():
suite = unittest.TestSuite()
- suite.addTest(unittest.makeSuite(BounceTest, 'check'))
+ suite.addTest(unittest.makeSuite(BounceTest))
return suite
if __name__ == '__main__':
unittest.main(defaultTest='suite')
-
diff --git a/tests/test_handlers.py b/tests/test_handlers.py
new file mode 100644
index 000000000..894222611
--- /dev/null
+++ b/tests/test_handlers.py
@@ -0,0 +1,1648 @@
+# Copyright (C) 2001 by the Free Software Foundation, Inc.
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+
+"""Unit tests for the various Mailman/Handlers/*.py modules.
+"""
+
+import os
+import time
+import sha
+import unittest
+import cPickle
+import errno
+import email
+from types import ListType
+
+from Mailman import mm_cfg
+from Mailman.MailList import MailList
+from Mailman import Message
+from Mailman import Errors
+from Mailman import Pending
+from Mailman.Queue.Switchboard import Switchboard
+
+from Mailman.Handlers import Acknowledge
+from Mailman.Handlers import AfterDelivery
+from Mailman.Handlers import Approve
+from Mailman.Handlers import CalcRecips
+from Mailman.Handlers import Cleanse
+from Mailman.Handlers import CookHeaders
+from Mailman.Handlers import Decorate
+from Mailman.Handlers import FileRecips
+from Mailman.Handlers import Hold
+from Mailman.Handlers import Personalize
+from Mailman.Handlers import Replybot
+from Mailman.Handlers import SMTPDirect
+from Mailman.Handlers import Sendmail
+from Mailman.Handlers import SpamDetect
+from Mailman.Handlers import Tagger
+from Mailman.Handlers import ToArchive
+from Mailman.Handlers import ToDigest
+from Mailman.Handlers import ToOutgoing
+from Mailman.Handlers import ToUsenet
+
+from TestBase import TestBase
+
+
+
+def password(plaintext):
+ return sha.new(plaintext).hexdigest()
+
+
+
+class TestAcknowledge(TestBase):
+ def setUp(self):
+ TestBase.setUp(self)
+ # We're going to want to inspect this queue directory
+ self._sb = Switchboard(mm_cfg.VIRGINQUEUE_DIR)
+ # Add a member
+ self._mlist.addNewMember('aperson@dom.ain')
+
+ def tearDown(self):
+ for f in os.listdir(mm_cfg.VIRGINQUEUE_DIR):
+ os.unlink(os.path.join(mm_cfg.VIRGINQUEUE_DIR, f))
+ TestBase.tearDown(self)
+
+ def test_no_ack_msgdata(self):
+ eq = self.assertEqual
+ # Make sure there are no files in the virgin queue already
+ eq(len(self._sb.files()), 0)
+ msg = email.message_from_string("""\
+From: aperson@dom.ain
+
+""", Message.Message)
+ Acknowledge.process(self._mlist, msg,
+ {'original_sender': 'aperson@dom.ain'})
+ eq(len(self._sb.files()), 0)
+
+ def test_no_ack_not_a_member(self):
+ eq = self.assertEqual
+ # Make sure there are no files in the virgin queue already
+ eq(len(self._sb.files()), 0)
+ msg = email.message_from_string("""\
+From: bperson@dom.ain
+
+""", Message.Message)
+ Acknowledge.process(self._mlist, msg,
+ {'original_sender': 'bperson@dom.ain'})
+ eq(len(self._sb.files()), 0)
+
+ def test_no_ack_sender(self):
+ eq = self.assertEqual
+ eq(len(self._sb.files()), 0)
+ msg = email.message_from_string("""\
+From: aperson@dom.ain
+
+""", Message.Message)
+ Acknowledge.process(self._mlist, msg, {})
+ eq(len(self._sb.files()), 0)
+
+ def test_ack_no_subject(self):
+ eq = self.assertEqual
+ self._mlist.host_name = 'dom.ain'
+ self._mlist.web_page_url = 'http://www.dom.ain/mailman/'
+ self._mlist.setMemberOption(
+ 'aperson@dom.ain', mm_cfg.AcknowledgePosts, 1)
+ eq(len(self._sb.files()), 0)
+ msg = email.message_from_string("""\
+From: aperson@dom.ain
+
+""", Message.Message)
+ Acknowledge.process(self._mlist, msg, {})
+ files = self._sb.files()
+ eq(len(files), 1)
+ qmsg, qdata = self._sb.dequeue(files[0])
+ # Check the .db file
+ eq(qdata.get('listname'), '_xtest')
+ eq(qdata.get('recips'), ['aperson@dom.ain'])
+ eq(qdata.get('version'), 3)
+ # Check the .pck
+ eq(qmsg['subject'], '_xtest post acknowledgement')
+ eq(qmsg['to'], 'aperson@dom.ain')
+ eq(qmsg['from'], '_xtest-admin@dom.ain')
+ eq(qmsg.get_type(), 'text/plain')
+ eq(qmsg.get_param('charset'), 'us-ascii')
+ msgid = qmsg['message-id']
+ self.failUnless(msgid.startswith('<mailman.'))
+ self.failUnless(msgid.endswith('._xtest@dom.ain>'))
+ eq(qmsg.get_payload(), """\
+Your message entitled
+
+ (no subject)
+
+was successfully received by the _xtest mailing list.
+
+List info page: http://www.dom.ain/mailman/listinfo/_xtest
+""")
+ # Make sure we dequeued the only message
+ eq(len(self._sb.files()), 0)
+
+ def test_ack_with_subject(self):
+ eq = self.assertEqual
+ self._mlist.host_name = 'dom.ain'
+ self._mlist.web_page_url = 'http://www.dom.ain/mailman/'
+ self._mlist.setMemberOption(
+ 'aperson@dom.ain', mm_cfg.AcknowledgePosts, 1)
+ eq(len(self._sb.files()), 0)
+ msg = email.message_from_string("""\
+From: aperson@dom.ain
+Subject: Wish you were here
+
+""", Message.Message)
+ Acknowledge.process(self._mlist, msg, {})
+ files = self._sb.files()
+ eq(len(files), 1)
+ qmsg, qdata = self._sb.dequeue(files[0])
+ # Check the .db file
+ eq(qdata.get('listname'), '_xtest')
+ eq(qdata.get('recips'), ['aperson@dom.ain'])
+ eq(qdata.get('version'), 3)
+ # Check the .pck
+ eq(qmsg['subject'], '_xtest post acknowledgement')
+ eq(qmsg['to'], 'aperson@dom.ain')
+ eq(qmsg['from'], '_xtest-admin@dom.ain')
+ eq(qmsg.get_type(), 'text/plain')
+ eq(qmsg.get_param('charset'), 'us-ascii')
+ msgid = qmsg['message-id']
+ self.failUnless(msgid.startswith('<mailman.'))
+ self.failUnless(msgid.endswith('._xtest@dom.ain>'))
+ eq(qmsg.get_payload(), """\
+Your message entitled
+
+ Wish you were here
+
+was successfully received by the _xtest mailing list.
+
+List info page: http://www.dom.ain/mailman/listinfo/_xtest
+""")
+ # Make sure we dequeued the only message
+ eq(len(self._sb.files()), 0)
+
+ def test_ack_with_prefixed_subject(self):
+ eq = self.assertEqual
+ self._mlist.host_name = 'dom.ain'
+ self._mlist.web_page_url = 'http://www.dom.ain/mailman/'
+ self._mlist.subject_prefix = '[XTEST] '
+ self._mlist.setMemberOption(
+ 'aperson@dom.ain', mm_cfg.AcknowledgePosts, 1)
+ eq(len(self._sb.files()), 0)
+ msg = email.message_from_string("""\
+From: aperson@dom.ain
+Subject: [XTEST] Wish you were here
+
+""", Message.Message)
+ Acknowledge.process(self._mlist, msg, {})
+ files = self._sb.files()
+ eq(len(files), 1)
+ qmsg, qdata = self._sb.dequeue(files[0])
+ # Check the .db file
+ eq(qdata.get('listname'), '_xtest')
+ eq(qdata.get('recips'), ['aperson@dom.ain'])
+ eq(qdata.get('version'), 3)
+ # Check the .pck
+ eq(qmsg['subject'], '_xtest post acknowledgement')
+ eq(qmsg['to'], 'aperson@dom.ain')
+ eq(qmsg['from'], '_xtest-admin@dom.ain')
+ eq(qmsg.get_type(), 'text/plain')
+ eq(qmsg.get_param('charset'), 'us-ascii')
+ msgid = qmsg['message-id']
+ self.failUnless(msgid.startswith('<mailman.'))
+ self.failUnless(msgid.endswith('._xtest@dom.ain>'))
+ eq(qmsg.get_payload(), """\
+Your message entitled
+
+ Wish you were here
+
+was successfully received by the _xtest mailing list.
+
+List info page: http://www.dom.ain/mailman/listinfo/_xtest
+""")
+ # Make sure we dequeued the only message
+ eq(len(self._sb.files()), 0)
+
+
+
+class TestAfterDelivery(TestBase):
+ # Both msg and msgdata are ignored
+ def test_process(self):
+ mlist = self._mlist
+ last_post_time = mlist.last_post_time
+ post_id = mlist.post_id
+ AfterDelivery.process(mlist, None, None)
+ self.failUnless(mlist.last_post_time > last_post_time)
+ self.assertEqual(mlist.post_id, post_id + 1)
+
+
+
+class TestApprove(TestBase):
+ def test_short_circuit(self):
+ msgdata = {'approved': 1}
+ rtn = Approve.process(self._mlist, None, msgdata)
+ # Not really a great test, but there's little else to assert
+ self.assertEqual(rtn, None)
+
+ def test_approved_moderator(self):
+ mlist = self._mlist
+ mlist.mod_password = password('wazoo')
+ msg = email.message_from_string("""\
+Approved: wazoo
+
+""")
+ msgdata = {}
+ Approve.process(mlist, msg, msgdata)
+ self.failUnless(msgdata.has_key('approved'))
+ self.assertEqual(msgdata['approved'], 1)
+
+ def test_approve_moderator(self):
+ mlist = self._mlist
+ mlist.mod_password = password('wazoo')
+ msg = email.message_from_string("""\
+Approve: wazoo
+
+""")
+ msgdata = {}
+ Approve.process(mlist, msg, msgdata)
+ self.failUnless(msgdata.has_key('approved'))
+ self.assertEqual(msgdata['approved'], 1)
+
+ def test_approved_admin(self):
+ mlist = self._mlist
+ mlist.password = password('wazoo')
+ msg = email.message_from_string("""\
+Approved: wazoo
+
+""")
+ msgdata = {}
+ Approve.process(mlist, msg, msgdata)
+ self.failUnless(msgdata.has_key('approved'))
+ self.assertEqual(msgdata['approved'], 1)
+
+ def test_approve_admin(self):
+ mlist = self._mlist
+ mlist.password = password('wazoo')
+ msg = email.message_from_string("""\
+Approve: wazoo
+
+""")
+ msgdata = {}
+ Approve.process(mlist, msg, msgdata)
+ self.failUnless(msgdata.has_key('approved'))
+ self.assertEqual(msgdata['approved'], 1)
+
+ def test_unapproved(self):
+ mlist = self._mlist
+ mlist.password = password('zoowa')
+ msg = email.message_from_string("""\
+Approve: wazoo
+
+""")
+ msgdata = {}
+ Approve.process(mlist, msg, msgdata)
+ self.assertEqual(msgdata.get('approved'), None)
+
+ def test_trip_beentheres(self):
+ mlist = self._mlist
+ mlist.host_name = 'dom.ain'
+ msg = email.message_from_string("""\
+X-BeenThere: %s
+
+""" % mlist.GetListEmail())
+ self.assertRaises(Errors.LoopError, Approve.process, mlist, msg, {})
+
+
+
+class TestCalcRecips(TestBase):
+ def setUp(self):
+ TestBase.setUp(self)
+ # Add a bunch of regular members
+ mlist = self._mlist
+ mlist.addNewMember('aperson@dom.ain')
+ mlist.addNewMember('bperson@dom.ain')
+ mlist.addNewMember('cperson@dom.ain')
+ # And a bunch of digest members
+ mlist.addNewMember('dperson@dom.ain', digest=1)
+ mlist.addNewMember('eperson@dom.ain', digest=1)
+ mlist.addNewMember('fperson@dom.ain', digest=1)
+
+ def test_short_circuit(self):
+ msgdata = {'recips': 1}
+ rtn = CalcRecips.process(self._mlist, None, msgdata)
+ # Not really a great test, but there's little else to assert
+ self.assertEqual(rtn, None)
+
+ def test_simple_path(self):
+ msgdata = {}
+ msg = email.message_from_string("""\
+From: dperson@dom.ain
+
+""", Message.Message)
+ CalcRecips.process(self._mlist, msg, msgdata)
+ self.failUnless(msgdata.has_key('recips'))
+ recips = msgdata['recips']
+ recips.sort()
+ self.assertEqual(recips, ['aperson@dom.ain', 'bperson@dom.ain',
+ 'cperson@dom.ain'])
+
+ def test_exclude_sender(self):
+ msgdata = {}
+ msg = email.message_from_string("""\
+From: cperson@dom.ain
+
+""", Message.Message)
+ self._mlist.setMemberOption('cperson@dom.ain',
+ mm_cfg.DontReceiveOwnPosts, 1)
+ CalcRecips.process(self._mlist, msg, msgdata)
+ self.failUnless(msgdata.has_key('recips'))
+ recips = msgdata['recips']
+ recips.sort()
+ self.assertEqual(recips, ['aperson@dom.ain', 'bperson@dom.ain'])
+
+ def test_urgent_moderator(self):
+ self._mlist.mod_password = password('xxXXxx')
+ msgdata = {}
+ msg = email.message_from_string("""\
+From: dperson@dom.ain
+Urgent: xxXXxx
+
+""", Message.Message)
+ CalcRecips.process(self._mlist, msg, msgdata)
+ self.failUnless(msgdata.has_key('recips'))
+ recips = msgdata['recips']
+ recips.sort()
+ self.assertEqual(recips, ['aperson@dom.ain', 'bperson@dom.ain',
+ 'cperson@dom.ain', 'dperson@dom.ain',
+ 'eperson@dom.ain', 'fperson@dom.ain'])
+
+ def test_urgent_admin(self):
+ self._mlist.mod_password = password('yyYYyy')
+ self._mlist.password = password('xxXXxx')
+ msgdata = {}
+ msg = email.message_from_string("""\
+From: dperson@dom.ain
+Urgent: xxXXxx
+
+""", Message.Message)
+ CalcRecips.process(self._mlist, msg, msgdata)
+ self.failUnless(msgdata.has_key('recips'))
+ recips = msgdata['recips']
+ recips.sort()
+ self.assertEqual(recips, ['aperson@dom.ain', 'bperson@dom.ain',
+ 'cperson@dom.ain', 'dperson@dom.ain',
+ 'eperson@dom.ain', 'fperson@dom.ain'])
+
+ def test_urgent_reject(self):
+ self._mlist.mod_password = password('yyYYyy')
+ self._mlist.password = password('xxXXxx')
+ msgdata = {}
+ msg = email.message_from_string("""\
+From: dperson@dom.ain
+Urgent: zzZZzz
+
+""", Message.Message)
+ self.assertRaises(CalcRecips.RejectUrgentMessage,
+ CalcRecips.process,
+ self._mlist, msg, msgdata)
+
+ # BAW: must test the do_topic_filters() path...
+
+
+
+class TestCleanse(TestBase):
+ def setUp(self):
+ TestBase.setUp(self)
+ self._mlist.host_name = 'dom.ain'
+
+ def test_simple_cleanse(self):
+ eq = self.assertEqual
+ msg = email.message_from_string("""\
+From: aperson@dom.ain
+Approved: yes
+Urgent: indeed
+Reply-To: bperson@dom.ain
+Sender: asystem@dom.ain
+Return-Receipt-To: another@dom.ain
+Disposition-Notification-To: athird@dom.ain
+X-Confirm-Reading-To: afourth@dom.ain
+X-PMRQC: afifth@dom.ain
+Subject: a message to you
+
+""", Message.Message)
+ Cleanse.process(self._mlist, msg, {})
+ eq(msg['approved'], None)
+ eq(msg['urgent'], None)
+ eq(msg['return-receipt-to'], None)
+ eq(msg['disposition-notification-to'], None)
+ eq(msg['x-confirm-reading-to'], None)
+ eq(msg['x-pmrqc'], None)
+ eq(msg['from'], 'aperson@dom.ain')
+ eq(msg['reply-to'], 'bperson@dom.ain')
+ eq(msg['sender'], 'asystem@dom.ain')
+ eq(msg['subject'], 'a message to you')
+
+ def test_anon_cleanse(self):
+ eq = self.assertEqual
+ msg = email.message_from_string("""\
+From: aperson@dom.ain
+Approved: yes
+Urgent: indeed
+Reply-To: bperson@dom.ain
+Sender: asystem@dom.ain
+Return-Receipt-To: another@dom.ain
+Disposition-Notification-To: athird@dom.ain
+X-Confirm-Reading-To: afourth@dom.ain
+X-PMRQC: afifth@dom.ain
+Subject: a message to you
+
+""", Message.Message)
+ self._mlist.anonymous_list = 1
+ Cleanse.process(self._mlist, msg, {})
+ eq(msg['approved'], None)
+ eq(msg['urgent'], None)
+ eq(msg['return-receipt-to'], None)
+ eq(msg['disposition-notification-to'], None)
+ eq(msg['x-confirm-reading-to'], None)
+ eq(msg['x-pmrqc'], None)
+ eq(len(msg.get_all('from')), 1)
+ eq(len(msg.get_all('reply-to')), 1)
+ eq(msg['from'], '_xtest-admin@dom.ain')
+ eq(msg['reply-to'], '_xtest@dom.ain')
+ eq(msg['sender'], None)
+ eq(msg['subject'], 'a message to you')
+
+
+
+class TestCookHeaders(TestBase):
+ def test_transform_noack_to_xack(self):
+ eq = self.assertEqual
+ msg = email.message_from_string("""\
+X-Ack: yes
+
+""", Message.Message)
+ CookHeaders.process(self._mlist, msg, {'noack': 1})
+ eq(len(msg.get_all('x-ack')), 1)
+ eq(msg['x-ack'], 'no')
+
+ def test_original_sender(self):
+ msg = email.message_from_string("""\
+From: aperson@dom.ain
+
+""", Message.Message)
+ msgdata = {}
+ CookHeaders.process(self._mlist, msg, msgdata)
+ self.assertEqual(msgdata.get('original_sender'), 'aperson@dom.ain')
+
+ def test_no_original_sender(self):
+ msg = email.message_from_string("""\
+Subject: about this message
+
+""", Message.Message)
+ msgdata = {}
+ CookHeaders.process(self._mlist, msg, msgdata)
+ self.assertEqual(msgdata.get('original_sender'), '')
+
+ def test_explicit_sender_and_errors_to(self):
+ eq = self.assertEqual
+ msg = email.message_from_string("""\
+Sender: asystem@dom.ain
+Errors-To: bsystem@dom.ain
+
+""", Message.Message)
+ msgdata = {'errorsto': 'robohelper@dom.ain'}
+ CookHeaders.process(self._mlist, msg, msgdata)
+ eq(msg['sender'], 'robohelper@dom.ain')
+ eq(msg['errors-to'], 'robohelper@dom.ain')
+
+ def test_implicit_sender_and_errors_to(self):
+ eq = self.assertEqual
+ self._mlist.host_name = 'dom.ain'
+ msg = email.message_from_string("""\
+Sender: asystem@dom.ain
+Errors-To: bsystem@dom.ain
+
+""", Message.Message)
+ CookHeaders.process(self._mlist, msg, {})
+ eq(msg['sender'], '_xtest-admin@dom.ain')
+ eq(msg['errors-to'], '_xtest-admin@dom.ain')
+
+ def test_xbeenthere(self):
+ self._mlist.host_name = 'dom.ain'
+ msg = email.message_from_string("""\
+From: aperson@dom.ain
+
+""", Message.Message)
+ CookHeaders.process(self._mlist, msg, {})
+ self.assertEqual(msg['x-beenthere'], '_xtest@dom.ain')
+
+ def test_multiple_xbeentheres(self):
+ eq = self.assertEqual
+ self._mlist.host_name = 'dom.ain'
+ msg = email.message_from_string("""\
+From: aperson@dom.ain
+X-BeenThere: alist@another.dom.ain
+
+""", Message.Message)
+ CookHeaders.process(self._mlist, msg, {})
+ eq(len(msg.get_all('x-beenthere')), 2)
+ beentheres = msg.get_all('x-beenthere')
+ beentheres.sort()
+ eq(beentheres, ['_xtest@dom.ain', 'alist@another.dom.ain'])
+
+ def test_nonexisting_mmversion(self):
+ eq = self.assertEqual
+ msg = email.message_from_string("""\
+From: aperson@dom.ain
+
+""", Message.Message)
+ CookHeaders.process(self._mlist, msg, {})
+ eq(msg['x-mailman-version'], mm_cfg.VERSION)
+
+ def test_existing_mmversion(self):
+ eq = self.assertEqual
+ msg = email.message_from_string("""\
+From: aperson@dom.ain
+X-Mailman-Version: 3000
+
+""", Message.Message)
+ CookHeaders.process(self._mlist, msg, {})
+ eq(len(msg.get_all('x-mailman-version')), 1)
+ eq(msg['x-mailman-version'], '3000')
+
+ def test_nonexisting_precedence(self):
+ eq = self.assertEqual
+ msg = email.message_from_string("""\
+From: aperson@dom.ain
+
+""", Message.Message)
+ CookHeaders.process(self._mlist, msg, {})
+ eq(msg['precedence'], 'bulk')
+
+ def test_existing_precedence(self):
+ eq = self.assertEqual
+ msg = email.message_from_string("""\
+From: aperson@dom.ain
+Precedence: junk
+
+""", Message.Message)
+ CookHeaders.process(self._mlist, msg, {})
+ eq(len(msg.get_all('precedence')), 1)
+ eq(msg['precedence'], 'junk')
+
+ def test_subject_munging_no_subject(self):
+ self._mlist.subject_prefix = '[XTEST] '
+ msg = email.message_from_string("""\
+From: aperson@dom.ain
+
+""", Message.Message)
+ CookHeaders.process(self._mlist, msg, {})
+ self.assertEqual(msg['subject'], '[XTEST] (no subject)')
+
+ def test_subject_munging(self):
+ self._mlist.subject_prefix = '[XTEST] '
+ msg = email.message_from_string("""\
+From: aperson@dom.ain
+Subject: About Mailman...
+
+""", Message.Message)
+ CookHeaders.process(self._mlist, msg, {})
+ self.assertEqual(msg['subject'], '[XTEST] About Mailman...')
+
+ def test_no_subject_munging_for_digests(self):
+ self._mlist.subject_prefix = '[XTEST] '
+ msg = email.message_from_string("""\
+From: aperson@dom.ain
+Subject: About Mailman...
+
+""", Message.Message)
+ CookHeaders.process(self._mlist, msg, {'isdigest': 1})
+ self.assertEqual(msg['subject'], 'About Mailman...')
+
+ def test_no_subject_munging_for_fasttrack(self):
+ self._mlist.subject_prefix = '[XTEST] '
+ msg = email.message_from_string("""\
+From: aperson@dom.ain
+Subject: About Mailman...
+
+""", Message.Message)
+ CookHeaders.process(self._mlist, msg, {'_fasttrack': 1})
+ self.assertEqual(msg['subject'], 'About Mailman...')
+
+ def test_no_subject_munging_has_prefix(self):
+ self._mlist.subject_prefix = '[XTEST] '
+ msg = email.message_from_string("""\
+From: aperson@dom.ain
+Subject: Re: [XTEST] About Mailman...
+
+""", Message.Message)
+ CookHeaders.process(self._mlist, msg, {})
+ self.assertEqual(msg['subject'], 'Re: [XTEST] About Mailman...')
+
+ def test_reply_to_list(self):
+ eq = self.assertEqual
+ self._mlist.reply_goes_to_list = 1
+ self._mlist.host_name = 'dom.ain'
+ msg = email.message_from_string("""\
+From: aperson@dom.ain
+
+""", Message.Message)
+ CookHeaders.process(self._mlist, msg, {})
+ eq(msg['reply-to'], '_xtest@dom.ain')
+ eq(msg['x-reply-to'], None)
+
+ def test_reply_to_list_with_xreplyto(self):
+ eq = self.assertEqual
+ self._mlist.reply_goes_to_list = 1
+ self._mlist.host_name = 'dom.ain'
+ msg = email.message_from_string("""\
+From: aperson@dom.ain
+Reply-To: bperson@dom.ain
+
+""", Message.Message)
+ CookHeaders.process(self._mlist, msg, {})
+ eq(msg['reply-to'], '_xtest@dom.ain')
+ eq(msg['x-reply-to'], 'bperson@dom.ain')
+
+ def test_reply_to_explicit(self):
+ eq = self.assertEqual
+ self._mlist.reply_goes_to_list = 2
+ self._mlist.host_name = 'dom.ain'
+ self._mlist.reply_to_address = 'mlist@dom.ain'
+ msg = email.message_from_string("""\
+From: aperson@dom.ain
+
+""", Message.Message)
+ CookHeaders.process(self._mlist, msg, {})
+ eq(msg['reply-to'], 'mlist@dom.ain')
+ eq(msg['x-reply-to'], None)
+
+ def test_reply_to_list_with_xreplyto2(self):
+ eq = self.assertEqual
+ self._mlist.reply_goes_to_list = 2
+ self._mlist.host_name = 'dom.ain'
+ self._mlist.reply_to_address = 'mlist@dom.ain'
+ msg = email.message_from_string("""\
+From: aperson@dom.ain
+Reply-To: bperson@dom.ain
+
+""", Message.Message)
+ CookHeaders.process(self._mlist, msg, {})
+ eq(msg['reply-to'], 'mlist@dom.ain')
+ eq(msg['x-reply-to'], 'bperson@dom.ain')
+
+ def test_reply_to_explicit_with_bad_address(self):
+ eq = self.assertEqual
+ self._mlist.reply_goes_to_list = 2
+ self._mlist.host_name = 'dom.ain'
+ # A bad address that is missing the domain part
+ self._mlist.reply_to_address = 'mlist'
+ msg = email.message_from_string("""\
+From: aperson@dom.ain
+Reply-To: bperson@dom.ain
+
+""", Message.Message)
+ CookHeaders.process(self._mlist, msg, {})
+ eq(msg['reply-to'], 'bperson@dom.ain')
+ eq(msg['x-reply-to'], None)
+
+ def test_list_headers_nolist(self):
+ eq = self.assertEqual
+ msg = email.message_from_string("""\
+From: aperson@dom.ain
+
+""", Message.Message)
+ CookHeaders.process(self._mlist, msg, {'_nolist': 1})
+ eq(msg['list-id'], None)
+ eq(msg['list-help'], None)
+ eq(msg['list-unsubscribe'], None)
+ eq(msg['list-subscribe'], None)
+ eq(msg['list-post'], None)
+ eq(msg['list-archive'], None)
+
+ def test_list_headers(self):
+ eq = self.assertEqual
+ self._mlist.archive = 1
+ self._mlist.host_name = 'dom.ain'
+ self._mlist.web_page_url = 'http://www.dom.ain/mailman/'
+ msg = email.message_from_string("""\
+From: aperson@dom.ain
+
+""", Message.Message)
+ CookHeaders.process(self._mlist, msg, {})
+ eq(msg['list-id'], '<_xtest.dom.ain>')
+ eq(msg['list-help'], '<mailto:_xtest-request@dom.ain?subject=help>')
+ eq(msg['list-unsubscribe'],
+ '<http://www.dom.ain/mailman/listinfo/_xtest>,'
+ '\n\t<mailto:_xtest-request@dom.ain?subject=unsubscribe>')
+ eq(msg['list-subscribe'],
+ '<http://www.dom.ain/mailman/listinfo/_xtest>,'
+ '\n\t<mailto:_xtest-request@dom.ain?subject=subscribe>')
+ eq(msg['list-post'], '<mailto:_xtest@dom.ain>')
+ eq(msg['list-archive'], '<http://www.dom.ain/pipermail/_xtest/>')
+
+ def test_list_headers_with_description(self):
+ eq = self.assertEqual
+ self._mlist.archive = 1
+ self._mlist.description = 'A Test List'
+ self._mlist.host_name = 'dom.ain'
+ self._mlist.web_page_url = 'http://www.dom.ain/mailman/'
+ msg = email.message_from_string("""\
+From: aperson@dom.ain
+
+""", Message.Message)
+ CookHeaders.process(self._mlist, msg, {})
+ eq(msg['list-id'], 'A Test List <_xtest.dom.ain>')
+ eq(msg['list-help'], '<mailto:_xtest-request@dom.ain?subject=help>')
+ eq(msg['list-unsubscribe'],
+ '<http://www.dom.ain/mailman/listinfo/_xtest>,'
+ '\n\t<mailto:_xtest-request@dom.ain?subject=unsubscribe>')
+ eq(msg['list-subscribe'],
+ '<http://www.dom.ain/mailman/listinfo/_xtest>,'
+ '\n\t<mailto:_xtest-request@dom.ain?subject=subscribe>')
+ eq(msg['list-post'], '<mailto:_xtest@dom.ain>')
+ eq(msg['list-archive'], '<http://www.dom.ain/pipermail/_xtest/>')
+
+
+
+class TestDecorate(TestBase):
+ def test_short_circuit(self):
+ msgdata = {'isdigest': 1}
+ rtn = Decorate.process(self._mlist, None, msgdata)
+ # Not really a great test, but there's little else to assert
+ self.assertEqual(rtn, None)
+
+ def test_no_multipart(self):
+ mlist = self._mlist
+ mlist.msg_header = 'header\n'
+ mlist.msg_footer = 'footer'
+ msg = email.message_from_string("""\
+From: aperson@dom.ain
+
+Here is a message.
+""")
+ Decorate.process(self._mlist, msg, {})
+ self.assertEqual(msg.get_payload(), """\
+header
+Here is a message.
+footer""")
+
+ def test_no_multipart_template(self):
+ mlist = self._mlist
+ mlist.msg_header = '%(real_name)s header\n'
+ mlist.msg_footer = '%(real_name)s footer'
+ mlist.real_name = 'XTest'
+ msg = email.message_from_string("""\
+From: aperson@dom.ain
+
+Here is a message.
+""")
+ Decorate.process(self._mlist, msg, {})
+ self.assertEqual(msg.get_payload(), """\
+XTest header
+Here is a message.
+XTest footer""")
+
+ def test_no_multipart_type_error(self):
+ mlist = self._mlist
+ mlist.msg_header = '%(real_name) header\n'
+ mlist.msg_footer = '%(real_name) footer'
+ mlist.real_name = 'XTest'
+ msg = email.message_from_string("""\
+From: aperson@dom.ain
+
+Here is a message.
+""")
+ Decorate.process(self._mlist, msg, {})
+ self.assertEqual(msg.get_payload(), """\
+%(real_name) header
+Here is a message.
+%(real_name) footer""")
+
+ def test_no_multipart_value_error(self):
+ mlist = self._mlist
+ mlist.msg_header = '%(real_name)p header\n'
+ mlist.msg_footer = '%(real_name)p footer'
+ mlist.real_name = 'XTest'
+ msg = email.message_from_string("""\
+From: aperson@dom.ain
+
+Here is a message.
+""")
+ Decorate.process(self._mlist, msg, {})
+ self.assertEqual(msg.get_payload(), """\
+%(real_name)p header
+Here is a message.
+%(real_name)p footer""")
+
+ def test_no_multipart_missing_key(self):
+ mlist = self._mlist
+ mlist.msg_header = '%(spooge)s header\n'
+ mlist.msg_footer = '%(spooge)s footer'
+ msg = email.message_from_string("""\
+From: aperson@dom.ain
+
+Here is a message.
+""")
+ Decorate.process(self._mlist, msg, {})
+ self.assertEqual(msg.get_payload(), """\
+%(spooge)s header
+Here is a message.
+%(spooge)s footer""")
+
+ def test_multipart(self):
+ mlist = self._mlist
+ mlist.msg_header = 'header\n'
+ mlist.msg_footer = 'footer'
+ msg1 = email.message_from_string("""\
+From: aperson@dom.ain
+
+Here is the first message.
+""")
+ msg2 = email.message_from_string("""\
+From: bperson@dom.ain
+
+Here is the second message.
+""")
+ msg = Message.Message()
+ msg.add_payload(msg1)
+ msg.add_payload(msg2)
+ msg.add_header('Content-Type', 'multipart/mixed', boundary='BOUNDARY')
+ Decorate.process(self._mlist, msg, {})
+ self.assertEqual(msg.as_string(unixfrom=0), """\
+Content-Type: multipart/mixed; boundary="BOUNDARY"
+
+--BOUNDARY
+Content-Type: text/plain; charset="us-ascii"
+MIME-Version: 1.0
+Content-Transfer-Encoding: 7bit
+
+header
+
+--BOUNDARY
+From: aperson@dom.ain
+
+Here is the first message.
+
+--BOUNDARY
+From: bperson@dom.ain
+
+Here is the second message.
+
+--BOUNDARY
+Content-Type: text/plain; charset="us-ascii"
+MIME-Version: 1.0
+Content-Transfer-Encoding: 7bit
+
+footer
+
+--BOUNDARY--""")
+
+ def test_image(self):
+ mlist = self._mlist
+ mlist.msg_header = 'header\n'
+ mlist.msg_footer = 'footer'
+ msg = email.message_from_string("""\
+From: aperson@dom.ain
+Content-type: image/x-spooge
+
+IMAGEDATAIMAGEDATAIMAGEDATA
+""")
+ Decorate.process(self._mlist, msg, {})
+ self.assertEqual(msg.get_payload(), """\
+IMAGEDATAIMAGEDATAIMAGEDATA
+""")
+
+ def test_personalize_assert(self):
+ raises = self.assertRaises
+ raises(AssertionError, Decorate.process,
+ self._mlist, None, {'personalized': 1})
+ raises(AssertionError, Decorate.process,
+ self._mlist, None, {'personalized': 1,
+ 'recips': [1, 2, 3]})
+
+ def test_personalize_not_a_member(self):
+ mlist = self._mlist
+ mlist.msg_header = """\
+%(user_address)s
+%(user_delivered_to)s
+%(user_language)s
+%(user_name)s
+%(user_optionsurl)s
+"""
+ mlist.msg_footer = mlist.msg_header
+ msg = email.message_from_string("""\
+From: aperson@dom.ain
+
+Here is a message.
+""")
+ Decorate.process(mlist, msg, {'personalized': 1,
+ 'recips': ['aperson@dom.ain']})
+ self.assertEqual(msg.get_payload(), """\
+aperson@dom.ain
+%(user_delivered_to)s
+%(user_language)s
+%(user_name)s
+%(user_optionsurl)s
+Here is a message.
+aperson@dom.ain
+%(user_delivered_to)s
+%(user_language)s
+%(user_name)s
+%(user_optionsurl)s
+""")
+
+ def test_personalize(self):
+ mlist = self._mlist
+ mlist.msg_header = """\
+%(user_address)s
+%(user_delivered_to)s
+%(user_language)s
+%(user_name)s
+%(user_optionsurl)s
+"""
+ mlist.msg_footer = mlist.msg_header
+ mlist.web_page_url = 'http://www.dom.ain/mailman/'
+ mlist.addNewMember('APerson@dom.ain', language='xx',
+ password='xxXXxx', realname='A. Person')
+ msg = email.message_from_string("""\
+From: aperson@dom.ain
+
+Here is a message.
+""")
+ Decorate.process(mlist, msg, {'personalized': 1,
+ 'recips': ['aperson@dom.ain']})
+ self.assertEqual(msg.get_payload(), """\
+aperson@dom.ain
+APerson@dom.ain
+xx
+A. Person
+http://www.dom.ain/mailman/options/_xtest/aperson%40dom.ain
+Here is a message.
+aperson@dom.ain
+APerson@dom.ain
+xx
+A. Person
+http://www.dom.ain/mailman/options/_xtest/aperson%40dom.ain
+""")
+
+ def test_personalize_no_name(self):
+ mlist = self._mlist
+ mlist.msg_header = """\
+%(user_address)s
+%(user_delivered_to)s
+%(user_language)s
+%(user_name)s
+%(user_optionsurl)s
+"""
+ mlist.msg_footer = mlist.msg_header
+ mlist.web_page_url = 'http://www.dom.ain/mailman/'
+ mlist.addNewMember('APerson@dom.ain', language='xx', password='xxXXxx')
+ msg = email.message_from_string("""\
+From: aperson@dom.ain
+
+Here is a message.
+""")
+ Decorate.process(mlist, msg, {'personalized': 1,
+ 'recips': ['aperson@dom.ain']})
+ self.assertEqual(msg.get_payload(), """\
+aperson@dom.ain
+APerson@dom.ain
+xx
+not available
+http://www.dom.ain/mailman/options/_xtest/aperson%40dom.ain
+Here is a message.
+aperson@dom.ain
+APerson@dom.ain
+xx
+not available
+http://www.dom.ain/mailman/options/_xtest/aperson%40dom.ain
+""")
+
+
+
+class TestFileRecips(TestBase):
+ def test_short_circuit(self):
+ msgdata = {'recips': 1}
+ rtn = FileRecips.process(self._mlist, None, msgdata)
+ # Not really a great test, but there's little else to assert
+ self.assertEqual(rtn, None)
+
+ def test_file_nonexistant(self):
+ msgdata = {}
+ FileRecips.process(self._mlist, None, msgdata)
+ self.assertEqual(msgdata.get('recips'), [])
+
+ def test_file_exists_no_sender(self):
+ msg = email.message_from_string("""\
+To: yall@dom.ain
+
+""", Message.Message)
+ msgdata = {}
+ file = os.path.join(self._mlist.fullpath(), 'members.txt')
+ addrs = ['aperson@dom.ain', 'bperson@dom.ain',
+ 'cperson@dom.ain', 'dperson@dom.ain']
+ fp = open(file, 'w')
+ try:
+ for addr in addrs:
+ print >> fp, addr
+ fp.close()
+ FileRecips.process(self._mlist, msg, msgdata)
+ self.assertEqual(msgdata.get('recips'), addrs)
+ finally:
+ try:
+ os.unlink(file)
+ except OSError, e:
+ if e.errno <> e.ENOENT: raise
+
+ def test_file_exists_no_member(self):
+ msg = email.message_from_string("""\
+From: eperson@dom.ain
+To: yall@dom.ain
+
+""", Message.Message)
+ msgdata = {}
+ file = os.path.join(self._mlist.fullpath(), 'members.txt')
+ addrs = ['aperson@dom.ain', 'bperson@dom.ain',
+ 'cperson@dom.ain', 'dperson@dom.ain']
+ fp = open(file, 'w')
+ try:
+ for addr in addrs:
+ print >> fp, addr
+ fp.close()
+ FileRecips.process(self._mlist, msg, msgdata)
+ self.assertEqual(msgdata.get('recips'), addrs)
+ finally:
+ try:
+ os.unlink(file)
+ except OSError, e:
+ if e.errno <> e.ENOENT: raise
+
+ def test_file_exists_is_member(self):
+ msg = email.message_from_string("""\
+From: aperson@dom.ain
+To: yall@dom.ain
+
+""", Message.Message)
+ msgdata = {}
+ file = os.path.join(self._mlist.fullpath(), 'members.txt')
+ addrs = ['aperson@dom.ain', 'bperson@dom.ain',
+ 'cperson@dom.ain', 'dperson@dom.ain']
+ fp = open(file, 'w')
+ try:
+ for addr in addrs:
+ print >> fp, addr
+ self._mlist.addNewMember(addr)
+ fp.close()
+ FileRecips.process(self._mlist, msg, msgdata)
+ self.assertEqual(msgdata.get('recips'), addrs[1:])
+ finally:
+ try:
+ os.unlink(file)
+ except OSError, e:
+ if e.errno <> e.ENOENT: raise
+
+
+
+class TestHold(TestBase):
+ def setUp(self):
+ TestBase.setUp(self)
+ self._mlist.administrivia = 1
+ self._mlist.respond_to_post_requests = 0
+ self._mlist.admin_immed_notify = 0
+ # We're going to want to inspect this queue directory
+ self._sb = Switchboard(mm_cfg.VIRGINQUEUE_DIR)
+
+ def tearDown(self):
+ for f in os.listdir(mm_cfg.VIRGINQUEUE_DIR):
+ os.unlink(os.path.join(mm_cfg.VIRGINQUEUE_DIR, f))
+ TestBase.tearDown(self)
+ try:
+ os.unlink(os.path.join(mm_cfg.DATA_DIR, 'pending.db'))
+ except OSError, e:
+ if e.errno <> errno.ENOENT: raise
+ for f in [holdfile for holdfile in os.listdir(mm_cfg.DATA_DIR)
+ if holdfile.startswith('heldmsg-')]:
+ os.unlink(os.path.join(mm_cfg.DATA_DIR, f))
+
+ def test_short_circuit(self):
+ msgdata = {'approved': 1}
+ rtn = Hold.process(self._mlist, None, msgdata)
+ # Not really a great test, but there's little else to assert
+ self.assertEqual(rtn, None)
+
+ def test_administrivia(self):
+ msg = email.message_from_string("""\
+From: aperson@dom.ain
+Subject: unsubscribe
+
+""", Message.Message)
+ self.assertRaises(Hold.Administrivia, Hold.process,
+ self._mlist, msg, {})
+
+ def test_forbidden_posters(self):
+ self._mlist.forbidden_posters = ['aperson@dom.ain']
+ msg = email.message_from_string("""\
+From: aperson@dom.ain
+Subject: The forbidden donut
+
+""", Message.Message)
+ self.assertRaises(Hold.ForbiddenPoster, Hold.process,
+ self._mlist, msg, {})
+
+ def test_moderated_no_posters(self):
+ self._mlist.moderated = 1
+ msg = email.message_from_string("""\
+From: aperson@dom.ain
+Subject: An unapproved message
+
+""", Message.Message)
+ self.assertRaises(Hold.ModeratedPost, Hold.process,
+ self._mlist, msg, {})
+
+ def test_moderated_posters(self):
+ self._mlist.moderated = 1
+ self._mlist.posters = ['aperson@dom.ain']
+ msg = email.message_from_string("""\
+From: aperson@dom.ain
+To: _xtest@dom.ain
+Subject: An unapproved message
+
+""", Message.Message)
+ rtn = Hold.process(self._mlist, msg, {})
+ self.assertEqual(rtn, None)
+
+ def test_member_posting_only(self):
+ self._mlist.member_posting_only = 1
+ msg = email.message_from_string("""\
+From: bperson@dom.ain
+To: _xtest@dom.ain
+Subject: An unapproved message
+
+""", Message.Message)
+ self.assertRaises(Hold.NonMemberPost, Hold.process,
+ self._mlist, msg, {})
+
+ def test_member_posting_only_ok_member(self):
+ self._mlist.addNewMember('aperson@dom.ain')
+ self._mlist.member_posting_only = 1
+ msg = email.message_from_string("""\
+From: aperson@dom.ain
+To: _xtest@dom.ain
+Subject: An unapproved message
+
+""", Message.Message)
+ rtn = Hold.process(self._mlist, msg, {})
+ self.assertEqual(rtn, None)
+
+ def test_member_posting_only_ok_poster(self):
+ self._mlist.member_posting_only = 1
+ self._mlist.posters = ['aperson@dom.ain']
+ msg = email.message_from_string("""\
+From: aperson@dom.ain
+To: _xtest@dom.ain
+Subject: An unapproved message
+
+""", Message.Message)
+ rtn = Hold.process(self._mlist, msg, {})
+ self.assertEqual(rtn, None)
+
+ def test_posters_ok(self):
+ self._mlist.posters = ['aperson@dom.ain']
+ msg = email.message_from_string("""\
+From: aperson@dom.ain
+To: _xtest@dom.ain
+Subject: An unapproved message
+
+""", Message.Message)
+ rtn = Hold.process(self._mlist, msg, {})
+ self.assertEqual(rtn, None)
+
+ def test_posters(self):
+ self._mlist.posters = ['bperson@dom.ain']
+ msg = email.message_from_string("""\
+From: aperson@dom.ain
+To: _xtest@dom.ain
+Subject: An unapproved message
+
+""", Message.Message)
+ self.assertRaises(Hold.NotExplicitlyAllowed, Hold.process,
+ self._mlist, msg, {})
+
+ def test_max_recips(self):
+ self._mlist.max_num_recipients = 5
+ msg = email.message_from_string("""\
+From: aperson@dom.ain
+To: _xtest@dom.ain, bperson@dom.ain
+Cc: cperson@dom.ain
+Cc: dperson@dom.ain (Jimmy D. Person)
+To: Billy E. Person <eperson@dom.ain>
+
+Hey folks!
+""", Message.Message)
+ self.assertRaises(Hold.TooManyRecipients, Hold.process,
+ self._mlist, msg, {})
+
+ def test_implicit_destination(self):
+ self._mlist.require_explicit_destination = 1
+ msg = email.message_from_string("""\
+From: aperson@dom.ain
+Subject: An implicit message
+
+""", Message.Message)
+ self.assertRaises(Hold.ImplicitDestination, Hold.process,
+ self._mlist, msg, {})
+
+ def test_implicit_destination_fromusenet(self):
+ self._mlist.require_explicit_destination = 1
+ msg = email.message_from_string("""\
+From: aperson@dom.ain
+Subject: An implicit message
+
+""", Message.Message)
+ rtn = Hold.process(self._mlist, msg, {'fromusenet': 1})
+ self.assertEqual(rtn, None)
+
+ def test_suspicious_header(self):
+ self._mlist.bounce_matching_headers = 'From: .*person@(blah.)?dom.ain'
+ msg = email.message_from_string("""\
+From: aperson@dom.ain
+To: _xtest@dom.ain
+Subject: An implicit message
+
+""", Message.Message)
+ self.assertRaises(Hold.SuspiciousHeaders, Hold.process,
+ self._mlist, msg, {})
+
+ def test_suspicious_header_ok(self):
+ self._mlist.bounce_matching_headers = 'From: .*person@blah.dom.ain'
+ msg = email.message_from_string("""\
+From: aperson@dom.ain
+To: _xtest@dom.ain
+Subject: An implicit message
+
+""", Message.Message)
+ rtn = Hold.process(self._mlist, msg, {})
+ self.assertEqual(rtn, None)
+
+ def test_max_message_size(self):
+ self._mlist.max_message_size = 1
+ msg = email.message_from_string("""\
+From: aperson@dom.ain
+To: _xtest@dom.ain
+
+xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
+xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
+xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
+xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
+xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
+xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
+xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
+xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
+xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
+xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
+xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
+xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
+xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
+xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
+""", Message.Message)
+ self.assertRaises(Hold.MessageTooBig, Hold.process,
+ self._mlist, msg, {})
+
+ def test_hold_notifications(self):
+ eq = self.assertEqual
+ self._mlist.respond_to_post_requests = 1
+ self._mlist.admin_immed_notify = 1
+ self._mlist.host_name = 'dom.ain'
+ # Now cause an implicit destination hold
+ msg = email.message_from_string("""\
+From: aperson@dom.ain
+
+""", Message.Message)
+ self.assertRaises(Hold.ImplicitDestination, Hold.process,
+ self._mlist, msg, {})
+ # Now we have to make sure there are two messages in the virgin queue,
+ # one to the sender and one to the list owners.
+ qfiles = {}
+ files = self._sb.files()
+ eq(len(files), 2)
+ for filebase in files:
+ qmsg, qdata = self._sb.dequeue(filebase)
+ to = qmsg['to']
+ qfiles[to] = qmsg, qdata
+ # BAW: We could be testing many other attributes of either the
+ # messages or the metadata files...
+ keys = qfiles.keys()
+ keys.sort()
+ eq(keys, ['_xtest-owner@dom.ain', 'aperson@dom.ain'])
+ # Get the pending cookie from the message to the sender
+ pmsg, pdata = qfiles['aperson@dom.ain']
+ confirmlines = pmsg.get_payload().split('\n')
+ cookie = confirmlines[-3].split('/')[-1]
+ # We also need to make sure there's an entry in the Pending database
+ # for the heold message.
+ data = Pending.confirm(cookie)
+ eq(data, ('H', 1))
+ heldmsg = os.path.join(mm_cfg.DATA_DIR, 'heldmsg-_xtest-1.pck')
+ self.failUnless(os.path.exists(heldmsg))
+ os.unlink(heldmsg)
+ holdfiles = [f for f in os.listdir(mm_cfg.DATA_DIR)
+ if f.startswith('heldmsg-')]
+ eq(len(holdfiles), 0)
+
+
+
+class TestPersonalize(TestBase):
+ def setUp(self):
+ TestBase.setUp(self)
+ # We're going to want to inspect this queue directory
+ self._sb = Switchboard(mm_cfg.INQUEUE_DIR)
+ # Add a member
+ self._mlist.addNewMember('aperson@dom.ain')
+ self._mlist.addNewMember('bperson@dom.ain', realname='B. Person')
+
+ def tearDown(self):
+ for f in os.listdir(mm_cfg.INQUEUE_DIR):
+ os.unlink(os.path.join(mm_cfg.INQUEUE_DIR, f))
+ TestBase.tearDown(self)
+
+ def test_short_circuit(self):
+ eq = self.assertEqual
+ msgdata = {}
+ rtn = Personalize.process(self._mlist, None, {})
+ # Not really a great test, but there's little else to assert
+ eq(rtn, None)
+ eq(msgdata, {})
+ # Try short circuit via msgdata
+ self._mlist.personalize = 1
+ msgdata['personalized'] = 1
+ rtn = Personalize.process(self._mlist, None, msgdata)
+ # Not really a great test, but there's little else to assert
+ eq(rtn, None)
+ eq(msgdata, {'personalized': 1})
+
+ def test_personalize(self):
+ eq = self.assertEqual
+ msg = email.message_from_string("""\
+To: _xtest@dom.ain
+
+A message for you.
+""")
+ self._mlist.personalize = 1
+ msgdata = {'recips': ['aperson@dom.ain', 'bperson@dom.ain']}
+ Personalize.process(self._mlist, msg, msgdata)
+ eq(msg['to'], '_xtest@dom.ain')
+ eq(msgdata, {})
+ # There should be two files in qfiles/in
+ files = self._sb.files()
+ eq(len(files), 2)
+ filedata = {}
+ for filebase in files:
+ msg, data = self._sb.dequeue(filebase)
+ key = data.get('recips')
+ eq(type(key), ListType)
+ eq(len(key), 1)
+ filedata[key[0]] = msg, data
+ eq(len(filedata), 2)
+ keys = filedata.keys()
+ keys.sort()
+ eq(keys, ['aperson@dom.ain', 'bperson@dom.ain'])
+ for person in keys:
+ msg, data = filedata[person]
+ eq(data['pipeline'], ['Decorate', 'ToOutgoing'])
+ eq(data['recips'], [person])
+ eq(data['personalized'], 1)
+ realname = self._mlist.getMemberName(person)
+ if realname:
+ to = '%s (%s)' % (person, realname)
+ else:
+ to = person
+ eq(msg['to'], to)
+
+
+
+class TestReplybot(TestBase):
+ pass
+
+
+
+class TestSMTPDirect(TestBase):
+ pass
+
+
+
+class TestSendmail(TestBase):
+ pass
+
+
+
+class TestSpamDetect(TestBase):
+ def test_short_circuit(self):
+ msgdata = {'approved': 1}
+ rtn = SpamDetect.process(self._mlist, None, msgdata)
+ # Not really a great test, but there's little else to assert
+ self.assertEqual(rtn, None)
+
+ def test_spam_detect(self):
+ msg1 = email.message_from_string("""\
+From: aperson@dom.ain
+
+A message.
+""")
+ msg2 = email.message_from_string("""\
+To: xlist@dom.ain
+
+A message.
+""")
+ spammers = mm_cfg.KNOWN_SPAMMERS[:]
+ try:
+ mm_cfg.KNOWN_SPAMMERS.append(('from', '.?person'))
+ self.assertRaises(SpamDetect.SpamDetected,
+ SpamDetect.process, self._mlist, msg1, {})
+ rtn = SpamDetect.process(self._mlist, msg2, {})
+ self.assertEqual(rtn, None)
+ finally:
+ mm_cfg.KNOWN_SPAMMERS = spammers
+
+
+
+class TestTagger(TestBase):
+ pass
+
+
+
+class TestToArchive(TestBase):
+ def setUp(self):
+ TestBase.setUp(self)
+ # We're going to want to inspect this queue directory
+ self._sb = Switchboard(mm_cfg.ARCHQUEUE_DIR)
+
+ def tearDown(self):
+ for f in os.listdir(mm_cfg.ARCHQUEUE_DIR):
+ os.unlink(os.path.join(mm_cfg.ARCHQUEUE_DIR, f))
+ TestBase.tearDown(self)
+
+ def test_short_circuit(self):
+ eq = self.assertEqual
+ msgdata = {'isdigest': 1}
+ ToArchive.process(self._mlist, None, msgdata)
+ eq(len(self._sb.files()), 0)
+ # Try the other half of the or...
+ self._mlist.archive = 0
+ ToArchive.process(self._mlist, None, msgdata)
+ eq(len(self._sb.files()), 0)
+ # Now try the various message header shortcuts
+ msg = email.message_from_string("""\
+X-No-Archive: YES
+
+""")
+ self._mlist.archive = 1
+ ToArchive.process(self._mlist, msg, {})
+ eq(len(self._sb.files()), 0)
+ # And for backwards compatibility
+ msg = email.message_from_string("""\
+X-Archive: NO
+
+""")
+ ToArchive.process(self._mlist, msg, {})
+ eq(len(self._sb.files()), 0)
+
+ def test_normal_archiving(self):
+ eq = self.assertEqual
+ msg = email.message_from_string("""\
+Subject: About Mailman
+
+It rocks!
+""")
+ ToArchive.process(self._mlist, msg, {})
+ files = self._sb.files()
+ eq(len(files), 1)
+ msg2, data = self._sb.dequeue(files[0])
+ eq(len(data), 2)
+ eq(data['version'], 3)
+ self.failUnless(data['received_time'] <= time.time())
+ eq(msg.as_string(unixfrom=0), msg2.as_string(unixfrom=0))
+
+
+
+class TestToDigest(TestBase):
+ pass
+
+
+
+class TestToOutgoing(TestBase):
+ def setUp(self):
+ TestBase.setUp(self)
+ # We're going to want to inspect this queue directory
+ self._sb = Switchboard(mm_cfg.OUTQUEUE_DIR)
+
+ def tearDown(self):
+ for f in os.listdir(mm_cfg.OUTQUEUE_DIR):
+ os.unlink(os.path.join(mm_cfg.OUTQUEUE_DIR, f))
+ TestBase.tearDown(self)
+
+ def test_outgoing(self):
+ eq = self.assertEqual
+ msg = email.message_from_string("""\
+Subject: About Mailman
+
+It rocks!
+""")
+ msgdata = {'foo': 1, 'bar': 2}
+ ToOutgoing.process(self._mlist, msg, msgdata)
+ files = self._sb.files()
+ eq(len(files), 1)
+ msg2, data = self._sb.dequeue(files[0])
+ eq(msg.as_string(unixfrom=0), msg2.as_string(unixfrom=0))
+ eq(len(data), 5)
+ eq(data['foo'], 1)
+ eq(data['bar'], 2)
+ eq(data['version'], 3)
+ eq(data['listname'], '_xtest')
+ self.failUnless(data['received_time'] <= time.time())
+
+
+
+class TestToUsenet(TestBase):
+ def setUp(self):
+ TestBase.setUp(self)
+ # We're going to want to inspect this queue directory
+ self._sb = Switchboard(mm_cfg.NEWSQUEUE_DIR)
+
+ def tearDown(self):
+ for f in os.listdir(mm_cfg.NEWSQUEUE_DIR):
+ os.unlink(os.path.join(mm_cfg.NEWSQUEUE_DIR, f))
+ TestBase.tearDown(self)
+
+ def test_short_circuit(self):
+ eq = self.assertEqual
+ mlist = self._mlist
+ mlist.gateway_to_news = 0
+ ToUsenet.process(mlist, None, {})
+ eq(len(self._sb.files()), 0)
+ mlist.gateway_to_news = 1
+ ToUsenet.process(mlist, None, {'isdigest': 1})
+ eq(len(self._sb.files()), 0)
+ ToUsenet.process(mlist, None, {'fromusenet': 1})
+ eq(len(self._sb.files()), 0)
+
+ def test_to_usenet(self):
+ # BAW: Should we, can we, test the error conditions that only log to a
+ # file instead of raising an exception?
+ eq = self.assertEqual
+ mlist = self._mlist
+ mlist.gateway_to_news = 1
+ mlist.linked_newsgroup = 'foo'
+ mlist.nntp_host = 'bar'
+ msg = email.message_from_string("""\
+Subject: About Mailman
+
+Mailman rocks!
+""")
+ ToUsenet.process(mlist, msg, {})
+ files = self._sb.files()
+ eq(len(files), 1)
+ msg2, data = self._sb.dequeue(files[0])
+ eq(msg.as_string(unixfrom=0), msg2.as_string(unixfrom=0))
+ eq(data['version'], 3)
+ eq(data['listname'], '_xtest')
+ self.failUnless(data['received_time'] <= time.time())
+
+
+
+def suite():
+ suite = unittest.TestSuite()
+ suite.addTest(unittest.makeSuite(TestAcknowledge))
+ suite.addTest(unittest.makeSuite(TestAfterDelivery))
+ suite.addTest(unittest.makeSuite(TestApprove))
+ suite.addTest(unittest.makeSuite(TestCalcRecips))
+ suite.addTest(unittest.makeSuite(TestCleanse))
+ suite.addTest(unittest.makeSuite(TestCookHeaders))
+ suite.addTest(unittest.makeSuite(TestDecorate))
+ suite.addTest(unittest.makeSuite(TestFileRecips))
+ suite.addTest(unittest.makeSuite(TestHold))
+ suite.addTest(unittest.makeSuite(TestPersonalize))
+ suite.addTest(unittest.makeSuite(TestReplybot))
+ suite.addTest(unittest.makeSuite(TestSMTPDirect))
+ suite.addTest(unittest.makeSuite(TestSendmail))
+ suite.addTest(unittest.makeSuite(TestSpamDetect))
+ suite.addTest(unittest.makeSuite(TestTagger))
+ suite.addTest(unittest.makeSuite(TestToArchive))
+ suite.addTest(unittest.makeSuite(TestToDigest))
+ suite.addTest(unittest.makeSuite(TestToOutgoing))
+ suite.addTest(unittest.makeSuite(TestToUsenet))
+ return suite
+
+
+
+if __name__ == '__main__':
+ unittest.main(defaultTest='suite')
diff --git a/tests/test_membership.py b/tests/test_membership.py
new file mode 100644
index 000000000..1c665a162
--- /dev/null
+++ b/tests/test_membership.py
@@ -0,0 +1,331 @@
+# Copyright (C) 2001 by the Free Software Foundation, Inc.
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+
+"""Unit tests for OldStyleMemberships.
+"""
+
+import os
+import unittest
+
+from Mailman import mm_cfg
+from Mailman import Utils
+from Mailman import MailList
+from Mailman.Errors import NotAMemberError
+from Mailman.UserDesc import UserDesc
+
+from TestBase import TestBase
+
+
+
+class TestNoMembers(TestBase):
+ def test_no_member(self):
+ eq = self.assertEqual
+ raises = self.assertRaises
+ mlist = self._mlist
+ eq(mlist.getMembers(), [])
+ eq(mlist.getRegularMemberKeys(), [])
+ eq(mlist.getDigestMemberKeys(), [])
+ self.failIf(mlist.isMember('nobody@dom.ain'))
+ raises(NotAMemberError, mlist.getMemberKey, 'nobody@dom.ain')
+ raises(NotAMemberError, mlist.getMemberCPAddress, 'nobody@dom.ain')
+ eq(mlist.getMemberCPAddresses(('nobody@dom.ain', 'noperson@dom.ain')),
+ [None, None])
+ raises(NotAMemberError, mlist.getMemberPassword, 'nobody@dom.ain')
+ raises(NotAMemberError, mlist.authenticateMember,
+ 'nobody@dom.ain', 'blarg')
+ eq(mlist.getMemberLanguage('nobody@dom.ain'), mlist.preferred_language)
+ raises(NotAMemberError, mlist.getMemberOption,
+ 'nobody@dom.ain', mm_cfg.AcknowledgePosts)
+ raises(NotAMemberError, mlist.getMemberName, 'nobody@dom.ain')
+ raises(NotAMemberError, mlist.getMemberTopics, 'nobody@dom.ain')
+ raises(NotAMemberError, mlist.removeMember, 'nobody@dom.ain')
+
+ def test_add_member_mixed_case(self):
+ eq = self.assertEqual
+ mlist = self._mlist
+ mlist.addNewMember('APerson@dom.AIN')
+ eq(mlist.getMembers(), ['aperson@dom.ain'])
+ eq(mlist.getRegularMemberKeys(), ['aperson@dom.ain'])
+ self.failUnless(mlist.isMember('APerson@dom.AIN'))
+ self.failUnless(mlist.isMember('aperson@dom.ain'))
+ self.failUnless(mlist.isMember('APERSON@DOM.AIN'))
+ eq(mlist.getMemberCPAddress('aperson@dom.ain'), 'APerson@dom.AIN')
+ eq(mlist.getMemberCPAddress('APerson@dom.ain'), 'APerson@dom.AIN')
+ eq(mlist.getMemberCPAddress('APERSON@DOM.AIN'), 'APerson@dom.AIN')
+ eq(mlist.getMemberCPAddresses(('aperson@dom.ain',)),
+ ['APerson@dom.AIN'])
+ eq(mlist.getMemberCPAddresses(('APerson@dom.ain',)),
+ ['APerson@dom.AIN'])
+ eq(mlist.getMemberCPAddresses(('APERSON@DOM.AIN',)),
+ ['APerson@dom.AIN'])
+
+
+
+class TestMembers(TestBase):
+ def setUp(self):
+ TestBase.setUp(self)
+ self._mlist.addNewMember('person@dom.ain',
+ digest=0,
+ password='xxXXxx',
+ language='xx',
+ realname='A. Nice Person')
+
+ def test_add_member(self):
+ eq = self.assertEqual
+ mlist = self._mlist
+ eq(mlist.getMembers(), ['person@dom.ain'])
+ eq(mlist.getRegularMemberKeys(), ['person@dom.ain'])
+ eq(mlist.getDigestMemberKeys(), [])
+ self.failUnless(mlist.isMember('person@dom.ain'))
+ eq(mlist.getMemberKey('person@dom.ain'), 'person@dom.ain')
+ eq(mlist.getMemberCPAddress('person@dom.ain'), 'person@dom.ain')
+ eq(mlist.getMemberCPAddresses(('person@dom.ain', 'noperson@dom.ain')),
+ ['person@dom.ain', None])
+ eq(mlist.getMemberPassword('person@dom.ain'), 'xxXXxx')
+ eq(mlist.getMemberLanguage('person@dom.ain'), 'xx')
+ eq(mlist.getMemberOption('person@dom.ain', mm_cfg.Digests), 0)
+ eq(mlist.getMemberOption('person@dom.ain', mm_cfg.AcknowledgePosts), 0)
+ eq(mlist.getMemberName('person@dom.ain'), 'A. Nice Person')
+ eq(mlist.getMemberTopics('person@dom.ain'), [])
+
+ def test_authentication(self):
+ mlist = self._mlist
+ self.failIf(mlist.authenticateMember('person@dom.ain', 'xxx'))
+ self.assertEqual(mlist.authenticateMember('person@dom.ain', 'xxXXxx'),
+ 'xxXXxx')
+
+ def test_remove_member(self):
+ eq = self.assertEqual
+ raises = self.assertRaises
+ mlist = self._mlist
+ mlist.removeMember('person@dom.ain')
+ eq(mlist.getMembers(), [])
+ eq(mlist.getRegularMemberKeys(), [])
+ eq(mlist.getDigestMemberKeys(), [])
+ self.failIf(mlist.isMember('person@dom.ain'))
+ raises(NotAMemberError, mlist.getMemberKey, 'person@dom.ain')
+ raises(NotAMemberError, mlist.getMemberCPAddress, 'person@dom.ain')
+ eq(mlist.getMemberCPAddresses(('person@dom.ain', 'noperson@dom.ain')),
+ [None, None])
+ raises(NotAMemberError, mlist.getMemberPassword, 'person@dom.ain')
+ raises(NotAMemberError, mlist.authenticateMember,
+ 'person@dom.ain', 'blarg')
+ eq(mlist.getMemberLanguage('person@dom.ain'), mlist.preferred_language)
+ raises(NotAMemberError, mlist.getMemberOption,
+ 'person@dom.ain', mm_cfg.AcknowledgePosts)
+ raises(NotAMemberError, mlist.getMemberName, 'person@dom.ain')
+ raises(NotAMemberError, mlist.getMemberTopics, 'person@dom.ain')
+
+ def test_change_address(self):
+ eq = self.assertEqual
+ raises = self.assertRaises
+ mlist = self._mlist
+ mlist.changeMemberAddress('person@dom.ain', 'nice@dom.ain')
+ # Check the new address
+ eq(mlist.getMembers(), ['nice@dom.ain'])
+ eq(mlist.getRegularMemberKeys(), ['nice@dom.ain'])
+ eq(mlist.getDigestMemberKeys(), [])
+ self.failUnless(mlist.isMember('nice@dom.ain'))
+ eq(mlist.getMemberKey('nice@dom.ain'), 'nice@dom.ain')
+ eq(mlist.getMemberCPAddress('nice@dom.ain'), 'nice@dom.ain')
+ eq(mlist.getMemberCPAddresses(('nice@dom.ain', 'nonice@dom.ain')),
+ ['nice@dom.ain', None])
+ eq(mlist.getMemberPassword('nice@dom.ain'), 'xxXXxx')
+ eq(mlist.getMemberLanguage('nice@dom.ain'), 'xx')
+ eq(mlist.getMemberOption('nice@dom.ain', mm_cfg.Digests), 0)
+ eq(mlist.getMemberOption('nice@dom.ain', mm_cfg.AcknowledgePosts), 0)
+ eq(mlist.getMemberName('nice@dom.ain'), 'A. Nice Person')
+ eq(mlist.getMemberTopics('nice@dom.ain'), [])
+ # Check the old address
+ eq(mlist.getMembers(), ['nice@dom.ain'])
+ eq(mlist.getRegularMemberKeys(), ['nice@dom.ain'])
+ eq(mlist.getDigestMemberKeys(), [])
+ self.failIf(mlist.isMember('person@dom.ain'))
+ raises(NotAMemberError, mlist.getMemberKey, 'person@dom.ain')
+ raises(NotAMemberError, mlist.getMemberCPAddress, 'person@dom.ain')
+ eq(mlist.getMemberCPAddresses(('person@dom.ain', 'noperson@dom.ain')),
+ [None, None])
+ raises(NotAMemberError, mlist.getMemberPassword, 'person@dom.ain')
+ raises(NotAMemberError, mlist.authenticateMember,
+ 'person@dom.ain', 'blarg')
+ eq(mlist.getMemberLanguage('person@dom.ain'), mlist.preferred_language)
+ raises(NotAMemberError, mlist.getMemberOption,
+ 'person@dom.ain', mm_cfg.AcknowledgePosts)
+ raises(NotAMemberError, mlist.getMemberName, 'person@dom.ain')
+ raises(NotAMemberError, mlist.getMemberTopics, 'person@dom.ain')
+
+ def test_set_password(self):
+ eq = self.assertEqual
+ mlist = self._mlist
+ mlist.setMemberPassword('person@dom.ain', 'yyYYyy')
+ eq(mlist.getMemberPassword('person@dom.ain'), 'yyYYyy')
+ eq(mlist.authenticateMember('person@dom.ain', 'yyYYyy'), 'yyYYyy')
+ self.failIf(mlist.authenticateMember('person@dom.ain', 'xxXXxx'))
+
+ def test_set_language(self):
+ self._mlist.setMemberLanguage('person@dom.ain', 'yy')
+ self.assertEqual(self._mlist.getMemberLanguage('person@dom.ain'), 'yy')
+
+ def test_basic_option(self):
+ eq = self.assertEqual
+ gmo = self._mlist.getMemberOption
+ # First test the current option values
+ eq(gmo('person@dom.ain', mm_cfg.Digests), 0)
+ eq(gmo('person@dom.ain', mm_cfg.DisableDelivery), 0)
+ eq(gmo('person@dom.ain', mm_cfg.DontReceiveOwnPosts), 0)
+ eq(gmo('person@dom.ain', mm_cfg.AcknowledgePosts), 0)
+ eq(gmo('person@dom.ain', mm_cfg.DisableMime), 0)
+ eq(gmo('person@dom.ain', mm_cfg.ConcealSubscription), 0)
+ eq(gmo('person@dom.ain', mm_cfg.SuppressPasswordReminder), 0)
+ eq(gmo('person@dom.ain', mm_cfg.ReceiveNonmatchingTopics), 0)
+
+ def test_set_digests(self):
+ eq = self.assertEqual
+ gmo = self._mlist.getMemberOption
+ self._mlist.setMemberOption('person@dom.ain', mm_cfg.Digests, 1)
+ eq(gmo('person@dom.ain', mm_cfg.Digests), 1)
+ eq(gmo('person@dom.ain', mm_cfg.DisableDelivery), 0)
+ eq(gmo('person@dom.ain', mm_cfg.DontReceiveOwnPosts), 0)
+ eq(gmo('person@dom.ain', mm_cfg.AcknowledgePosts), 0)
+ eq(gmo('person@dom.ain', mm_cfg.DisableMime), 0)
+ eq(gmo('person@dom.ain', mm_cfg.ConcealSubscription), 0)
+ eq(gmo('person@dom.ain', mm_cfg.SuppressPasswordReminder), 0)
+ eq(gmo('person@dom.ain', mm_cfg.ReceiveNonmatchingTopics), 0)
+
+ def test_set_disable_delivery(self):
+ eq = self.assertEqual
+ gmo = self._mlist.getMemberOption
+ self._mlist.setMemberOption('person@dom.ain',
+ mm_cfg.DisableDelivery, 1)
+ eq(gmo('person@dom.ain', mm_cfg.Digests), 0)
+ eq(gmo('person@dom.ain', mm_cfg.DisableDelivery), 1)
+ eq(gmo('person@dom.ain', mm_cfg.DontReceiveOwnPosts), 0)
+ eq(gmo('person@dom.ain', mm_cfg.AcknowledgePosts), 0)
+ eq(gmo('person@dom.ain', mm_cfg.DisableMime), 0)
+ eq(gmo('person@dom.ain', mm_cfg.ConcealSubscription), 0)
+ eq(gmo('person@dom.ain', mm_cfg.SuppressPasswordReminder), 0)
+ eq(gmo('person@dom.ain', mm_cfg.ReceiveNonmatchingTopics), 0)
+
+ def test_set_dont_receive_own_posts(self):
+ eq = self.assertEqual
+ gmo = self._mlist.getMemberOption
+ self._mlist.setMemberOption('person@dom.ain',
+ mm_cfg.DontReceiveOwnPosts, 1)
+ eq(gmo('person@dom.ain', mm_cfg.Digests), 0)
+ eq(gmo('person@dom.ain', mm_cfg.DisableDelivery), 0)
+ eq(gmo('person@dom.ain', mm_cfg.DontReceiveOwnPosts), 1)
+ eq(gmo('person@dom.ain', mm_cfg.AcknowledgePosts), 0)
+ eq(gmo('person@dom.ain', mm_cfg.DisableMime), 0)
+ eq(gmo('person@dom.ain', mm_cfg.ConcealSubscription), 0)
+ eq(gmo('person@dom.ain', mm_cfg.SuppressPasswordReminder), 0)
+ eq(gmo('person@dom.ain', mm_cfg.ReceiveNonmatchingTopics), 0)
+
+ def test_set_acknowledge_posts(self):
+ eq = self.assertEqual
+ gmo = self._mlist.getMemberOption
+ self._mlist.setMemberOption('person@dom.ain',
+ mm_cfg.AcknowledgePosts, 1)
+ eq(gmo('person@dom.ain', mm_cfg.Digests), 0)
+ eq(gmo('person@dom.ain', mm_cfg.DisableDelivery), 0)
+ eq(gmo('person@dom.ain', mm_cfg.DontReceiveOwnPosts), 0)
+ eq(gmo('person@dom.ain', mm_cfg.AcknowledgePosts), 1)
+ eq(gmo('person@dom.ain', mm_cfg.DisableMime), 0)
+ eq(gmo('person@dom.ain', mm_cfg.ConcealSubscription), 0)
+ eq(gmo('person@dom.ain', mm_cfg.SuppressPasswordReminder), 0)
+ eq(gmo('person@dom.ain', mm_cfg.ReceiveNonmatchingTopics), 0)
+
+ def test_disable_mime(self):
+ eq = self.assertEqual
+ gmo = self._mlist.getMemberOption
+ self._mlist.setMemberOption('person@dom.ain',
+ mm_cfg.DisableMime, 1)
+ eq(gmo('person@dom.ain', mm_cfg.Digests), 0)
+ eq(gmo('person@dom.ain', mm_cfg.DisableDelivery), 0)
+ eq(gmo('person@dom.ain', mm_cfg.DontReceiveOwnPosts), 0)
+ eq(gmo('person@dom.ain', mm_cfg.AcknowledgePosts), 0)
+ eq(gmo('person@dom.ain', mm_cfg.DisableMime), 1)
+ eq(gmo('person@dom.ain', mm_cfg.ConcealSubscription), 0)
+ eq(gmo('person@dom.ain', mm_cfg.SuppressPasswordReminder), 0)
+ eq(gmo('person@dom.ain', mm_cfg.ReceiveNonmatchingTopics), 0)
+
+ def test_conceal_subscription(self):
+ eq = self.assertEqual
+ gmo = self._mlist.getMemberOption
+ self._mlist.setMemberOption('person@dom.ain',
+ mm_cfg.ConcealSubscription, 1)
+ eq(gmo('person@dom.ain', mm_cfg.Digests), 0)
+ eq(gmo('person@dom.ain', mm_cfg.DisableDelivery), 0)
+ eq(gmo('person@dom.ain', mm_cfg.DontReceiveOwnPosts), 0)
+ eq(gmo('person@dom.ain', mm_cfg.AcknowledgePosts), 0)
+ eq(gmo('person@dom.ain', mm_cfg.DisableMime), 0)
+ eq(gmo('person@dom.ain', mm_cfg.ConcealSubscription), 1)
+ eq(gmo('person@dom.ain', mm_cfg.SuppressPasswordReminder), 0)
+ eq(gmo('person@dom.ain', mm_cfg.ReceiveNonmatchingTopics), 0)
+
+ def test_suppress_password_reminder(self):
+ eq = self.assertEqual
+ gmo = self._mlist.getMemberOption
+ self._mlist.setMemberOption('person@dom.ain',
+ mm_cfg.SuppressPasswordReminder, 1)
+ eq(gmo('person@dom.ain', mm_cfg.Digests), 0)
+ eq(gmo('person@dom.ain', mm_cfg.DisableDelivery), 0)
+ eq(gmo('person@dom.ain', mm_cfg.DontReceiveOwnPosts), 0)
+ eq(gmo('person@dom.ain', mm_cfg.AcknowledgePosts), 0)
+ eq(gmo('person@dom.ain', mm_cfg.DisableMime), 0)
+ eq(gmo('person@dom.ain', mm_cfg.ConcealSubscription), 0)
+ eq(gmo('person@dom.ain', mm_cfg.SuppressPasswordReminder), 1)
+ eq(gmo('person@dom.ain', mm_cfg.ReceiveNonmatchingTopics), 0)
+
+ def test_receive_nonmatching_topics(self):
+ eq = self.assertEqual
+ gmo = self._mlist.getMemberOption
+ self._mlist.setMemberOption('person@dom.ain',
+ mm_cfg.ReceiveNonmatchingTopics, 1)
+ eq(gmo('person@dom.ain', mm_cfg.Digests), 0)
+ eq(gmo('person@dom.ain', mm_cfg.DisableDelivery), 0)
+ eq(gmo('person@dom.ain', mm_cfg.DontReceiveOwnPosts), 0)
+ eq(gmo('person@dom.ain', mm_cfg.AcknowledgePosts), 0)
+ eq(gmo('person@dom.ain', mm_cfg.DisableMime), 0)
+ eq(gmo('person@dom.ain', mm_cfg.ConcealSubscription), 0)
+ eq(gmo('person@dom.ain', mm_cfg.SuppressPasswordReminder), 0)
+ eq(gmo('person@dom.ain', mm_cfg.ReceiveNonmatchingTopics), 1)
+
+ def test_member_name(self):
+ self._mlist.setMemberName('person@dom.ain', 'A. Good Person')
+ self.assertEqual(self._mlist.getMemberName('person@dom.ain'),
+ 'A. Good Person')
+
+ def test_member_topics(self):
+ eq = self.assertEqual
+ mlist = self._mlist
+ mlist.setMemberTopics('person@dom.ain', ['topic1', 'topic2', 'topic3'])
+ eq(mlist.getMemberTopics('person@dom.ain'),
+ ['topic1', 'topic2', 'topic3'])
+ mlist.setMemberTopics('person@dom.ain', None)
+ eq(mlist.getMemberTopics('person@dom.ain'), [])
+
+
+
+def suite():
+ suite = unittest.TestSuite()
+ suite.addTest(unittest.makeSuite(TestNoMembers))
+ suite.addTest(unittest.makeSuite(TestMembers))
+ return suite
+
+
+
+if __name__ == '__main__':
+ unittest.main(defaultTest='suite')
diff --git a/tests/testall.py b/tests/testall.py
index eceedca05..fe6d01171 100644
--- a/tests/testall.py
+++ b/tests/testall.py
@@ -18,7 +18,7 @@
import unittest
-MODULES = ['bounces']
+MODULES = ('bounces', 'handlers', 'membership')