summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Mailman/Handlers/MimeDel.py4
-rw-r--r--Mailman/Handlers/ToArchive.py4
-rw-r--r--Mailman/Handlers/ToDigest.py4
-rw-r--r--Mailman/Handlers/ToOutgoing.py4
-rw-r--r--Mailman/Handlers/ToUsenet.py4
-rw-r--r--Mailman/MTA/Manual.py4
-rw-r--r--Mailman/Message.py8
-rw-r--r--Mailman/Post.py4
-rw-r--r--Mailman/Queue/tests/__init__.py0
-rw-r--r--Mailman/app/moderator.py4
-rw-r--r--Mailman/bin/bounces.py4
-rw-r--r--Mailman/bin/confirm.py4
-rw-r--r--Mailman/bin/gate_news.py4
-rw-r--r--Mailman/bin/join.py4
-rw-r--r--Mailman/bin/leave.py4
-rw-r--r--Mailman/bin/owner.py4
-rw-r--r--Mailman/bin/post.py4
-rw-r--r--Mailman/bin/qrunner.py49
-rw-r--r--Mailman/bin/request.py4
-rw-r--r--Mailman/bin/unshunt.py6
-rw-r--r--Mailman/configuration.py26
-rw-r--r--Mailman/docs/acknowledge.txt4
-rw-r--r--Mailman/docs/antispam.txt2
-rw-r--r--Mailman/docs/archives.txt2
-rw-r--r--Mailman/docs/bounces.txt2
-rw-r--r--Mailman/docs/digests.txt2
-rw-r--r--Mailman/docs/hold.txt2
-rw-r--r--Mailman/docs/message.txt2
-rw-r--r--Mailman/docs/news-runner.txt2
-rw-r--r--Mailman/docs/nntp.txt2
-rw-r--r--Mailman/docs/outgoing.txt2
-rw-r--r--Mailman/docs/registration.txt2
-rw-r--r--Mailman/docs/replybot.txt4
-rw-r--r--Mailman/docs/requests.txt6
-rw-r--r--Mailman/docs/runner.txt3
-rw-r--r--Mailman/docs/switchboard.txt2
-rw-r--r--Mailman/docs/tagger.txt2
-rw-r--r--Mailman/interfaces/runner.py31
-rw-r--r--Mailman/queue/Runner.py (renamed from Mailman/Queue/Runner.py)0
-rw-r--r--Mailman/queue/Switchboard.py (renamed from Mailman/Queue/Switchboard.py)0
-rw-r--r--Mailman/queue/__init__.py402
-rw-r--r--Mailman/queue/archive.py (renamed from Mailman/Queue/ArchRunner.py)4
-rw-r--r--Mailman/queue/bounce.py (renamed from Mailman/Queue/BounceRunner.py)5
-rw-r--r--Mailman/queue/command.py (renamed from Mailman/Queue/CommandRunner.py)2
-rw-r--r--Mailman/queue/http.py (renamed from Mailman/Queue/HTTPRunner.py)2
-rw-r--r--Mailman/queue/incoming.py (renamed from Mailman/Queue/IncomingRunner.py)2
-rw-r--r--Mailman/queue/lmtp.py (renamed from Mailman/Queue/LMTPRunner.py)17
-rw-r--r--Mailman/queue/maildir.py (renamed from Mailman/Queue/MaildirRunner.py)17
-rw-r--r--Mailman/queue/news.py (renamed from Mailman/Queue/NewsRunner.py)2
-rw-r--r--Mailman/queue/outgoing.py (renamed from Mailman/Queue/OutgoingRunner.py)5
-rw-r--r--Mailman/queue/retry.py (renamed from Mailman/Queue/RetryRunner.py)3
-rw-r--r--Mailman/queue/sbcache.py (renamed from Mailman/Queue/sbcache.py)0
-rw-r--r--Mailman/queue/tests/__init__.py (renamed from Mailman/Queue/__init__.py)0
-rw-r--r--Mailman/queue/virgin.py (renamed from Mailman/Queue/VirginRunner.py)4
54 files changed, 564 insertions, 126 deletions
diff --git a/Mailman/Handlers/MimeDel.py b/Mailman/Handlers/MimeDel.py
index 5f49f4b33..93349cb23 100644
--- a/Mailman/Handlers/MimeDel.py
+++ b/Mailman/Handlers/MimeDel.py
@@ -34,11 +34,11 @@ from os.path import splitext
from Mailman import Errors
from Mailman.Message import UserNotification
-from Mailman.Queue.sbcache import get_switchboard
from Mailman.Utils import oneline
from Mailman.Version import VERSION
from Mailman.configuration import config
from Mailman.i18n import _
+from Mailman.queue import Switchboard
log = logging.getLogger('mailman.error')
@@ -240,7 +240,7 @@ are receiving the only remaining copy of the discarded message.
subject=_('Content filtered message notification'))
if mlist.filter_action == 3 and \
config.OWNERS_CAN_PRESERVE_FILTERED_MESSAGES:
- badq = get_switchboard(config.BADQUEUE_DIR)
+ badq = Switchboard(config.BADQUEUE_DIR)
badq.enqueue(msg, msgdata)
# Most cases also discard the message
raise Errors.DiscardMessage
diff --git a/Mailman/Handlers/ToArchive.py b/Mailman/Handlers/ToArchive.py
index 6c4397daa..c65e86f60 100644
--- a/Mailman/Handlers/ToArchive.py
+++ b/Mailman/Handlers/ToArchive.py
@@ -17,8 +17,8 @@
"""Add the message to the archives."""
-from Mailman.Queue.sbcache import get_switchboard
from Mailman.configuration import config
+from Mailman.queue import Switchboard
@@ -32,6 +32,6 @@ def process(mlist, msg, msgdata):
if 'x-no-archive' in msg or msg.get('x-archive', '').lower() == 'no':
return
# Send the message to the archiver queue
- archq = get_switchboard(config.ARCHQUEUE_DIR)
+ archq = Switchboard(config.ARCHQUEUE_DIR)
# Send the message to the queue
archq.enqueue(msg, msgdata)
diff --git a/Mailman/Handlers/ToDigest.py b/Mailman/Handlers/ToDigest.py
index 131bf4588..8bd13f2e6 100644
--- a/Mailman/Handlers/ToDigest.py
+++ b/Mailman/Handlers/ToDigest.py
@@ -51,9 +51,9 @@ from Mailman.Handlers.Decorate import decorate
from Mailman.Handlers.Scrubber import process as scrubber
from Mailman.Mailbox import Mailbox
from Mailman.Mailbox import Mailbox
-from Mailman.Queue.sbcache import get_switchboard
from Mailman.configuration import config
from Mailman.constants import DeliveryMode, DeliveryStatus
+from Mailman.queue import Switchboard
_ = i18n._
__i18n_templates__ = True
@@ -373,7 +373,7 @@ def send_i18n_digests(mlist, mboxfp):
# Do our final bit of housekeeping, and then send each message to the
# outgoing queue for delivery.
mlist.next_digest_number += 1
- virginq = get_switchboard(config.VIRGINQUEUE_DIR)
+ virginq = Switchboard(config.VIRGINQUEUE_DIR)
# Calculate the recipients lists
plainrecips = set()
mimerecips = set()
diff --git a/Mailman/Handlers/ToOutgoing.py b/Mailman/Handlers/ToOutgoing.py
index ff263e9a3..15ceb8311 100644
--- a/Mailman/Handlers/ToOutgoing.py
+++ b/Mailman/Handlers/ToOutgoing.py
@@ -22,8 +22,8 @@ posted to the list membership. Anything else that needs to go out to some
recipient should just be placed in the out queue directly.
"""
-from Mailman.Queue.sbcache import get_switchboard
from Mailman.configuration import config
+from Mailman.queue import Switchboard
@@ -52,5 +52,5 @@ def process(mlist, msg, msgdata):
# VERP every `interval' number of times
msgdata['verp'] = not (int(mlist.post_id) % interval)
# And now drop the message in qfiles/out
- outq = get_switchboard(config.OUTQUEUE_DIR)
+ outq = Switchboard(config.OUTQUEUE_DIR)
outq.enqueue(msg, msgdata, listname=mlist.fqdn_listname)
diff --git a/Mailman/Handlers/ToUsenet.py b/Mailman/Handlers/ToUsenet.py
index 63f5da52a..09bb28bdd 100644
--- a/Mailman/Handlers/ToUsenet.py
+++ b/Mailman/Handlers/ToUsenet.py
@@ -20,7 +20,7 @@
import logging
from Mailman.configuration import config
-from Mailman.Queue.sbcache import get_switchboard
+from Mailman.queue import Switchboard
COMMASPACE = ', '
@@ -45,5 +45,5 @@ def process(mlist, msg, msgdata):
COMMASPACE.join(error))
return
# Put the message in the news runner's queue
- newsq = get_switchboard(config.NEWSQUEUE_DIR)
+ newsq = Switchboard(config.NEWSQUEUE_DIR)
newsq.enqueue(msg, msgdata, listname=mlist.fqdn_listname)
diff --git a/Mailman/MTA/Manual.py b/Mailman/MTA/Manual.py
index 43ef3afd5..953d46695 100644
--- a/Mailman/MTA/Manual.py
+++ b/Mailman/MTA/Manual.py
@@ -25,9 +25,9 @@ from cStringIO import StringIO
from Mailman import Message
from Mailman import Utils
from Mailman.MTA.Utils import makealiases
-from Mailman.Queue.sbcache import get_switchboard
from Mailman.configuration import config
from Mailman.i18n import _
+from Mailman.queue import Switchboard
__i18n_templates__ = True
@@ -137,5 +137,5 @@ equivalent) file by removing the following lines, and possibly running the
_('Mailing list removal request for list $listname'),
sfp.getvalue(), config.DEFAULT_SERVER_LANGUAGE)
msg['Date'] = email.Utils.formatdate(localtime=True)
- outq = get_switchboard(config.OUTQUEUE_DIR)
+ outq = Switchboard(config.OUTQUEUE_DIR)
outq.enqueue(msg, recips=[siteowner], nodecorate=True)
diff --git a/Mailman/Message.py b/Mailman/Message.py
index 47262d8a3..e1b898ea4 100644
--- a/Mailman/Message.py
+++ b/Mailman/Message.py
@@ -244,8 +244,8 @@ class UserNotification(Message):
def _enqueue(self, mlist, **_kws):
# Not imported at module scope to avoid import loop
- from Mailman.Queue.sbcache import get_switchboard
- virginq = get_switchboard(config.VIRGINQUEUE_DIR)
+ from Mailman.queue import Switchboard
+ virginq = Switchboard(config.VIRGINQUEUE_DIR)
# The message metadata better have a 'recip' attribute.
enqueue_kws = dict(
recips=self.recips,
@@ -278,8 +278,8 @@ class OwnerNotification(UserNotification):
def _enqueue(self, mlist, **_kws):
# Not imported at module scope to avoid import loop
- from Mailman.Queue.sbcache import get_switchboard
- virginq = get_switchboard(config.VIRGINQUEUE_DIR)
+ from Mailman.queue import Switchboard
+ virginq = Switchboard(config.VIRGINQUEUE_DIR)
# The message metadata better have a `recip' attribute
virginq.enqueue(self,
listname=mlist.fqdn_listname,
diff --git a/Mailman/Post.py b/Mailman/Post.py
index 719c3e68b..50b9628a0 100644
--- a/Mailman/Post.py
+++ b/Mailman/Post.py
@@ -19,15 +19,15 @@
import sys
-from Mailman.Queue.sbcache import get_switchboard
from Mailman.configuration import config
+from Mailman.queue import Switchboard
def inject(listname, msg, recips=None, qdir=None):
if qdir is None:
qdir = config.INQUEUE_DIR
- queue = get_switchboard(qdir)
+ queue = Switchboard(qdir)
kws = {'listname' : listname,
'tolist' : 1,
'_plaintext': 1,
diff --git a/Mailman/Queue/tests/__init__.py b/Mailman/Queue/tests/__init__.py
deleted file mode 100644
index e69de29bb..000000000
--- a/Mailman/Queue/tests/__init__.py
+++ /dev/null
diff --git a/Mailman/app/moderator.py b/Mailman/app/moderator.py
index 266092cdb..720a5a364 100644
--- a/Mailman/app/moderator.py
+++ b/Mailman/app/moderator.py
@@ -37,11 +37,11 @@ from Mailman import Errors
from Mailman import Message
from Mailman import Utils
from Mailman import i18n
-from Mailman.Queue.sbcache import get_switchboard
from Mailman.app.membership import add_member, delete_member
from Mailman.configuration import config
from Mailman.constants import Action, DeliveryMode
from Mailman.interfaces import RequestType
+from Mailman.queue import Switchboard
_ = i18n._
__i18n_templates__ = True
@@ -130,7 +130,7 @@ def handle_message(mlist, id, action,
msg.get('message-id', 'n/a'))
# Stick the message back in the incoming queue for further
# processing.
- inq = get_switchboard(config.INQUEUE_DIR)
+ inq = Switchboard(config.INQUEUE_DIR)
inq.enqueue(msg, _metadata=msgdata)
else:
raise AssertionError('Unexpected action: %s' % action)
diff --git a/Mailman/bin/bounces.py b/Mailman/bin/bounces.py
index cb6ce813f..f8faad518 100644
--- a/Mailman/bin/bounces.py
+++ b/Mailman/bin/bounces.py
@@ -28,9 +28,9 @@ import logging
from Mailman import Utils
from Mailman import loginit
-from Mailman.Queue.sbcache import get_switchboard
from Mailman.configuration import config
from Mailman.i18n import _
+from Mailman.queue import Switchboard
__i18n_templates__ = True
@@ -54,7 +54,7 @@ def main():
# some MTAs have a hard limit to the time a filter prog can run. Postfix
# is a good example; if the limit is hit, the proc is SIGKILL'd giving us
# no chance to save the message.
- bounceq = get_switchboard(config.BOUNCEQUEUE_DIR)
+ bounceq = Switchboard(config.BOUNCEQUEUE_DIR)
bounceq.enqueue(sys.stdin.read(), listname=listname, _plaintext=True)
diff --git a/Mailman/bin/confirm.py b/Mailman/bin/confirm.py
index 3334231a9..5b22bf71a 100644
--- a/Mailman/bin/confirm.py
+++ b/Mailman/bin/confirm.py
@@ -28,9 +28,9 @@ import logging
from Mailman import Utils
from Mailman import loginit
-from Mailman.Queue.sbcache import get_switchboard
from Mailman.configuration import config
from Mailman.i18n import _
+from Mailman.queue import Switchboard
__i18n_templates__ = True
@@ -55,7 +55,7 @@ def main():
# some MTAs have a hard limit to the time a filter prog can run. Postfix
# is a good example; if the limit is hit, the proc is SIGKILL'd giving us
# no chance to save the message.
- cmdq = get_switchboard(config.CMDQUEUE_DIR)
+ cmdq = Switchboard(config.CMDQUEUE_DIR)
cmdq.enqueue(sys.stdin.read(), listname=listname,
toconfirm=True, _plaintext=True)
diff --git a/Mailman/bin/gate_news.py b/Mailman/bin/gate_news.py
index 491660f2f..bded31869 100644
--- a/Mailman/bin/gate_news.py
+++ b/Mailman/bin/gate_news.py
@@ -32,9 +32,9 @@ from Mailman import Message
from Mailman import Utils
from Mailman import Version
from Mailman import loginit
-from Mailman.Queue.sbcache import get_switchboard
from Mailman.configuration import config
from Mailman.i18n import _
+from Mailman.queue import Switchboard
# Work around known problems with some RedHat cron daemons
import signal
@@ -148,7 +148,7 @@ def poll_newsgroup(mlist, conn, first, last, glock):
del msg['To']
msg['To'] = mlist.GetListEmail()
# Post the message to the locked list
- inq = get_switchboard(config.INQUEUE_DIR)
+ inq = Switchboard(config.INQUEUE_DIR)
inq.enqueue(msg,
listname=mlist.internal_name(),
fromusenet=True)
diff --git a/Mailman/bin/join.py b/Mailman/bin/join.py
index 372046c28..30813dc85 100644
--- a/Mailman/bin/join.py
+++ b/Mailman/bin/join.py
@@ -28,9 +28,9 @@ import logging
from Mailman import Utils
from Mailman import loginit
-from Mailman.Queue.sbcache import get_switchboard
from Mailman.configuration import config
from Mailman.i18n import _
+from Mailman.queue import Switchboard
__i18n_templates__ = True
@@ -55,7 +55,7 @@ def main():
# some MTAs have a hard limit to the time a filter prog can run. Postfix
# is a good example; if the limit is hit, the proc is SIGKILL'd giving us
# no chance to save the message.
- cmdq = get_switchboard(config.CMDQUEUE_DIR)
+ cmdq = Switchboard(config.CMDQUEUE_DIR)
cmdq.enqueue(sys.stdin.read(), listname=listname,
tojoin=True, _plaintext=True)
diff --git a/Mailman/bin/leave.py b/Mailman/bin/leave.py
index b4b882bc9..fb6e23f68 100644
--- a/Mailman/bin/leave.py
+++ b/Mailman/bin/leave.py
@@ -28,9 +28,9 @@ import logging
from Mailman import Utils
from Mailman import loginit
-from Mailman.Queue.sbcache import get_switchboard
from Mailman.configuration import config
from Mailman.i18n import _
+from Mailman.queue import Switchboard
__i18n_templates__ = True
@@ -54,7 +54,7 @@ def main():
# some MTAs have a hard limit to the time a filter prog can run. Postfix
# is a good example; if the limit is hit, the proc is SIGKILL'd giving us
# no chance to save the message.
- cmdq = get_switchboard(config.CMDQUEUE_DIR)
+ cmdq = Switchboard(config.CMDQUEUE_DIR)
cmdq.enqueue(sys.stdin.read(), listname=listname,
toleave=True, _plaintext=True)
diff --git a/Mailman/bin/owner.py b/Mailman/bin/owner.py
index 3b2bec383..183640773 100644
--- a/Mailman/bin/owner.py
+++ b/Mailman/bin/owner.py
@@ -31,9 +31,9 @@ import logging
from Mailman import Utils
from Mailman import loginit
-from Mailman.Queue.sbcache import get_switchboard
from Mailman.configuration import config
from Mailman.i18n import _
+from Mailman.queue import Switchboard
__i18n_templates__ = True
@@ -57,7 +57,7 @@ def main():
# incoming queue because we need some processing done on the message. The
# processing is minimal though, so craft our own pipeline, expressly for
# the purpose of delivering to the list owners.
- inq = get_switchboard(config.INQUEUE_DIR)
+ inq = Switchboard(config.INQUEUE_DIR)
inq.enqueue(sys.stdin.read(),
listname=listname,
_plaintext=True,
diff --git a/Mailman/bin/post.py b/Mailman/bin/post.py
index 86e4f7f9b..d1c1204df 100644
--- a/Mailman/bin/post.py
+++ b/Mailman/bin/post.py
@@ -31,9 +31,9 @@ import logging
from Mailman import Utils
from Mailman import loginit
-from Mailman.Queue.sbcache import get_switchboard
from Mailman.configuration import config
from Mailman.i18n import _
+from Mailman.queue import Switchboard
__i18n_templates__ = True
@@ -62,7 +62,7 @@ def main():
# some MTAs have a hard limit to the time a filter prog can run. Postfix
# is a good example; if the limit is hit, the proc is SIGKILL'd giving us
# no chance to save the message.
- inq = get_switchboard(config.INQUEUE_DIR)
+ inq = Switchboard(config.INQUEUE_DIR)
inq.enqueue(sys.stdin.read(),
listname=listname,
tolist=True, _plaintext=True)
diff --git a/Mailman/bin/qrunner.py b/Mailman/bin/qrunner.py
index 2e6c4138e..192069f39 100644
--- a/Mailman/bin/qrunner.py
+++ b/Mailman/bin/qrunner.py
@@ -29,6 +29,7 @@ from Mailman.initialize import initialize
__i18n_templates__ = True
COMMASPACE = ', '
+QRUNNER_SHORTCUTS = {}
log = None
@@ -52,11 +53,6 @@ def r_callback(option, opt, value, parser):
parser.print_help()
print >> sys.stderr, _('Bad runner specification: $value')
sys.exit(1)
- if runner == 'All':
- for runnername, slices in config.qrunners.items():
- dest.append((runnername, rslice, rrange))
- elif not runner.endswith('Runner'):
- runner += 'Runner'
dest.append((runner, rslice, rrange))
@@ -116,19 +112,25 @@ work better with that framework."""))
help=_('Alternative configuration file to use'))
opts, args = parser.parse_args()
if args:
- parser.print_help()
- print >> sys.stderr, _('Unexpected arguments')
- sys.exit(1)
+ parser.error(_('Unexpected arguments'))
if not opts.runners and not opts.list:
- parser.print_help()
- print >> sys.stderr, _('No runner name given.')
- sys.exit(1)
+ parser.error(_('No runner name given.'))
return parser, opts, args
def make_qrunner(name, slice, range, once=False):
- modulename = 'Mailman.Queue.' + name
+ # Several conventions for specifying the runner name are supported. It
+ # could be one of the shortcut names. Or the name is a full module path,
+ # use it explicitly. If the name starts with a dot, it's a class name
+ # relative to the Mailman.queue package.
+ if name in QRUNNER_SHORTCUTS:
+ classpath = QRUNNER_SHORTCUTS[name]
+ elif name.startswith('.'):
+ classpath = 'Mailman.queue' + name
+ else:
+ classpath = name
+ modulename, classname = classpath.rsplit('.', 1)
try:
__import__(modulename)
except ImportError, e:
@@ -140,7 +142,7 @@ def make_qrunner(name, slice, range, once=False):
else:
print >> sys.stderr, e
sys.exit(1)
- qrclass = getattr(sys.modules[modulename], name)
+ qrclass = getattr(sys.modules[modulename], classname)
if once:
# Subclass to hack in the setting of the stop flag in _doperiodic()
class Once(qrclass):
@@ -189,14 +191,21 @@ def main():
initialize(opts.config, propagate_logs=not opts.subproc)
log = logging.getLogger('mailman.qrunner')
+ # Calculate the qrunner shortcut names.
+ for qrunnerpath, slices in config.qrunners.items():
+ classname = qrunnerpath.rsplit('.', 1)[1]
+ if classname.endswith('Runner'):
+ shortname = classname[:-6].lower()
+ else:
+ shortname = classname
+ QRUNNER_SHORTCUTS[shortname] = qrunnerpath
+
if opts.list:
- for runnername, slices in config.qrunners.items():
- if runnername.endswith('Runner'):
- name = runnername[:-len('Runner')]
- else:
- name = runnername
- print _('$name runs the $runnername qrunner')
- print _('All runs all the above qrunners')
+ prefixlen = max(len(shortname) for shortname in QRUNNER_SHORTCUTS)
+ for shortname in sorted(QRUNNER_SHORTCUTS):
+ runnername = QRUNNER_SHORTCUTS[shortname]
+ shortname = (' ' * (prefixlen - len(shortname))) + shortname
+ print _('$shortname runs the $runnername qrunner')
sys.exit(0)
# Fast track for one infinite runner
diff --git a/Mailman/bin/request.py b/Mailman/bin/request.py
index 9bdb1f3c0..6ced2a871 100644
--- a/Mailman/bin/request.py
+++ b/Mailman/bin/request.py
@@ -28,9 +28,9 @@ import logging
from Mailman import Utils
from Mailman import loginit
-from Mailman.Queue.sbcache import get_switchboard
from Mailman.configuration import config
from Mailman.i18n import _
+from Mailman.queue import Switchboard
__i18n_templates__ = True
@@ -55,7 +55,7 @@ def main():
# some MTAs have a hard limit to the time a filter prog can run. Postfix
# is a good example; if the limit is hit, the proc is SIGKILL'd giving us
# no chance to save the message.
- cmdq = get_switchboard(config.CMDQUEUE_DIR)
+ cmdq = Switchboard(config.CMDQUEUE_DIR)
cmdq.enqueue(sys.stdin.read(),
listname=listname,
torequest=True,
diff --git a/Mailman/bin/unshunt.py b/Mailman/bin/unshunt.py
index 74264fd42..a910ad7e2 100644
--- a/Mailman/bin/unshunt.py
+++ b/Mailman/bin/unshunt.py
@@ -19,9 +19,9 @@ import sys
import optparse
from Mailman import Version
-from Mailman.Queue.sbcache import get_switchboard
from Mailman.configuration import config
from Mailman.i18n import _
+from Mailman.queue import Switchboard
__i18n_templates__ = True
@@ -54,13 +54,13 @@ def main():
else:
qdir = config.SHUNTQUEUE_DIR
- sb = get_switchboard(qdir)
+ sb = Switchboard(qdir)
sb.recover_backup_files()
for filebase in sb.files():
try:
msg, msgdata = sb.dequeue(filebase)
whichq = msgdata.get('whichq', config.INQUEUE_DIR)
- tosb = get_switchboard(whichq)
+ tosb = Switchboard(whichq)
tosb.enqueue(msg, msgdata)
except Exception, e:
# If there are any unshunting errors, log them and continue trying
diff --git a/Mailman/configuration.py b/Mailman/configuration.py
index bd76e5fca..b280dfce2 100644
--- a/Mailman/configuration.py
+++ b/Mailman/configuration.py
@@ -28,14 +28,14 @@ from Mailman.languages import LanguageManager
_missing = object()
DEFAULT_QRUNNERS = (
- 'Arch',
- 'Bounce',
- 'Command',
- 'Incoming',
- 'News',
- 'Outgoing',
- 'Retry',
- 'Virgin',
+ '.archive.ArchiveRunner',
+ '.bounce.BounceRunner',
+ '.command.CommandRunner',
+ '.incoming.IncomingRunner',
+ '.news.NewsRunner',
+ '.outgoing.OutgoingRunner',
+ '.retry.RetryRunner',
+ '.virgin.VirginRunner',
)
@@ -107,9 +107,9 @@ class Configuration(object):
sys.exit(-1)
# Based on values possibly set in mailman.cfg, add additional qrunners.
if ns['USE_MAILDIR']:
- self.add_qrunner('Maildir')
+ self.add_qrunner('maildir')
if ns['USE_LMTP']:
- self.add_qrunner('LMTP')
+ self.add_qrunner('lmtp')
# Pull out the defaults.
VAR_DIR = os.path.abspath(ns['VAR_DIR'])
# Now that we've loaded all the configuration files we're going to
@@ -207,7 +207,8 @@ class Configuration(object):
E.g. 'HTTP' or 'LMTP'. count is the number of qrunner slices to
create, by default, 1.
"""
- name += 'Runner'
+ if name.startswith('.'):
+ name = 'Mailman.queue' + name
self.qrunners[name] = count
def del_qrunner(self, name):
@@ -215,7 +216,8 @@ class Configuration(object):
name is the qrunner name and it must not include the 'Runner' suffix.
"""
- name += 'Runner'
+ if name.startswith('.'):
+ name = 'Mailman.queue' + name
self.qrunners.pop(name)
@property
diff --git a/Mailman/docs/acknowledge.txt b/Mailman/docs/acknowledge.txt
index 3e2088f10..88d9d6251 100644
--- a/Mailman/docs/acknowledge.txt
+++ b/Mailman/docs/acknowledge.txt
@@ -20,8 +20,8 @@ acknowledgment.
>>> # Ensure that the virgin queue is empty, since we'll be checking this
>>> # for new auto-response messages.
- >>> from Mailman.Queue.sbcache import get_switchboard
- >>> virginq = get_switchboard(config.VIRGINQUEUE_DIR)
+ >>> from Mailman.queue import Switchboard
+ >>> virginq = Switchboard(config.VIRGINQUEUE_DIR)
>>> virginq.files
[]
diff --git a/Mailman/docs/antispam.txt b/Mailman/docs/antispam.txt
index cdbda4efd..b6717c797 100644
--- a/Mailman/docs/antispam.txt
+++ b/Mailman/docs/antispam.txt
@@ -11,7 +11,7 @@ measures.
>>> from Mailman.Handlers.SpamDetect import process
>>> from Mailman.Message import Message
- >>> from Mailman.Queue.Switchboard import Switchboard
+ >>> from Mailman.queue import Switchboard
>>> from Mailman.configuration import config
>>> from Mailman.database import flush
>>> from email import message_from_string
diff --git a/Mailman/docs/archives.txt b/Mailman/docs/archives.txt
index e7c84a8ed..a02fdf802 100644
--- a/Mailman/docs/archives.txt
+++ b/Mailman/docs/archives.txt
@@ -9,7 +9,7 @@ processes.
>>> from Mailman.Handlers.ToArchive import process
>>> from Mailman.Message import Message
- >>> from Mailman.Queue.Switchboard import Switchboard
+ >>> from Mailman.queue import Switchboard
>>> from Mailman.configuration import config
>>> from Mailman.database import flush
>>> from email import message_from_string
diff --git a/Mailman/docs/bounces.txt b/Mailman/docs/bounces.txt
index cfc7aa49e..7195a082f 100644
--- a/Mailman/docs/bounces.txt
+++ b/Mailman/docs/bounces.txt
@@ -35,7 +35,7 @@ Bounce a message by passing in the original message, and an optional error
message. The bounced message ends up in the virgin queue, awaiting sending
to the original messageauthor.
- >>> from Mailman.Queue.Switchboard import Switchboard
+ >>> from Mailman.queue import Switchboard
>>> switchboard = Switchboard(config.VIRGINQUEUE_DIR)
>>> from Mailman.app.bounces import bounce_message
>>> bounce_message(mlist, msg)
diff --git a/Mailman/docs/digests.txt b/Mailman/docs/digests.txt
index 5eb2e7364..4d787221d 100644
--- a/Mailman/docs/digests.txt
+++ b/Mailman/docs/digests.txt
@@ -8,7 +8,7 @@ digests, although only two are currently supported: MIME digests and RFC 1153
>>> from Mailman.Handlers.ToDigest import process
>>> from Mailman.Message import Message
- >>> from Mailman.Queue.Switchboard import Switchboard
+ >>> from Mailman.queue import Switchboard
>>> from Mailman.configuration import config
>>> from Mailman.database import flush
>>> from email import message_from_string
diff --git a/Mailman/docs/hold.txt b/Mailman/docs/hold.txt
index 3aeb56934..f8934cf6f 100644
--- a/Mailman/docs/hold.txt
+++ b/Mailman/docs/hold.txt
@@ -8,7 +8,7 @@ are held when they meet any of a number of criteria.
>>> import os
>>> import errno
>>> from Mailman.Handlers.Hold import process
- >>> from Mailman.Queue.Switchboard import Switchboard
+ >>> from Mailman.queue import Switchboard
>>> from Mailman.configuration import config
>>> from Mailman.database import flush
>>> mlist = config.db.list_manager.create('_xtest@example.com')
diff --git a/Mailman/docs/message.txt b/Mailman/docs/message.txt
index f0f8c7739..601b871bf 100644
--- a/Mailman/docs/message.txt
+++ b/Mailman/docs/message.txt
@@ -31,7 +31,7 @@ address, an optional subject, optional body text, and optional language.
The message will end up in the virgin queue.
- >>> from Mailman.Queue.Switchboard import Switchboard
+ >>> from Mailman.queue import Switchboard
>>> switchboard = Switchboard(config.VIRGINQUEUE_DIR)
>>> len(switchboard.files)
1
diff --git a/Mailman/docs/news-runner.txt b/Mailman/docs/news-runner.txt
index 8189d218d..ecc32570f 100644
--- a/Mailman/docs/news-runner.txt
+++ b/Mailman/docs/news-runner.txt
@@ -10,7 +10,7 @@ was originally written to gate to Usenet, which has its own rules).
>>> from Mailman.Message import Message
>>> from Mailman.configuration import config
>>> from Mailman.database import flush
- >>> from Mailman.Queue.NewsRunner import prepare_message
+ >>> from Mailman.queue.news import prepare_message
>>> mlist = config.db.list_manager.create('_xtest@example.com')
>>> mlist.linked_newsgroup = 'comp.lang.python'
>>> flush()
diff --git a/Mailman/docs/nntp.txt b/Mailman/docs/nntp.txt
index e169f575a..9748a06ca 100644
--- a/Mailman/docs/nntp.txt
+++ b/Mailman/docs/nntp.txt
@@ -7,7 +7,7 @@ NNTP is to Usenet as IP is to the web, it's more general than that.
>>> from Mailman.Handlers.ToUsenet import process
>>> from Mailman.Message import Message
- >>> from Mailman.Queue.Switchboard import Switchboard
+ >>> from Mailman.queue import Switchboard
>>> from Mailman.configuration import config
>>> from Mailman.database import flush
>>> from email import message_from_string
diff --git a/Mailman/docs/outgoing.txt b/Mailman/docs/outgoing.txt
index edbde280c..c1ecffb00 100644
--- a/Mailman/docs/outgoing.txt
+++ b/Mailman/docs/outgoing.txt
@@ -11,7 +11,7 @@ headers for unambigous bounce processing.
>>> from Mailman.Handlers.ToOutgoing import process
>>> from Mailman.Message import Message
- >>> from Mailman.Queue.Switchboard import Switchboard
+ >>> from Mailman.queue import Switchboard
>>> from Mailman.configuration import config
>>> from Mailman.database import flush
>>> from email import message_from_string
diff --git a/Mailman/docs/registration.txt b/Mailman/docs/registration.txt
index a98a320af..219a0cef5 100644
--- a/Mailman/docs/registration.txt
+++ b/Mailman/docs/registration.txt
@@ -133,7 +133,7 @@ Verification by email
There is also a verification email sitting in the virgin queue now. This
message is sent to the user in order to verify the registered address.
- >>> from Mailman.Queue.Switchboard import Switchboard
+ >>> from Mailman.queue import Switchboard
>>> switchboard = Switchboard(config.VIRGINQUEUE_DIR)
>>> len(switchboard.files)
1
diff --git a/Mailman/docs/replybot.txt b/Mailman/docs/replybot.txt
index 0b1980fde..8ca131bf8 100644
--- a/Mailman/docs/replybot.txt
+++ b/Mailman/docs/replybot.txt
@@ -18,8 +18,8 @@ message or the amount of time since the last auto-response.
>>> # Ensure that the virgin queue is empty, since we'll be checking this
>>> # for new auto-response messages.
- >>> from Mailman.Queue.sbcache import get_switchboard
- >>> virginq = get_switchboard(config.VIRGINQUEUE_DIR)
+ >>> from Mailman.queue import Switchboard
+ >>> virginq = Switchboard(config.VIRGINQUEUE_DIR)
>>> virginq.files
[]
diff --git a/Mailman/docs/requests.txt b/Mailman/docs/requests.txt
index 249cab952..00128c358 100644
--- a/Mailman/docs/requests.txt
+++ b/Mailman/docs/requests.txt
@@ -19,8 +19,8 @@ Here is a helper function for printing out held requests.
And another helper for displaying messages in the virgin queue.
- >>> from Mailman.Queue.sbcache import get_switchboard
- >>> virginq = get_switchboard(config.VIRGINQUEUE_DIR)
+ >>> from Mailman.queue import Switchboard
+ >>> virginq = Switchboard(config.VIRGINQUEUE_DIR)
>>> def dequeue(whichq=None, expected_count=1):
... if whichq is None:
... whichq = virginq
@@ -317,7 +317,7 @@ indicates that the message has been approved.
>>> flush()
>>> moderator.handle_message(mlist, id_3, Action.accept)
>>> flush()
- >>> inq = get_switchboard(config.INQUEUE_DIR)
+ >>> inq = Switchboard(config.INQUEUE_DIR)
>>> qmsg, qdata = dequeue(inq)
>>> print qmsg.as_string()
From: aperson@example.org
diff --git a/Mailman/docs/runner.txt b/Mailman/docs/runner.txt
index fc5299c2a..75ec90758 100644
--- a/Mailman/docs/runner.txt
+++ b/Mailman/docs/runner.txt
@@ -17,8 +17,7 @@ continuously in a loop until the .stop() method is called.
>>> import os
>>> from email import message_from_string
>>> from Mailman.Message import Message
- >>> from Mailman.Queue.Runner import Runner
- >>> from Mailman.Queue.Switchboard import Switchboard
+ >>> from Mailman.queue import Runner, Switchboard
>>> from Mailman.configuration import config
>>> from Mailman.database import flush
>>> mlist = config.db.list_manager.create('_xtest@example.com')
diff --git a/Mailman/docs/switchboard.txt b/Mailman/docs/switchboard.txt
index 267eeffe3..ce8f277ed 100644
--- a/Mailman/docs/switchboard.txt
+++ b/Mailman/docs/switchboard.txt
@@ -6,7 +6,7 @@ instance of a switchboard is responsible for one queue directory.
>>> from email import message_from_string
>>> from Mailman.Message import Message
- >>> from Mailman.Queue.Switchboard import Switchboard
+ >>> from Mailman.queue import Switchboard
>>> msg = message_from_string("""\
... From: aperson@example.com
... To: _xtest@example.com
diff --git a/Mailman/docs/tagger.txt b/Mailman/docs/tagger.txt
index f220cdcc6..a6b2d8ca8 100644
--- a/Mailman/docs/tagger.txt
+++ b/Mailman/docs/tagger.txt
@@ -10,7 +10,7 @@ expressions. The message then gets tagged with the topic names of each hit.
>>> from Mailman.Handlers.Tagger import process
>>> from Mailman.Message import Message
- >>> from Mailman.Queue.Switchboard import Switchboard
+ >>> from Mailman.queue import Switchboard
>>> from Mailman.configuration import config
>>> from Mailman.database import flush
>>> from email import message_from_string
diff --git a/Mailman/interfaces/runner.py b/Mailman/interfaces/runner.py
new file mode 100644
index 000000000..84da0a7e3
--- /dev/null
+++ b/Mailman/interfaces/runner.py
@@ -0,0 +1,31 @@
+# Copyright (C) 2007 by the Free Software Foundation, Inc.
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
+# USA.
+
+"""Interface for queue runners."""
+
+from zope.interface import Interface, Attribute
+
+
+
+class IRunner(Interface):
+ """The queue runner."""
+
+ def run():
+ """Start the queue runner."""
+
+ def stop():
+ """Stop the queue runner on the next iteration through the loop."""
diff --git a/Mailman/Queue/Runner.py b/Mailman/queue/Runner.py
index 958e11e6c..958e11e6c 100644
--- a/Mailman/Queue/Runner.py
+++ b/Mailman/queue/Runner.py
diff --git a/Mailman/Queue/Switchboard.py b/Mailman/queue/Switchboard.py
index 2c8672f57..2c8672f57 100644
--- a/Mailman/Queue/Switchboard.py
+++ b/Mailman/queue/Switchboard.py
diff --git a/Mailman/queue/__init__.py b/Mailman/queue/__init__.py
new file mode 100644
index 000000000..94f747245
--- /dev/null
+++ b/Mailman/queue/__init__.py
@@ -0,0 +1,402 @@
+# Copyright (C) 2001-2007 by the Free Software Foundation, Inc.
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
+# USA.
+
+"""Queing and dequeuing message/metadata pickle files.
+
+Messages are represented as email.message.Message objects (or an instance ofa
+subclass). Metadata is represented as a Python dictionary. For every
+message/metadata pair in a queue, a single file containing two pickles is
+written. First, the message is written to the pickle, then the metadata
+dictionary is written.
+"""
+
+from __future__ import with_statement
+__metaclass__ = type
+
+import os
+import sha
+import time
+import email
+import errno
+import cPickle
+import logging
+import marshal
+import traceback
+
+from zope.interface import implements
+
+from Mailman import i18n
+from Mailman import Message
+from Mailman import Utils
+from Mailman.configuration import config
+from Mailman.interfaces import IRunner, ISwitchboard
+
+# 20 bytes of all bits set, maximum sha.digest() value
+shamax = 0xffffffffffffffffffffffffffffffffffffffffL
+
+# Small increment to add to time in case two entries have the same time. This
+# prevents skipping one of two entries with the same time until the next pass.
+DELTA = .0001
+
+log = logging.getLogger('mailman.error')
+
+
+
+class Switchboard:
+ implements(ISwitchboard)
+
+ def __init__(self, whichq, slice=None, numslices=1, recover=False):
+ self._whichq = whichq
+ # Create the directory if it doesn't yet exist.
+ Utils.makedirs(self._whichq, 0770)
+ # Fast track for no slices
+ self._lower = None
+ self._upper = None
+ # BAW: test performance and end-cases of this algorithm
+ if numslices <> 1:
+ self._lower = ((shamax + 1) * slice) / numslices
+ self._upper = (((shamax + 1) * (slice + 1)) / numslices) - 1
+ if recover:
+ self.recover_backup_files()
+
+ @property
+ def queue_directory(self):
+ return self._whichq
+
+ def enqueue(self, _msg, _metadata=None, **_kws):
+ if _metadata is None:
+ _metadata = {}
+ # Calculate the SHA hexdigest of the message to get a unique base
+ # filename. We're also going to use the digest as a hash into the set
+ # of parallel qrunner processes.
+ data = _metadata.copy()
+ data.update(_kws)
+ listname = data.get('listname', '--nolist--')
+ # Get some data for the input to the sha hash.
+ now = time.time()
+ if not data.get('_plaintext'):
+ protocol = 1
+ msgsave = cPickle.dumps(_msg, protocol)
+ else:
+ protocol = 0
+ msgsave = cPickle.dumps(str(_msg), protocol)
+ # listname is unicode but the input to the hash function must be an
+ # 8-bit string (eventually, a bytes object).
+ hashfood = msgsave + listname.encode('utf-8') + repr(now)
+ # Encode the current time into the file name for FIFO sorting. The
+ # file name consists of two parts separated by a '+': the received
+ # time for this message (i.e. when it first showed up on this system)
+ # and the sha hex digest.
+ rcvtime = data.setdefault('received_time', now)
+ filebase = repr(rcvtime) + '+' + sha.new(hashfood).hexdigest()
+ filename = os.path.join(self._whichq, filebase + '.pck')
+ tmpfile = filename + '.tmp'
+ # Always add the metadata schema version number
+ data['version'] = config.QFILE_SCHEMA_VERSION
+ # Filter out volatile entries
+ for k in data:
+ if k.startswith('_'):
+ del data[k]
+ # We have to tell the dequeue() method whether to parse the message
+ # object or not.
+ data['_parsemsg'] = (protocol == 0)
+ # Write to the pickle file the message object and metadata.
+ with open(tmpfile, 'w') as fp:
+ fp.write(msgsave)
+ cPickle.dump(data, fp, protocol)
+ fp.flush()
+ os.fsync(fp.fileno())
+ os.rename(tmpfile, filename)
+ return filebase
+
+ def dequeue(self, filebase):
+ # Calculate the filename from the given filebase.
+ filename = os.path.join(self._whichq, filebase + '.pck')
+ backfile = os.path.join(self._whichq, filebase + '.bak')
+ # Read the message object and metadata.
+ with open(filename) as fp:
+ # Move the file to the backup file name for processing. If this
+ # process crashes uncleanly the .bak file will be used to
+ # re-instate the .pck file in order to try again. XXX what if
+ # something caused Python to constantly crash? Is it possible
+ # that we'd end up mail bombing recipients or crushing the
+ # archiver? How would we defend against that?
+ os.rename(filename, backfile)
+ msg = cPickle.load(fp)
+ data = cPickle.load(fp)
+ if data.get('_parsemsg'):
+ msg = email.message_from_string(msg, Message.Message)
+ return msg, data
+
+ def finish(self, filebase, preserve=False):
+ bakfile = os.path.join(self._whichq, filebase + '.bak')
+ try:
+ if preserve:
+ psvfile = os.path.join(config.SHUNTQUEUE_DIR,
+ filebase + '.psv')
+ # Create the directory if it doesn't yet exist.
+ Utils.makedirs(config.SHUNTQUEUE_DIR, 0770)
+ os.rename(bakfile, psvfile)
+ else:
+ os.unlink(bakfile)
+ except EnvironmentError, e:
+ log.exception('Failed to unlink/preserve backup file: %s', bakfile)
+
+ @property
+ def files(self):
+ return self.get_files()
+
+ def get_files(self, extension='.pck'):
+ times = {}
+ lower = self._lower
+ upper = self._upper
+ for f in os.listdir(self._whichq):
+ # By ignoring anything that doesn't end in .pck, we ignore
+ # tempfiles and avoid a race condition.
+ filebase, ext = os.path.splitext(f)
+ if ext <> extension:
+ continue
+ when, digest = filebase.split('+', 1)
+ # Throw out any files which don't match our bitrange. BAW: test
+ # performance and end-cases of this algorithm. MAS: both
+ # comparisons need to be <= to get complete range.
+ if lower is None or (lower <= long(digest, 16) <= upper):
+ key = float(when)
+ while key in times:
+ key += DELTA
+ times[key] = filebase
+ # FIFO sort
+ return [times[key] for key in sorted(times)]
+
+ def recover_backup_files(self):
+ # Move all .bak files in our slice to .pck. It's impossible for both
+ # to exist at the same time, so the move is enough to ensure that our
+ # normal dequeuing process will handle them.
+ for filebase in self.get_files('.bak'):
+ src = os.path.join(self._whichq, filebase + '.bak')
+ dst = os.path.join(self._whichq, filebase + '.pck')
+ os.rename(src, dst)
+
+
+
+class Runner:
+ implements(IRunner)
+
+ QDIR = None
+ SLEEPTIME = config.QRUNNER_SLEEP_TIME
+
+ def __init__(self, slice=None, numslices=1):
+ self._kids = {}
+ # Create our own switchboard. Don't use the switchboard cache because
+ # we want to provide slice and numslice arguments.
+ self._switchboard = Switchboard(self.QDIR, slice, numslices, True)
+ # Create the shunt switchboard
+ self._shunt = Switchboard(config.SHUNTQUEUE_DIR)
+ self._stop = False
+
+ def __repr__(self):
+ return '<%s at %s>' % (self.__class__.__name__, id(self))
+
+ def stop(self):
+ self._stop = True
+
+ def run(self):
+ # Start the main loop for this queue runner.
+ try:
+ try:
+ while True:
+ # Once through the loop that processes all the files in
+ # the queue directory.
+ filecnt = self._oneloop()
+ # Do the periodic work for the subclass. BAW: this
+ # shouldn't be called here. There should be one more
+ # _doperiodic() call at the end of the _oneloop() loop.
+ self._doperiodic()
+ # If the stop flag is set, we're done.
+ if self._stop:
+ break
+ # Give the runner an opportunity to snooze for a while,
+ # but pass it the file count so it can decide whether to
+ # do more work now or not.
+ self._snooze(filecnt)
+ except KeyboardInterrupt:
+ pass
+ finally:
+ # We've broken out of our main loop, so we want to reap all the
+ # subprocesses we've created and do any other necessary cleanups.
+ self._cleanup()
+
+ def _oneloop(self):
+ # First, list all the files in our queue directory.
+ # Switchboard.files() is guaranteed to hand us the files in FIFO
+ # order. Return an integer count of the number of files that were
+ # available for this qrunner to process.
+ files = self._switchboard.files
+ for filebase in files:
+ try:
+ # Ask the switchboard for the message and metadata objects
+ # associated with this filebase.
+ msg, msgdata = self._switchboard.dequeue(filebase)
+ except Exception, e:
+ # This used to just catch email.Errors.MessageParseError,
+ # but other problems can occur in message parsing, e.g.
+ # ValueError, and exceptions can occur in unpickling too.
+ # We don't want the runner to die, so we just log and skip
+ # this entry, but preserve it for analysis.
+ self._log(e)
+ log.error('Skipping and preserving unparseable message: %s',
+ filebase)
+ self._switchboard.finish(filebase, preserve=True)
+ continue
+ try:
+ self._onefile(msg, msgdata)
+ self._switchboard.finish(filebase)
+ except Exception, e:
+ # All runners that implement _dispose() must guarantee that
+ # exceptions are caught and dealt with properly. Still, there
+ # may be a bug in the infrastructure, and we do not want those
+ # to cause messages to be lost. Any uncaught exceptions will
+ # cause the message to be stored in the shunt queue for human
+ # intervention.
+ self._log(e)
+ # Put a marker in the metadata for unshunting
+ msgdata['whichq'] = self._switchboard.queue_directory
+ # It is possible that shunting can throw an exception, e.g. a
+ # permissions problem or a MemoryError due to a really large
+ # message. Try to be graceful.
+ try:
+ new_filebase = self._shunt.enqueue(msg, msgdata)
+ log.error('SHUNTING: %s', new_filebase)
+ self._switchboard.finish(filebase)
+ except Exception, e:
+ # The message wasn't successfully shunted. Log the
+ # exception and try to preserve the original queue entry
+ # for possible analysis.
+ self._log(e)
+ log.error('SHUNTING FAILED, preserving original entry: %s',
+ filebase)
+ self._switchboard.finish(filebase, preserve=True)
+ # Other work we want to do each time through the loop
+ Utils.reap(self._kids, once=True)
+ self._doperiodic()
+ if self._shortcircuit():
+ break
+ return len(files)
+
+ def _onefile(self, msg, msgdata):
+ # Do some common sanity checking on the message metadata. It's got to
+ # be destined for a particular mailing list. This switchboard is used
+ # to shunt off badly formatted messages. We don't want to just trash
+ # them because they may be fixable with human intervention. Just get
+ # them out of our site though.
+ #
+ # Find out which mailing list this message is destined for.
+ listname = msgdata.get('listname')
+ mlist = config.db.list_manager.get(listname)
+ if not mlist:
+ log.error('Dequeuing message destined for missing list: %s',
+ listname)
+ self._shunt.enqueue(msg, msgdata)
+ return
+ # Now process this message, keeping track of any subprocesses that may
+ # have been spawned. We'll reap those later.
+ #
+ # We also want to set up the language context for this message. The
+ # context will be the preferred language for the user if a member of
+ # the list, or the list's preferred language. However, we must take
+ # special care to reset the defaults, otherwise subsequent messages
+ # may be translated incorrectly. BAW: I'm not sure I like this
+ # approach, but I can't think of anything better right now.
+ sender = msg.get_sender()
+ member = mlist.members.get_member(sender)
+ if member:
+ lang = member.preferred_language
+ else:
+ lang = mlist.preferred_language
+ with i18n.using_language(lang):
+ msgdata['lang'] = lang
+ keepqueued = self._dispose(mlist, msg, msgdata)
+ # Keep tabs on any child processes that got spawned.
+ kids = msgdata.get('_kids')
+ if kids:
+ self._kids.update(kids)
+ if keepqueued:
+ self._switchboard.enqueue(msg, msgdata)
+
+ def _log(self, exc):
+ log.error('Uncaught runner exception: %s', exc)
+ s = StringIO()
+ traceback.print_exc(file=s)
+ log.error('%s', s.getvalue())
+
+ #
+ # Subclasses can override these methods.
+ #
+ def _cleanup(self):
+ """Clean up upon exit from the main processing loop.
+
+ Called when the Runner's main loop is stopped, this should perform
+ any necessary resource deallocation. Its return value is irrelevant.
+ """
+ Utils.reap(self._kids)
+
+ def _dispose(self, mlist, msg, msgdata):
+ """Dispose of a single message destined for a mailing list.
+
+ Called for each message that the Runner is responsible for, this is
+ the primary overridable method for processing each message.
+ Subclasses, must provide implementation for this method.
+
+ mlist is the IMailingList instance this message is destined for.
+
+ msg is the Message object representing the message.
+
+ msgdata is a dictionary of message metadata.
+ """
+ raise NotImplementedError
+
+ def _doperiodic(self):
+ """Do some processing `every once in a while'.
+
+ Called every once in a while both from the Runner's main loop, and
+ from the Runner's hash slice processing loop. You can do whatever
+ special periodic processing you want here, and the return value is
+ irrelevant.
+ """
+ pass
+
+ def _snooze(self, filecnt):
+ """Sleep for a little while.
+
+ filecnt is the number of messages in the queue the last time through.
+ Sub-runners can decide to continue to do work, or sleep for a while
+ based on this value. By default, we only snooze if there was nothing
+ to do last time around.
+ """
+ if filecnt or float(self.SLEEPTIME) <= 0:
+ return
+ time.sleep(float(self.SLEEPTIME))
+
+ def _shortcircuit(self):
+ """Return a true value if the individual file processing loop should
+ exit before it's finished processing each message in the current slice
+ of hash space. A false value tells _oneloop() to continue processing
+ until the current snapshot of hash space is exhausted.
+
+ You could, for example, implement a throttling algorithm here.
+ """
+ return self._stop
diff --git a/Mailman/Queue/ArchRunner.py b/Mailman/queue/archive.py
index 266c2ad14..c6565e11d 100644
--- a/Mailman/Queue/ArchRunner.py
+++ b/Mailman/queue/archive.py
@@ -21,12 +21,12 @@ import time
from email.Utils import parsedate_tz, mktime_tz, formatdate
from Mailman import LockFile
-from Mailman.Queue.Runner import Runner
from Mailman.configuration import config
+from Mailman.queue import Runner
-class ArchRunner(Runner):
+class ArchiveRunner(Runner):
QDIR = config.ARCHQUEUE_DIR
def _dispose(self, mlist, msg, msgdata):
diff --git a/Mailman/Queue/BounceRunner.py b/Mailman/queue/bounce.py
index 6ce7ef393..361b01fac 100644
--- a/Mailman/Queue/BounceRunner.py
+++ b/Mailman/queue/bounce.py
@@ -31,10 +31,9 @@ from Mailman import LockFile
from Mailman import Utils
from Mailman.Bouncers import BouncerAPI
from Mailman.Message import UserNotification
-from Mailman.Queue.Runner import Runner
-from Mailman.Queue.sbcache import get_switchboard
from Mailman.configuration import config
from Mailman.i18n import _
+from Mailman.queue import Runner, Switchboard
COMMASPACE = ', '
@@ -164,7 +163,7 @@ class BounceRunner(Runner, BounceMixin):
def _dispose(self, mlist, msg, msgdata):
# Make sure we have the most up-to-date state
mlist.Load()
- outq = get_switchboard(config.OUTQUEUE_DIR)
+ outq = Switchboard(config.OUTQUEUE_DIR)
# There are a few possibilities here:
#
# - the message could have been VERP'd in which case, we know exactly
diff --git a/Mailman/Queue/CommandRunner.py b/Mailman/queue/command.py
index 2df28e312..a67e757f3 100644
--- a/Mailman/Queue/CommandRunner.py
+++ b/Mailman/queue/command.py
@@ -35,10 +35,10 @@ from Mailman import LockFile
from Mailman import Message
from Mailman import Utils
from Mailman.Handlers import Replybot
-from Mailman.Queue.Runner import Runner
from Mailman.app.replybot import autorespond_to_sender
from Mailman.configuration import config
from Mailman.i18n import _
+from Mailman.queue import Runner
NL = '\n'
diff --git a/Mailman/Queue/HTTPRunner.py b/Mailman/queue/http.py
index 12b5bb607..16614b1bd 100644
--- a/Mailman/Queue/HTTPRunner.py
+++ b/Mailman/queue/http.py
@@ -25,8 +25,8 @@ from cStringIO import StringIO
from wsgiref.simple_server import make_server, WSGIRequestHandler
from Mailman.Cgi.wsgi_app import mailman_app
-from Mailman.Queue.Runner import Runner
from Mailman.configuration import config
+from Mailman.queue import Runner
hlog = logging.getLogger('mailman.http')
qlog = logging.getLogger('mailman.qrunner')
diff --git a/Mailman/Queue/IncomingRunner.py b/Mailman/queue/incoming.py
index fa5b3f694..05ab924e6 100644
--- a/Mailman/Queue/IncomingRunner.py
+++ b/Mailman/queue/incoming.py
@@ -103,8 +103,8 @@ from cStringIO import StringIO
from Mailman import Errors
from Mailman import LockFile
-from Mailman.Queue.Runner import Runner
from Mailman.configuration import config
+from Mailman.queue import Runner
log = logging.getLogger('mailman.error')
vlog = logging.getLogger('mailman.vette')
diff --git a/Mailman/Queue/LMTPRunner.py b/Mailman/queue/lmtp.py
index 81c912653..0660f2e38 100644
--- a/Mailman/Queue/LMTPRunner.py
+++ b/Mailman/queue/lmtp.py
@@ -49,9 +49,8 @@ import asyncore
from email.utils import parseaddr
from Mailman.Message import Message
-from Mailman.Queue.Runner import Runner
-from Mailman.Queue.sbcache import get_switchboard
from Mailman.configuration import config
+from Mailman.runner import Runner, Switchboard
elog = logging.getLogger('mailman.error')
qlog = logging.getLogger('mailman.qrunner')
@@ -147,29 +146,29 @@ class LMTPRunner(Runner, smtpd.SMTPServer):
# sub-queue, and if so, enqueue it.
msgdata = dict(listname=listname)
if subq in ('bounces', 'admin'):
- queue = get_switchboard(config.BOUNCEQUEUE_DIR)
+ queue = Switchboard(config.BOUNCEQUEUE_DIR)
elif subq == 'confirm':
msgdata['toconfirm'] = True
- queue = get_switchboard(config.CMDQUEUE_DIR)
+ queue = Switchboard(config.CMDQUEUE_DIR)
elif subq in ('join', 'subscribe'):
msgdata['tojoin'] = True
- queue = get_switchboard(config.CMDQUEUE_DIR)
+ queue = Switchboard(config.CMDQUEUE_DIR)
elif subq in ('leave', 'unsubscribe'):
msgdata['toleave'] = True
- queue = get_switchboard(config.CMDQUEUE_DIR)
+ queue = Switchboard(config.CMDQUEUE_DIR)
elif subq == 'owner':
msgdata.update({
'toowner' : True,
'envsender' : config.SITE_OWNER_ADDRESS,
'pipeline' : config.OWNER_PIPELINE,
})
- queue = get_switchboard(config.INQUEUE_DIR)
+ queue = Switchboard(config.INQUEUE_DIR)
elif subq is None:
msgdata['tolist'] = True
- queue = get_switchboard(config.INQUEUE_DIR)
+ queue = Switchboard(config.INQUEUE_DIR)
elif subq == 'request':
msgdata['torequest'] = True
- queue = get_switchboard(config.CMDQUEUE_DIR)
+ queue = Switchboard(config.CMDQUEUE_DIR)
else:
elog.error('Unknown sub-queue: %s', subq)
status.append(ERR_550)
diff --git a/Mailman/Queue/MaildirRunner.py b/Mailman/queue/maildir.py
index ff193bf22..d36adf6e9 100644
--- a/Mailman/Queue/MaildirRunner.py
+++ b/Mailman/queue/maildir.py
@@ -57,9 +57,8 @@ from email.Parser import Parser
from email.Utils import parseaddr
from Mailman.Message import Message
-from Mailman.Queue.Runner import Runner
-from Mailman.Queue.sbcache import get_switchboard
from Mailman.configuration import config
+from Mailman.queue import Runner
log = logging.getLogger('mailman.error')
@@ -153,29 +152,29 @@ class MaildirRunner(Runner):
msgdata = {'listname': listname}
# -admin is deprecated
if subq in ('bounces', 'admin'):
- queue = get_switchboard(config.BOUNCEQUEUE_DIR)
+ queue = Switchboard(config.BOUNCEQUEUE_DIR)
elif subq == 'confirm':
msgdata['toconfirm'] = 1
- queue = get_switchboard(config.CMDQUEUE_DIR)
+ queue = Switchboard(config.CMDQUEUE_DIR)
elif subq in ('join', 'subscribe'):
msgdata['tojoin'] = 1
- queue = get_switchboard(config.CMDQUEUE_DIR)
+ queue = Switchboard(config.CMDQUEUE_DIR)
elif subq in ('leave', 'unsubscribe'):
msgdata['toleave'] = 1
- queue = get_switchboard(config.CMDQUEUE_DIR)
+ queue = Switchboard(config.CMDQUEUE_DIR)
elif subq == 'owner':
msgdata.update({
'toowner': True,
'envsender': config.SITE_OWNER_ADDRESS,
'pipeline': config.OWNER_PIPELINE,
})
- queue = get_switchboard(config.INQUEUE_DIR)
+ queue = Switchboard(config.INQUEUE_DIR)
elif subq is None:
msgdata['tolist'] = 1
- queue = get_switchboard(config.INQUEUE_DIR)
+ queue = Switchboard(config.INQUEUE_DIR)
elif subq == 'request':
msgdata['torequest'] = 1
- queue = get_switchboard(config.CMDQUEUE_DIR)
+ queue = Switchboard(config.CMDQUEUE_DIR)
else:
log.error('Unknown sub-queue: %s', subq)
os.rename(dstname, xdstname)
diff --git a/Mailman/Queue/NewsRunner.py b/Mailman/queue/news.py
index 709ec4e17..1ed58dc0c 100644
--- a/Mailman/Queue/NewsRunner.py
+++ b/Mailman/queue/news.py
@@ -29,8 +29,8 @@ from email.utils import getaddresses, make_msgid
COMMASPACE = ', '
from Mailman import Utils
-from Mailman.Queue.Runner import Runner
from Mailman.configuration import config
+from Mailman.queue import Runner
log = logging.getLogger('mailman.error')
diff --git a/Mailman/Queue/OutgoingRunner.py b/Mailman/queue/outgoing.py
index 1dd780ca4..a1e64096d 100644
--- a/Mailman/Queue/OutgoingRunner.py
+++ b/Mailman/queue/outgoing.py
@@ -28,10 +28,9 @@ import logging
from Mailman import Errors
from Mailman import LockFile
from Mailman import Message
-from Mailman.Queue.BounceRunner import BounceMixin
-from Mailman.Queue.Runner import Runner
-from Mailman.Queue.Switchboard import Switchboard
from Mailman.configuration import config
+from Mailman.queue import Runner, Switchboard
+from Mailman.queue.bounce import BounceMixin
# This controls how often _doperiodic() will try to deal with deferred
# permanent failures. It is a count of calls to _doperiodic()
diff --git a/Mailman/Queue/RetryRunner.py b/Mailman/queue/retry.py
index 33e9459b5..35accf262 100644
--- a/Mailman/Queue/RetryRunner.py
+++ b/Mailman/queue/retry.py
@@ -17,9 +17,8 @@
import time
-from Mailman.Queue.Runner import Runner
-from Mailman.Queue.Switchboard import Switchboard
from Mailman.configuration import config
+from Mailman.queue import Runner, Switchboard
diff --git a/Mailman/Queue/sbcache.py b/Mailman/queue/sbcache.py
index 8689bf083..8689bf083 100644
--- a/Mailman/Queue/sbcache.py
+++ b/Mailman/queue/sbcache.py
diff --git a/Mailman/Queue/__init__.py b/Mailman/queue/tests/__init__.py
index e69de29bb..e69de29bb 100644
--- a/Mailman/Queue/__init__.py
+++ b/Mailman/queue/tests/__init__.py
diff --git a/Mailman/Queue/VirginRunner.py b/Mailman/queue/virgin.py
index 982d98919..e26c0f190 100644
--- a/Mailman/Queue/VirginRunner.py
+++ b/Mailman/queue/virgin.py
@@ -23,9 +23,9 @@ to go through some minimal processing before they can be sent out to the
recipient.
"""
-from Mailman.Queue.IncomingRunner import IncomingRunner
-from Mailman.Queue.Runner import Runner
from Mailman.configuration import config
+from Mailman.queue import Runner
+from Mailman.queue.incoming import IncomingRunner