summaryrefslogtreecommitdiff
path: root/mailman/tests/helpers.py
diff options
context:
space:
mode:
authorBarry Warsaw2008-07-03 15:32:25 -0400
committerBarry Warsaw2008-07-03 15:32:25 -0400
commit408043ad8404798e9b8b93a1ee1cd81db897639c (patch)
tree7002f8465007663a700d281aaed8cc701a50cabf /mailman/tests/helpers.py
parented752bbc1cb7731ce0eebb8cd7301d0322b9ef49 (diff)
downloadmailman-408043ad8404798e9b8b93a1ee1cd81db897639c.tar.gz
mailman-408043ad8404798e9b8b93a1ee1cd81db897639c.tar.zst
mailman-408043ad8404798e9b8b93a1ee1cd81db897639c.zip
Diffstat (limited to 'mailman/tests/helpers.py')
-rw-r--r--mailman/tests/helpers.py237
1 files changed, 0 insertions, 237 deletions
diff --git a/mailman/tests/helpers.py b/mailman/tests/helpers.py
deleted file mode 100644
index d1eda9e34..000000000
--- a/mailman/tests/helpers.py
+++ /dev/null
@@ -1,237 +0,0 @@
-# Copyright (C) 2008 by the Free Software Foundation, Inc.
-#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU General Public License
-# as published by the Free Software Foundation; either version 2
-# of the License, or (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
-# USA.
-
-"""Various test helpers."""
-
-from __future__ import with_statement
-
-__metaclass__ = type
-__all__ = [
- 'TestableMaster',
- 'digest_mbox',
- 'get_lmtp_client',
- 'get_queue_messages',
- 'make_testable_runner',
- ]
-
-
-import os
-import time
-import errno
-import signal
-import socket
-import logging
-import mailbox
-import smtplib
-import tempfile
-import threading
-
-from Queue import Empty, Queue
-from datetime import datetime, timedelta
-
-from mailman.bin.master import Loop as Master
-from mailman.configuration import config
-from mailman.queue import Switchboard
-from mailman.tests.smtplistener import Server
-
-
-log = logging.getLogger('mailman.debug')
-
-
-
-def make_testable_runner(runner_class):
- """Create a queue runner that runs until its queue is empty.
-
- :param runner_class: An IRunner
- :return: A runner instance.
- """
-
- class EmptyingRunner(runner_class):
- """Stop processing when the queue is empty."""
-
- def _doperiodic(self):
- """Stop when the queue is empty."""
- self._stop = (len(self._switchboard.files) == 0)
-
- return EmptyingRunner()
-
-
-
-class _Bag:
- def __init__(self, **kws):
- for key, value in kws.items():
- setattr(self, key, value)
-
-
-def get_queue_messages(queue):
- """Return and clear all the messages in the given queue.
-
- :param queue: An ISwitchboard or a string naming a queue.
- :return: A list of 2-tuples where each item contains the message and
- message metadata.
- """
- if isinstance(queue, basestring):
- queue = Switchboard(queue)
- messages = []
- for filebase in queue.files:
- msg, msgdata = queue.dequeue(filebase)
- messages.append(_Bag(msg=msg, msgdata=msgdata))
- queue.finish(filebase)
- return messages
-
-
-
-def digest_mbox(mlist):
- """The mailing list's pending digest as a mailbox.
-
- :param mlist: The mailing list.
- :return: The mailing list's pending digest as a mailbox.
- """
- path = os.path.join(mlist.data_path, 'digest.mbox')
- return mailbox.mbox(path)
-
-
-
-class TestableMaster(Master):
- """A testable master loop watcher."""
-
- def __init__(self):
- super(TestableMaster, self).__init__(
- restartable=False, config_file=config.filename)
- self.event = threading.Event()
- self.thread = threading.Thread(target=self.loop)
- self._started_kids = None
-
- def start(self, *qrunners):
- """Start the master."""
- self.start_qrunners(qrunners)
- self.thread.start()
- # Wait until all the children are definitely started.
- self.event.wait()
-
- def stop(self):
- """Stop the master by killing all the children."""
- for pid in self.qrunner_pids:
- os.kill(pid, signal.SIGTERM)
- self.cleanup()
- self.thread.join()
-
- def loop(self):
- """Wait until all the qrunners are actually running before looping."""
- starting_kids = set(self._kids)
- while starting_kids:
- for pid in self._kids:
- try:
- os.kill(pid, 0)
- starting_kids.remove(pid)
- except OSError, error:
- if error.errno == errno.ESRCH:
- # The child has not yet started.
- pass
- raise
- # Keeping a copy of all the started child processes for use by the
- # testing environment, even after all have exited.
- self._started_kids = set(self._kids)
- # Let the blocking thread know everything's running.
- self.event.set()
- super(TestableMaster, self).loop()
-
- @property
- def qrunner_pids(self):
- """The pids of all the child qrunner processes."""
- for pid in self._started_kids:
- yield pid
-
-
-
-class SMTPServer:
- """An smtp server for testing."""
-
- host = 'localhost'
- port = 9025
-
- def __init__(self):
- self._messages = []
- self._queue = Queue()
- self._server = Server((self.host, self.port), self._queue)
- self._thread = threading.Thread(target=self._server.start)
-
- def start(self):
- """Start the smtp server in a thread."""
- log.info('test SMTP server starting')
- self._thread.start()
-
- def stop(self):
- """Stop the smtp server."""
- smtpd = smtplib.SMTP()
- smtpd.connect(self.host, self.port)
- smtpd.docmd('EXIT')
- self.clear()
- # Wait for the thread to exit.
- self._thread.join()
- log.info('test SMTP server stopped')
-
- @property
- def messages(self):
- """Return all the messages received by the smtp server."""
- # Look at the thread queue and append any messages from there to our
- # internal list of messages.
- while True:
- try:
- message = self._queue.get_nowait()
- except Empty:
- break
- else:
- self._messages.append(message)
- # Now return all the messages we know about.
- for message in self._messages:
- yield message
-
- def clear(self):
- """Clear all messages from the queue."""
- # Just throw these away.
- list(self._messages)
- self._messages = []
-
-
-
-class LMTP(smtplib.SMTP):
- """Like a normal SMTP client, but for LMTP."""
- def lhlo(self, name=''):
- self.putcmd('lhlo', name or self.local_hostname)
- code, msg = self.getreply()
- self.helo_resp = msg
- return code, msg
-
-
-def get_lmtp_client():
- """Return a connected LMTP client."""
- # It's possible the process has started but is not yet accepting
- # connections. Wait a little while.
- lmtp = LMTP()
- for attempts in range(3):
- try:
- response = lmtp.connect(config.LMTP_HOST, config.LMTP_PORT)
- print response
- return lmtp
- except socket.error, error:
- if error[0] == errno.ECONNREFUSED:
- time.sleep(1)
- else:
- raise
- else:
- raise RuntimeError('Connection refused')