diff options
| author | Barry Warsaw | 2008-02-27 01:26:18 -0500 |
|---|---|---|
| committer | Barry Warsaw | 2008-02-27 01:26:18 -0500 |
| commit | a1c73f6c305c7f74987d99855ba59d8fa823c253 (patch) | |
| tree | 65696889450862357c9e05c8e9a589f1bdc074ac /Mailman/tests/helpers.py | |
| parent | 3f31f8cce369529d177cfb5a7c66346ec1e12130 (diff) | |
| download | mailman-a1c73f6c305c7f74987d99855ba59d8fa823c253.tar.gz mailman-a1c73f6c305c7f74987d99855ba59d8fa823c253.tar.zst mailman-a1c73f6c305c7f74987d99855ba59d8fa823c253.zip | |
Diffstat (limited to 'Mailman/tests/helpers.py')
| -rw-r--r-- | Mailman/tests/helpers.py | 134 |
1 files changed, 0 insertions, 134 deletions
diff --git a/Mailman/tests/helpers.py b/Mailman/tests/helpers.py deleted file mode 100644 index cb97fc5e6..000000000 --- a/Mailman/tests/helpers.py +++ /dev/null @@ -1,134 +0,0 @@ -# Copyright (C) 2008 by the Free Software Foundation, Inc. -# -# This program is free software; you can redistribute it and/or -# modify it under the terms of the GNU General Public License -# as published by the Free Software Foundation; either version 2 -# of the License, or (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, -# USA. - -"""Various test helpers.""" - -from __future__ import with_statement - -__metaclass__ = type -__all__ = [ - 'TestableMaster', - 'digest_mbox', - 'get_queue_messages', - 'make_testable_runner', - ] - - -import os -import time -import errno -import mailbox -import subprocess - -from datetime import datetime, timedelta - -from Mailman.bin.master import Loop as Master -from Mailman.configuration import config -from Mailman.queue import Switchboard - - -WAIT_INTERVAL = timedelta(seconds=3) - - - -def make_testable_runner(runner_class): - """Create a queue runner that runs until its queue is empty. - - :param runner_class: An IRunner - :return: A runner instance. - """ - - class EmptyingRunner(runner_class): - """Stop processing when the queue is empty.""" - - def _doperiodic(self): - """Stop when the queue is empty.""" - self._stop = (len(self._switchboard.files) == 0) - - return EmptyingRunner() - - - -class _Bag: - def __init__(self, **kws): - for key, value in kws.items(): - setattr(self, key, value) - - -def get_queue_messages(queue): - """Return and clear all the messages in the given queue. - - :param queue: An ISwitchboard or a string naming a queue. - :return: A list of 2-tuples where each item contains the message and - message metadata. - """ - if isinstance(queue, basestring): - queue = Switchboard(queue) - messages = [] - for filebase in queue.files: - msg, msgdata = queue.dequeue(filebase) - messages.append(_Bag(msg=msg, msgdata=msgdata)) - queue.finish(filebase) - return messages - - - -def digest_mbox(mlist): - """The mailing list's pending digest as a mailbox. - - :param mlist: The mailing list. - :return: The mailing list's pending digest as a mailbox. - """ - path = os.path.join(mlist.full_path, 'digest.mbox') - return mailbox.mbox(path) - - - -class TestableMaster(Master): - """A testable master loop watcher.""" - - def __init__(self, event): - super(TestableMaster, self).__init__( - restartable=False, config_file=config.filename) - self._event = event - self._started_kids = None - - def loop(self): - """Wait until all the qrunners are actually running before looping.""" - starting_kids = set(self._kids) - while starting_kids: - for pid in self._kids: - try: - os.kill(pid, 0) - starting_kids.remove(pid) - except OSError, error: - if error.errno == errno.ESRCH: - # The child has not yet started. - pass - raise - # Keeping a copy of all the started child processes for use by the - # testing environment, even after all have exited. - self._started_kids = set(self._kids) - # Let the blocking thread know everything's running. - self._event.set() - super(TestableMaster, self).loop() - - @property - def qrunner_pids(self): - """The pids of all the child qrunner processes.""" - for pid in self._started_kids: - yield pid |
