diff options
Diffstat (limited to 'src/mailman/testing/helpers.py')
| -rw-r--r-- | src/mailman/testing/helpers.py | 81 |
1 files changed, 81 insertions, 0 deletions
diff --git a/src/mailman/testing/helpers.py b/src/mailman/testing/helpers.py index 58c72d6d9..3648a6710 100644 --- a/src/mailman/testing/helpers.py +++ b/src/mailman/testing/helpers.py @@ -1,3 +1,4 @@ +# Copyright (C) 2008-2012 by the Free Software Foundation, Inc. # # This file is part of GNU Mailman. # @@ -22,10 +23,13 @@ __metaclass__ = type __all__ = [ 'LogFileMark', 'TestableMaster', + 'body_line_iterator', 'call_api', + 'configuration', 'digest_mbox', 'event_subscribers', 'get_lmtp_client', + 'get_nntp_server', 'get_queue_messages', 'make_testable_runner', 'reset_the_world', @@ -37,7 +41,9 @@ __all__ = [ import os import json +import mock import time +import uuid import errno import signal import socket @@ -66,6 +72,9 @@ from mailman.interfaces.usermanager import IUserManager from mailman.utilities.mailbox import Mailbox +NL = '\n' + + def make_testable_runner(runner_class, name=None, predicate=None): """Create a runner that runs until its queue is empty. @@ -246,6 +255,24 @@ def get_lmtp_client(quiet=False): +def get_nntp_server(cleanups): + """Create and start an NNTP server mock. + + This can be used to retrieve the posted message for verification. + """ + patcher = mock.patch('nntplib.NNTP') + server_class = patcher.start() + cleanups.append(patcher.stop) + nntpd = server_class() + # A class for more convenient access to the posted message. + class NNTPProxy: + def get_message(self): + args = nntpd.post.call_args + return specialized_message_from_string(args[0][0].read()) + return NNTPProxy() + + + def wait_for_webservice(): """Wait for the REST server to start serving requests.""" until = datetime.datetime.now() + as_timedelta(config.devmode.wait) @@ -329,6 +356,42 @@ def event_subscribers(*subscribers): +class configuration: + """A decorator/context manager for temporarily setting configurations.""" + + def __init__(self, section, **kws): + self._section = section + self._values = kws.copy() + self._uuid = uuid.uuid4().hex + + def _apply(self): + lines = ['[{0}]'.format(self._section)] + for key, value in self._values.items(): + lines.append('{0}: {1}'.format(key, value)) + config.push(self._uuid, NL.join(lines)) + + def _remove(self): + config.pop(self._uuid) + + def __enter__(self): + self._apply() + + def __exit__(self, *exc_info): + self._remove() + # Do not suppress exceptions. + return False + + def __call__(self, func): + def wrapper(*args, **kws): + self._apply() + try: + return func(*args, **kws) + finally: + self._remove() + return wrapper + + + def subscribe(mlist, first_name, role=MemberRole.member): """Helper for subscribing a sample person to a mailing list.""" user_manager = getUtility(IUserManager) @@ -379,6 +442,8 @@ def reset_the_world(): config.db.commit() # Reset the global style manager. getUtility(IStyleManager).populate() + # Remove all dynamic header-match rules. + config.chains['header-match'].flush() @@ -412,3 +477,19 @@ class LogFileMark: with open(self._filename) as fp: fp.seek(self._filepos) return fp.readline() + + + +# In Python 2.6, body_line_iterator() uses a cStringIO.StringIO() which cannot +# handle unicode. In Python 2.7 this works fine. I hate version checks but +# this is the easiest way to handle it. OTOH, we could just use the manual +# way for all Python versions instead. +import sys +if sys.hexversion >= 0x2070000: + from email.iterators import body_line_iterator +else: + def body_line_iterator(msg, decode=False): + payload = msg.get_payload(decode=decode) + bytes_payload = payload.encode('utf-8') + for line in bytes_payload.splitlines(): + yield line |
