summaryrefslogtreecommitdiff
path: root/src/mailman/testing/helpers.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/mailman/testing/helpers.py')
-rw-r--r--src/mailman/testing/helpers.py81
1 files changed, 81 insertions, 0 deletions
diff --git a/src/mailman/testing/helpers.py b/src/mailman/testing/helpers.py
index 58c72d6d9..3648a6710 100644
--- a/src/mailman/testing/helpers.py
+++ b/src/mailman/testing/helpers.py
@@ -1,3 +1,4 @@
+# Copyright (C) 2008-2012 by the Free Software Foundation, Inc.
#
# This file is part of GNU Mailman.
#
@@ -22,10 +23,13 @@ __metaclass__ = type
__all__ = [
'LogFileMark',
'TestableMaster',
+ 'body_line_iterator',
'call_api',
+ 'configuration',
'digest_mbox',
'event_subscribers',
'get_lmtp_client',
+ 'get_nntp_server',
'get_queue_messages',
'make_testable_runner',
'reset_the_world',
@@ -37,7 +41,9 @@ __all__ = [
import os
import json
+import mock
import time
+import uuid
import errno
import signal
import socket
@@ -66,6 +72,9 @@ from mailman.interfaces.usermanager import IUserManager
from mailman.utilities.mailbox import Mailbox
+NL = '\n'
+
+
def make_testable_runner(runner_class, name=None, predicate=None):
"""Create a runner that runs until its queue is empty.
@@ -246,6 +255,24 @@ def get_lmtp_client(quiet=False):
+def get_nntp_server(cleanups):
+ """Create and start an NNTP server mock.
+
+ This can be used to retrieve the posted message for verification.
+ """
+ patcher = mock.patch('nntplib.NNTP')
+ server_class = patcher.start()
+ cleanups.append(patcher.stop)
+ nntpd = server_class()
+ # A class for more convenient access to the posted message.
+ class NNTPProxy:
+ def get_message(self):
+ args = nntpd.post.call_args
+ return specialized_message_from_string(args[0][0].read())
+ return NNTPProxy()
+
+
+
def wait_for_webservice():
"""Wait for the REST server to start serving requests."""
until = datetime.datetime.now() + as_timedelta(config.devmode.wait)
@@ -329,6 +356,42 @@ def event_subscribers(*subscribers):
+class configuration:
+ """A decorator/context manager for temporarily setting configurations."""
+
+ def __init__(self, section, **kws):
+ self._section = section
+ self._values = kws.copy()
+ self._uuid = uuid.uuid4().hex
+
+ def _apply(self):
+ lines = ['[{0}]'.format(self._section)]
+ for key, value in self._values.items():
+ lines.append('{0}: {1}'.format(key, value))
+ config.push(self._uuid, NL.join(lines))
+
+ def _remove(self):
+ config.pop(self._uuid)
+
+ def __enter__(self):
+ self._apply()
+
+ def __exit__(self, *exc_info):
+ self._remove()
+ # Do not suppress exceptions.
+ return False
+
+ def __call__(self, func):
+ def wrapper(*args, **kws):
+ self._apply()
+ try:
+ return func(*args, **kws)
+ finally:
+ self._remove()
+ return wrapper
+
+
+
def subscribe(mlist, first_name, role=MemberRole.member):
"""Helper for subscribing a sample person to a mailing list."""
user_manager = getUtility(IUserManager)
@@ -379,6 +442,8 @@ def reset_the_world():
config.db.commit()
# Reset the global style manager.
getUtility(IStyleManager).populate()
+ # Remove all dynamic header-match rules.
+ config.chains['header-match'].flush()
@@ -412,3 +477,19 @@ class LogFileMark:
with open(self._filename) as fp:
fp.seek(self._filepos)
return fp.readline()
+
+
+
+# In Python 2.6, body_line_iterator() uses a cStringIO.StringIO() which cannot
+# handle unicode. In Python 2.7 this works fine. I hate version checks but
+# this is the easiest way to handle it. OTOH, we could just use the manual
+# way for all Python versions instead.
+import sys
+if sys.hexversion >= 0x2070000:
+ from email.iterators import body_line_iterator
+else:
+ def body_line_iterator(msg, decode=False):
+ payload = msg.get_payload(decode=decode)
+ bytes_payload = payload.encode('utf-8')
+ for line in bytes_payload.splitlines():
+ yield line