1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
|
# Copyright (C) 2008 by the Free Software Foundation, Inc.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
# USA.
"""Various test helpers."""
# Python 2 idiom: a module-level ``__metaclass__`` of ``type`` makes every
# class in this module that is declared without base classes a new-style class.
__metaclass__ = type
# Public names of this module (what ``from <module> import *`` exports).
__all__ = [
'digest_mbox',
'get_queue_messages',
'make_testable_runner',
]
import os
import mailbox
from Mailman.queue import Switchboard
def make_testable_runner(runner_class):
    """Build a queue runner that flags itself to stop once its queue is empty.

    The given class is subclassed so that its periodic hook sets the
    ``_stop`` flag whenever the runner's switchboard lists no files.  The
    subclass is then instantiated without arguments.

    :param runner_class: The runner class to subclass (an IRunner).
    :return: An instance of the self-stopping subclass.
    """

    class EmptyingRunner(runner_class):
        """A runner whose stop flag tracks whether its queue is empty."""

        def _doperiodic(self):
            """Set the stop flag when no queue files remain."""
            remaining = len(self._switchboard.files)
            self._stop = (remaining == 0)

    testable_runner = EmptyingRunner()
    return testable_runner
class _Bag:
    """Plain attribute container populated from keyword arguments."""

    def __init__(self, **kws):
        """Store every keyword argument as an instance attribute."""
        for name in kws:
            setattr(self, name, kws[name])
def get_queue_messages(queue):
    """Drain the given queue and return everything that was in it.

    :param queue: An ISwitchboard or a string naming a queue.  A string is
        converted by passing it to `Switchboard`.
    :return: A list with one object per entry in the switchboard's `files`,
        in that order.  Each object exposes the dequeued message as `.msg`
        and its metadata as `.msgdata`.
    """
    switchboard = queue
    if isinstance(queue, basestring):
        switchboard = Switchboard(queue)

    def drain(filebase):
        # Dequeue the entry, wrap it, then mark it finished in the queue.
        msg, msgdata = switchboard.dequeue(filebase)
        entry = _Bag(msg=msg, msgdata=msgdata)
        switchboard.finish(filebase)
        return entry

    return [drain(filebase) for filebase in switchboard.files]
def digest_mbox(mlist):
    """Open the mailing list's pending digest as a mailbox.

    :param mlist: The mailing list; its `full_path` attribute names the
        directory that holds `digest.mbox`.
    :return: A `mailbox.mbox` for the `digest.mbox` file in that directory.
    """
    return mailbox.mbox(os.path.join(mlist.full_path, 'digest.mbox'))
|