summaryrefslogtreecommitdiff
path: root/Mailman
diff options
context:
space:
mode:
Diffstat (limited to 'Mailman')
-rw-r--r--Mailman/tests/emailbase.py106
-rw-r--r--Mailman/tests/smtplistener.py91
2 files changed, 91 insertions, 106 deletions
diff --git a/Mailman/tests/emailbase.py b/Mailman/tests/emailbase.py
deleted file mode 100644
index 21b85ad63..000000000
--- a/Mailman/tests/emailbase.py
+++ /dev/null
@@ -1,106 +0,0 @@
-# Copyright (C) 2001-2007 by the Free Software Foundation, Inc.
-#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU General Public License
-# as published by the Free Software Foundation; either version 2
-# of the License, or (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
-# USA.
-
-"""Base class for tests that email things."""
-
-import os
-import time
-import errno
-import smtpd
-import socket
-import asyncore
-import subprocess
-
-from Mailman.configuration import config
-from Mailman.tests.base import TestBase
-
-TESTPORT = 10825
-
-
-
-MSGTEXT = None
-
-class OneShotChannel(smtpd.SMTPChannel):
- def smtp_QUIT(self, arg):
- smtpd.SMTPChannel.smtp_QUIT(self, arg)
- raise asyncore.ExitNow
-
-
-class SinkServer(smtpd.SMTPServer):
- def handle_accept(self):
- conn, addr = self.accept()
- channel = OneShotChannel(self, conn, addr)
-
- def process_message(self, peer, mailfrom, rcpttos, data):
- global MSGTEXT
- MSGTEXT = data
-
-
-
-class EmailBase(TestBase):
- def setUp(self):
- TestBase.setUp(self)
- try:
- # Second argument is ignored.
- self._server = SinkServer(('localhost', TESTPORT), None)
- except:
- TestBase.tearDown(self)
- raise
- try:
- os.system('bin/mailmanctl -C %s -q start' % config.filename)
- # If any errors occur in the above, be sure to manually call
- # tearDown(). unittest doesn't call tearDown() for errors in
- # setUp().
- except:
- self.tearDown()
- raise
-
- def tearDown(self):
- os.system('bin/mailmanctl -C %s -q stop' % config.filename)
- self._server.close()
- # Wait a while until the server actually goes away
- while True:
- try:
- s = socket.socket()
- s.connect(('localhost', TESTPORT))
- s.close()
- time.sleep(3)
- except socket.error, e:
- # IWBNI e had an errno attribute
- if e[0] in (errno.ECONNREFUSED, errno.ETIMEDOUT):
- break
- else:
- raise
- TestBase.tearDown(self)
-
- def _readmsg(self):
- global MSGTEXT
- # Save and unlock the list so that the qrunner process can open it and
- # lock it if necessary. We'll re-lock the list in our finally clause
- # since that if an invariant of the test harness.
- self._mlist.Unlock()
- try:
- try:
- # timeout is in milliseconds, see asyncore.py poll3()
- asyncore.loop()
- MSGTEXT = None
- except asyncore.ExitNow:
- pass
- asyncore.close_all()
- return MSGTEXT
- finally:
- self._mlist.Lock()
diff --git a/Mailman/tests/smtplistener.py b/Mailman/tests/smtplistener.py
new file mode 100644
index 000000000..565772b1d
--- /dev/null
+++ b/Mailman/tests/smtplistener.py
@@ -0,0 +1,91 @@
+# Copyright (C) 2007 by the Free Software Foundation, Inc.
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
+# USA.
+
+import sys
+import smtpd
+import mailbox
+import asyncore
+import optparse
+
+from email import message_from_string
+
+COMMASPACE = ', '
+DEFAULT_PORT = 9025
+
+
+
+class Channel(smtpd.SMTPChannel):
+ def smtp_EXIT(self, arg):
+ raise asyncore.ExitNow
+
+
+class Server(smtpd.SMTPServer):
+ def __init__(self, localaddr, mboxfile):
+ smtpd.SMTPServer.__init__(self, localaddr, None)
+ self._mbox = mailbox.mbox(mboxfile)
+
+ def handle_accept(self):
+ conn, addr = self.accept()
+ Channel(self, conn, addr)
+
+ def process_message(self, peer, mailfrom, rcpttos, data):
+ msg = message_from_string(data)
+ msg['X-Peer'] = peer
+ msg['X-MailFrom'] = mailfrom
+ msg['X-RcptTo'] = COMMASPACE.join(rcpttos)
+ self._mbox.add(msg)
+
+ def close(self):
+ self._mbox.flush()
+ self._mbox.close()
+
+
+
+def main():
+ parser = optparse.OptionParser(usage='%prog mboxfile')
+ parser.add_option('-a', '--address',
+ type='string', default=None,
+ help='host:port to listen on')
+ opts, args = parser.parse_args()
+ if len(args) == 0:
+ parser.error('Missing mbox file')
+ elif len(args) > 1:
+ parser.error('Unexpected arguments')
+
+ mboxfile = args[0]
+ if opts.address is None:
+ host = 'localhost'
+ port = DEFAULT_PORT
+ elif ':' not in opts.address:
+ host = opts.address
+ port = DEFAULT_PORT
+ else:
+ host, port = opts.address.split(':', 1)
+ port = int(port)
+
+ server = Server((host, port), mboxfile)
+ try:
+ asyncore.loop()
+ except asyncore.ExitNow:
+ asyncore.close_all()
+ server.close()
+ return 0
+
+
+
+if __name__ == '__main__':
+ sys.exit(main())