diff options
| -rw-r--r-- | Mailman/Handlers/MimeDel.py | 4 | ||||
| -rw-r--r-- | Mailman/Handlers/ToArchive.py | 4 | ||||
| -rw-r--r-- | Mailman/Handlers/ToDigest.py | 4 | ||||
| -rw-r--r-- | Mailman/Handlers/ToOutgoing.py | 4 | ||||
| -rw-r--r-- | Mailman/Handlers/ToUsenet.py | 4 | ||||
| -rw-r--r-- | Mailman/MTA/Manual.py | 4 | ||||
| -rw-r--r-- | Mailman/Message.py | 8 | ||||
| -rw-r--r-- | Mailman/Post.py | 4 | ||||
| -rw-r--r-- | Mailman/Queue/tests/__init__.py | 0 | ||||
| -rw-r--r-- | Mailman/app/moderator.py | 4 | ||||
| -rw-r--r-- | Mailman/bin/bounces.py | 4 | ||||
| -rw-r--r-- | Mailman/bin/confirm.py | 4 | ||||
| -rw-r--r-- | Mailman/bin/gate_news.py | 4 | ||||
| -rw-r--r-- | Mailman/bin/join.py | 4 | ||||
| -rw-r--r-- | Mailman/bin/leave.py | 4 | ||||
| -rw-r--r-- | Mailman/bin/owner.py | 4 | ||||
| -rw-r--r-- | Mailman/bin/post.py | 4 | ||||
| -rw-r--r-- | Mailman/bin/qrunner.py | 49 | ||||
| -rw-r--r-- | Mailman/bin/request.py | 4 | ||||
| -rw-r--r-- | Mailman/bin/unshunt.py | 6 | ||||
| -rw-r--r-- | Mailman/configuration.py | 26 | ||||
| -rw-r--r-- | Mailman/docs/acknowledge.txt | 4 | ||||
| -rw-r--r-- | Mailman/docs/antispam.txt | 2 | ||||
| -rw-r--r-- | Mailman/docs/archives.txt | 2 | ||||
| -rw-r--r-- | Mailman/docs/bounces.txt | 2 | ||||
| -rw-r--r-- | Mailman/docs/digests.txt | 2 | ||||
| -rw-r--r-- | Mailman/docs/hold.txt | 2 | ||||
| -rw-r--r-- | Mailman/docs/message.txt | 2 | ||||
| -rw-r--r-- | Mailman/docs/news-runner.txt | 2 | ||||
| -rw-r--r-- | Mailman/docs/nntp.txt | 2 | ||||
| -rw-r--r-- | Mailman/docs/outgoing.txt | 2 | ||||
| -rw-r--r-- | Mailman/docs/registration.txt | 2 | ||||
| -rw-r--r-- | Mailman/docs/replybot.txt | 4 | ||||
| -rw-r--r-- | Mailman/docs/requests.txt | 6 | ||||
| -rw-r--r-- | Mailman/docs/runner.txt | 3 | ||||
| -rw-r--r-- | Mailman/docs/switchboard.txt | 2 | ||||
| -rw-r--r-- | Mailman/docs/tagger.txt | 2 | ||||
| -rw-r--r-- | Mailman/interfaces/runner.py | 31 | ||||
| -rw-r--r-- | Mailman/queue/Runner.py (renamed from Mailman/Queue/Runner.py) | 0 | ||||
| -rw-r--r-- | Mailman/queue/Switchboard.py (renamed from Mailman/Queue/Switchboard.py) | 0 | ||||
| -rw-r--r-- | Mailman/queue/__init__.py | 402 | ||||
| -rw-r--r-- | Mailman/queue/archive.py (renamed from Mailman/Queue/ArchRunner.py) | 4 | ||||
| -rw-r--r-- | Mailman/queue/bounce.py (renamed from Mailman/Queue/BounceRunner.py) | 5 | ||||
| -rw-r--r-- | Mailman/queue/command.py (renamed from Mailman/Queue/CommandRunner.py) | 2 | ||||
| -rw-r--r-- | Mailman/queue/http.py (renamed from Mailman/Queue/HTTPRunner.py) | 2 | ||||
| -rw-r--r-- | Mailman/queue/incoming.py (renamed from Mailman/Queue/IncomingRunner.py) | 2 | ||||
| -rw-r--r-- | Mailman/queue/lmtp.py (renamed from Mailman/Queue/LMTPRunner.py) | 17 | ||||
| -rw-r--r-- | Mailman/queue/maildir.py (renamed from Mailman/Queue/MaildirRunner.py) | 17 | ||||
| -rw-r--r-- | Mailman/queue/news.py (renamed from Mailman/Queue/NewsRunner.py) | 2 | ||||
| -rw-r--r-- | Mailman/queue/outgoing.py (renamed from Mailman/Queue/OutgoingRunner.py) | 5 | ||||
| -rw-r--r-- | Mailman/queue/retry.py (renamed from Mailman/Queue/RetryRunner.py) | 3 | ||||
| -rw-r--r-- | Mailman/queue/sbcache.py (renamed from Mailman/Queue/sbcache.py) | 0 | ||||
| -rw-r--r-- | Mailman/queue/tests/__init__.py (renamed from Mailman/Queue/__init__.py) | 0 | ||||
| -rw-r--r-- | Mailman/queue/virgin.py (renamed from Mailman/Queue/VirginRunner.py) | 4 |
54 files changed, 564 insertions, 126 deletions
diff --git a/Mailman/Handlers/MimeDel.py b/Mailman/Handlers/MimeDel.py index 5f49f4b33..93349cb23 100644 --- a/Mailman/Handlers/MimeDel.py +++ b/Mailman/Handlers/MimeDel.py @@ -34,11 +34,11 @@ from os.path import splitext from Mailman import Errors from Mailman.Message import UserNotification -from Mailman.Queue.sbcache import get_switchboard from Mailman.Utils import oneline from Mailman.Version import VERSION from Mailman.configuration import config from Mailman.i18n import _ +from Mailman.queue import Switchboard log = logging.getLogger('mailman.error') @@ -240,7 +240,7 @@ are receiving the only remaining copy of the discarded message. subject=_('Content filtered message notification')) if mlist.filter_action == 3 and \ config.OWNERS_CAN_PRESERVE_FILTERED_MESSAGES: - badq = get_switchboard(config.BADQUEUE_DIR) + badq = Switchboard(config.BADQUEUE_DIR) badq.enqueue(msg, msgdata) # Most cases also discard the message raise Errors.DiscardMessage diff --git a/Mailman/Handlers/ToArchive.py b/Mailman/Handlers/ToArchive.py index 6c4397daa..c65e86f60 100644 --- a/Mailman/Handlers/ToArchive.py +++ b/Mailman/Handlers/ToArchive.py @@ -17,8 +17,8 @@ """Add the message to the archives.""" -from Mailman.Queue.sbcache import get_switchboard from Mailman.configuration import config +from Mailman.queue import Switchboard @@ -32,6 +32,6 @@ def process(mlist, msg, msgdata): if 'x-no-archive' in msg or msg.get('x-archive', '').lower() == 'no': return # Send the message to the archiver queue - archq = get_switchboard(config.ARCHQUEUE_DIR) + archq = Switchboard(config.ARCHQUEUE_DIR) # Send the message to the queue archq.enqueue(msg, msgdata) diff --git a/Mailman/Handlers/ToDigest.py b/Mailman/Handlers/ToDigest.py index 131bf4588..8bd13f2e6 100644 --- a/Mailman/Handlers/ToDigest.py +++ b/Mailman/Handlers/ToDigest.py @@ -51,9 +51,9 @@ from Mailman.Handlers.Decorate import decorate from Mailman.Handlers.Scrubber import process as scrubber from Mailman.Mailbox import Mailbox from Mailman.Mailbox import Mailbox -from Mailman.Queue.sbcache import get_switchboard from Mailman.configuration import config from Mailman.constants import DeliveryMode, DeliveryStatus +from Mailman.queue import Switchboard _ = i18n._ __i18n_templates__ = True @@ -373,7 +373,7 @@ def send_i18n_digests(mlist, mboxfp): # Do our final bit of housekeeping, and then send each message to the # outgoing queue for delivery. mlist.next_digest_number += 1 - virginq = get_switchboard(config.VIRGINQUEUE_DIR) + virginq = Switchboard(config.VIRGINQUEUE_DIR) # Calculate the recipients lists plainrecips = set() mimerecips = set() diff --git a/Mailman/Handlers/ToOutgoing.py b/Mailman/Handlers/ToOutgoing.py index ff263e9a3..15ceb8311 100644 --- a/Mailman/Handlers/ToOutgoing.py +++ b/Mailman/Handlers/ToOutgoing.py @@ -22,8 +22,8 @@ posted to the list membership. Anything else that needs to go out to some recipient should just be placed in the out queue directly. """ -from Mailman.Queue.sbcache import get_switchboard from Mailman.configuration import config +from Mailman.queue import Switchboard @@ -52,5 +52,5 @@ def process(mlist, msg, msgdata): # VERP every `interval' number of times msgdata['verp'] = not (int(mlist.post_id) % interval) # And now drop the message in qfiles/out - outq = get_switchboard(config.OUTQUEUE_DIR) + outq = Switchboard(config.OUTQUEUE_DIR) outq.enqueue(msg, msgdata, listname=mlist.fqdn_listname) diff --git a/Mailman/Handlers/ToUsenet.py b/Mailman/Handlers/ToUsenet.py index 63f5da52a..09bb28bdd 100644 --- a/Mailman/Handlers/ToUsenet.py +++ b/Mailman/Handlers/ToUsenet.py @@ -20,7 +20,7 @@ import logging from Mailman.configuration import config -from Mailman.Queue.sbcache import get_switchboard +from Mailman.queue import Switchboard COMMASPACE = ', ' @@ -45,5 +45,5 @@ def process(mlist, msg, msgdata): COMMASPACE.join(error)) return # Put the message in the news runner's queue - newsq = get_switchboard(config.NEWSQUEUE_DIR) + newsq = Switchboard(config.NEWSQUEUE_DIR) newsq.enqueue(msg, msgdata, listname=mlist.fqdn_listname) diff --git a/Mailman/MTA/Manual.py b/Mailman/MTA/Manual.py index 43ef3afd5..953d46695 100644 --- a/Mailman/MTA/Manual.py +++ b/Mailman/MTA/Manual.py @@ -25,9 +25,9 @@ from cStringIO import StringIO from Mailman import Message from Mailman import Utils from Mailman.MTA.Utils import makealiases -from Mailman.Queue.sbcache import get_switchboard from Mailman.configuration import config from Mailman.i18n import _ +from Mailman.queue import Switchboard __i18n_templates__ = True @@ -137,5 +137,5 @@ equivalent) file by removing the following lines, and possibly running the _('Mailing list removal request for list $listname'), sfp.getvalue(), config.DEFAULT_SERVER_LANGUAGE) msg['Date'] = email.Utils.formatdate(localtime=True) - outq = get_switchboard(config.OUTQUEUE_DIR) + outq = Switchboard(config.OUTQUEUE_DIR) outq.enqueue(msg, recips=[siteowner], nodecorate=True) diff --git a/Mailman/Message.py b/Mailman/Message.py index 47262d8a3..e1b898ea4 100644 --- a/Mailman/Message.py +++ b/Mailman/Message.py @@ -244,8 +244,8 @@ class UserNotification(Message): def _enqueue(self, mlist, **_kws): # Not imported at module scope to avoid import loop - from Mailman.Queue.sbcache import get_switchboard - virginq = get_switchboard(config.VIRGINQUEUE_DIR) + from Mailman.queue import Switchboard + virginq = Switchboard(config.VIRGINQUEUE_DIR) # The message metadata better have a 'recip' attribute. enqueue_kws = dict( recips=self.recips, @@ -278,8 +278,8 @@ class OwnerNotification(UserNotification): def _enqueue(self, mlist, **_kws): # Not imported at module scope to avoid import loop - from Mailman.Queue.sbcache import get_switchboard - virginq = get_switchboard(config.VIRGINQUEUE_DIR) + from Mailman.queue import Switchboard + virginq = Switchboard(config.VIRGINQUEUE_DIR) # The message metadata better have a `recip' attribute virginq.enqueue(self, listname=mlist.fqdn_listname, diff --git a/Mailman/Post.py b/Mailman/Post.py index 719c3e68b..50b9628a0 100644 --- a/Mailman/Post.py +++ b/Mailman/Post.py @@ -19,15 +19,15 @@ import sys -from Mailman.Queue.sbcache import get_switchboard from Mailman.configuration import config +from Mailman.queue import Switchboard def inject(listname, msg, recips=None, qdir=None): if qdir is None: qdir = config.INQUEUE_DIR - queue = get_switchboard(qdir) + queue = Switchboard(qdir) kws = {'listname' : listname, 'tolist' : 1, '_plaintext': 1, diff --git a/Mailman/Queue/tests/__init__.py b/Mailman/Queue/tests/__init__.py deleted file mode 100644 index e69de29bb..000000000 --- a/Mailman/Queue/tests/__init__.py +++ /dev/null diff --git a/Mailman/app/moderator.py b/Mailman/app/moderator.py index 266092cdb..720a5a364 100644 --- a/Mailman/app/moderator.py +++ b/Mailman/app/moderator.py @@ -37,11 +37,11 @@ from Mailman import Errors from Mailman import Message from Mailman import Utils from Mailman import i18n -from Mailman.Queue.sbcache import get_switchboard from Mailman.app.membership import add_member, delete_member from Mailman.configuration import config from Mailman.constants import Action, DeliveryMode from Mailman.interfaces import RequestType +from Mailman.queue import Switchboard _ = i18n._ __i18n_templates__ = True @@ -130,7 +130,7 @@ def handle_message(mlist, id, action, msg.get('message-id', 'n/a')) # Stick the message back in the incoming queue for further # processing. - inq = get_switchboard(config.INQUEUE_DIR) + inq = Switchboard(config.INQUEUE_DIR) inq.enqueue(msg, _metadata=msgdata) else: raise AssertionError('Unexpected action: %s' % action) diff --git a/Mailman/bin/bounces.py b/Mailman/bin/bounces.py index cb6ce813f..f8faad518 100644 --- a/Mailman/bin/bounces.py +++ b/Mailman/bin/bounces.py @@ -28,9 +28,9 @@ import logging from Mailman import Utils from Mailman import loginit -from Mailman.Queue.sbcache import get_switchboard from Mailman.configuration import config from Mailman.i18n import _ +from Mailman.queue import Switchboard __i18n_templates__ = True @@ -54,7 +54,7 @@ def main(): # some MTAs have a hard limit to the time a filter prog can run. Postfix # is a good example; if the limit is hit, the proc is SIGKILL'd giving us # no chance to save the message. - bounceq = get_switchboard(config.BOUNCEQUEUE_DIR) + bounceq = Switchboard(config.BOUNCEQUEUE_DIR) bounceq.enqueue(sys.stdin.read(), listname=listname, _plaintext=True) diff --git a/Mailman/bin/confirm.py b/Mailman/bin/confirm.py index 3334231a9..5b22bf71a 100644 --- a/Mailman/bin/confirm.py +++ b/Mailman/bin/confirm.py @@ -28,9 +28,9 @@ import logging from Mailman import Utils from Mailman import loginit -from Mailman.Queue.sbcache import get_switchboard from Mailman.configuration import config from Mailman.i18n import _ +from Mailman.queue import Switchboard __i18n_templates__ = True @@ -55,7 +55,7 @@ def main(): # some MTAs have a hard limit to the time a filter prog can run. Postfix # is a good example; if the limit is hit, the proc is SIGKILL'd giving us # no chance to save the message. - cmdq = get_switchboard(config.CMDQUEUE_DIR) + cmdq = Switchboard(config.CMDQUEUE_DIR) cmdq.enqueue(sys.stdin.read(), listname=listname, toconfirm=True, _plaintext=True) diff --git a/Mailman/bin/gate_news.py b/Mailman/bin/gate_news.py index 491660f2f..bded31869 100644 --- a/Mailman/bin/gate_news.py +++ b/Mailman/bin/gate_news.py @@ -32,9 +32,9 @@ from Mailman import Message from Mailman import Utils from Mailman import Version from Mailman import loginit -from Mailman.Queue.sbcache import get_switchboard from Mailman.configuration import config from Mailman.i18n import _ +from Mailman.queue import Switchboard # Work around known problems with some RedHat cron daemons import signal @@ -148,7 +148,7 @@ def poll_newsgroup(mlist, conn, first, last, glock): del msg['To'] msg['To'] = mlist.GetListEmail() # Post the message to the locked list - inq = get_switchboard(config.INQUEUE_DIR) + inq = Switchboard(config.INQUEUE_DIR) inq.enqueue(msg, listname=mlist.internal_name(), fromusenet=True) diff --git a/Mailman/bin/join.py b/Mailman/bin/join.py index 372046c28..30813dc85 100644 --- a/Mailman/bin/join.py +++ b/Mailman/bin/join.py @@ -28,9 +28,9 @@ import logging from Mailman import Utils from Mailman import loginit -from Mailman.Queue.sbcache import get_switchboard from Mailman.configuration import config from Mailman.i18n import _ +from Mailman.queue import Switchboard __i18n_templates__ = True @@ -55,7 +55,7 @@ def main(): # some MTAs have a hard limit to the time a filter prog can run. Postfix # is a good example; if the limit is hit, the proc is SIGKILL'd giving us # no chance to save the message. - cmdq = get_switchboard(config.CMDQUEUE_DIR) + cmdq = Switchboard(config.CMDQUEUE_DIR) cmdq.enqueue(sys.stdin.read(), listname=listname, tojoin=True, _plaintext=True) diff --git a/Mailman/bin/leave.py b/Mailman/bin/leave.py index b4b882bc9..fb6e23f68 100644 --- a/Mailman/bin/leave.py +++ b/Mailman/bin/leave.py @@ -28,9 +28,9 @@ import logging from Mailman import Utils from Mailman import loginit -from Mailman.Queue.sbcache import get_switchboard from Mailman.configuration import config from Mailman.i18n import _ +from Mailman.queue import Switchboard __i18n_templates__ = True @@ -54,7 +54,7 @@ def main(): # some MTAs have a hard limit to the time a filter prog can run. Postfix # is a good example; if the limit is hit, the proc is SIGKILL'd giving us # no chance to save the message. - cmdq = get_switchboard(config.CMDQUEUE_DIR) + cmdq = Switchboard(config.CMDQUEUE_DIR) cmdq.enqueue(sys.stdin.read(), listname=listname, toleave=True, _plaintext=True) diff --git a/Mailman/bin/owner.py b/Mailman/bin/owner.py index 3b2bec383..183640773 100644 --- a/Mailman/bin/owner.py +++ b/Mailman/bin/owner.py @@ -31,9 +31,9 @@ import logging from Mailman import Utils from Mailman import loginit -from Mailman.Queue.sbcache import get_switchboard from Mailman.configuration import config from Mailman.i18n import _ +from Mailman.queue import Switchboard __i18n_templates__ = True @@ -57,7 +57,7 @@ def main(): # incoming queue because we need some processing done on the message. The # processing is minimal though, so craft our own pipeline, expressly for # the purpose of delivering to the list owners. - inq = get_switchboard(config.INQUEUE_DIR) + inq = Switchboard(config.INQUEUE_DIR) inq.enqueue(sys.stdin.read(), listname=listname, _plaintext=True, diff --git a/Mailman/bin/post.py b/Mailman/bin/post.py index 86e4f7f9b..d1c1204df 100644 --- a/Mailman/bin/post.py +++ b/Mailman/bin/post.py @@ -31,9 +31,9 @@ import logging from Mailman import Utils from Mailman import loginit -from Mailman.Queue.sbcache import get_switchboard from Mailman.configuration import config from Mailman.i18n import _ +from Mailman.queue import Switchboard __i18n_templates__ = True @@ -62,7 +62,7 @@ def main(): # some MTAs have a hard limit to the time a filter prog can run. Postfix # is a good example; if the limit is hit, the proc is SIGKILL'd giving us # no chance to save the message. - inq = get_switchboard(config.INQUEUE_DIR) + inq = Switchboard(config.INQUEUE_DIR) inq.enqueue(sys.stdin.read(), listname=listname, tolist=True, _plaintext=True) diff --git a/Mailman/bin/qrunner.py b/Mailman/bin/qrunner.py index 2e6c4138e..192069f39 100644 --- a/Mailman/bin/qrunner.py +++ b/Mailman/bin/qrunner.py @@ -29,6 +29,7 @@ from Mailman.initialize import initialize __i18n_templates__ = True COMMASPACE = ', ' +QRUNNER_SHORTCUTS = {} log = None @@ -52,11 +53,6 @@ def r_callback(option, opt, value, parser): parser.print_help() print >> sys.stderr, _('Bad runner specification: $value') sys.exit(1) - if runner == 'All': - for runnername, slices in config.qrunners.items(): - dest.append((runnername, rslice, rrange)) - elif not runner.endswith('Runner'): - runner += 'Runner' dest.append((runner, rslice, rrange)) @@ -116,19 +112,25 @@ work better with that framework.""")) help=_('Alternative configuration file to use')) opts, args = parser.parse_args() if args: - parser.print_help() - print >> sys.stderr, _('Unexpected arguments') - sys.exit(1) + parser.error(_('Unexpected arguments')) if not opts.runners and not opts.list: - parser.print_help() - print >> sys.stderr, _('No runner name given.') - sys.exit(1) + parser.error(_('No runner name given.')) return parser, opts, args def make_qrunner(name, slice, range, once=False): - modulename = 'Mailman.Queue.' + name + # Several conventions for specifying the runner name are supported. It + # could be one of the shortcut names. Or the name is a full module path, + # use it explicitly. If the name starts with a dot, it's a class name + # relative to the Mailman.queue package. + if name in QRUNNER_SHORTCUTS: + classpath = QRUNNER_SHORTCUTS[name] + elif name.startswith('.'): + classpath = 'Mailman.queue' + name + else: + classpath = name + modulename, classname = classpath.rsplit('.', 1) try: __import__(modulename) except ImportError, e: @@ -140,7 +142,7 @@ def make_qrunner(name, slice, range, once=False): else: print >> sys.stderr, e sys.exit(1) - qrclass = getattr(sys.modules[modulename], name) + qrclass = getattr(sys.modules[modulename], classname) if once: # Subclass to hack in the setting of the stop flag in _doperiodic() class Once(qrclass): @@ -189,14 +191,21 @@ def main(): initialize(opts.config, propagate_logs=not opts.subproc) log = logging.getLogger('mailman.qrunner') + # Calculate the qrunner shortcut names. + for qrunnerpath, slices in config.qrunners.items(): + classname = qrunnerpath.rsplit('.', 1)[1] + if classname.endswith('Runner'): + shortname = classname[:-6].lower() + else: + shortname = classname + QRUNNER_SHORTCUTS[shortname] = qrunnerpath + if opts.list: - for runnername, slices in config.qrunners.items(): - if runnername.endswith('Runner'): - name = runnername[:-len('Runner')] - else: - name = runnername - print _('$name runs the $runnername qrunner') - print _('All runs all the above qrunners') + prefixlen = max(len(shortname) for shortname in QRUNNER_SHORTCUTS) + for shortname in sorted(QRUNNER_SHORTCUTS): + runnername = QRUNNER_SHORTCUTS[shortname] + shortname = (' ' * (prefixlen - len(shortname))) + shortname + print _('$shortname runs the $runnername qrunner') sys.exit(0) # Fast track for one infinite runner diff --git a/Mailman/bin/request.py b/Mailman/bin/request.py index 9bdb1f3c0..6ced2a871 100644 --- a/Mailman/bin/request.py +++ b/Mailman/bin/request.py @@ -28,9 +28,9 @@ import logging from Mailman import Utils from Mailman import loginit -from Mailman.Queue.sbcache import get_switchboard from Mailman.configuration import config from Mailman.i18n import _ +from Mailman.queue import Switchboard __i18n_templates__ = True @@ -55,7 +55,7 @@ def main(): # some MTAs have a hard limit to the time a filter prog can run. Postfix # is a good example; if the limit is hit, the proc is SIGKILL'd giving us # no chance to save the message. - cmdq = get_switchboard(config.CMDQUEUE_DIR) + cmdq = Switchboard(config.CMDQUEUE_DIR) cmdq.enqueue(sys.stdin.read(), listname=listname, torequest=True, diff --git a/Mailman/bin/unshunt.py b/Mailman/bin/unshunt.py index 74264fd42..a910ad7e2 100644 --- a/Mailman/bin/unshunt.py +++ b/Mailman/bin/unshunt.py @@ -19,9 +19,9 @@ import sys import optparse from Mailman import Version -from Mailman.Queue.sbcache import get_switchboard from Mailman.configuration import config from Mailman.i18n import _ +from Mailman.queue import Switchboard __i18n_templates__ = True @@ -54,13 +54,13 @@ def main(): else: qdir = config.SHUNTQUEUE_DIR - sb = get_switchboard(qdir) + sb = Switchboard(qdir) sb.recover_backup_files() for filebase in sb.files(): try: msg, msgdata = sb.dequeue(filebase) whichq = msgdata.get('whichq', config.INQUEUE_DIR) - tosb = get_switchboard(whichq) + tosb = Switchboard(whichq) tosb.enqueue(msg, msgdata) except Exception, e: # If there are any unshunting errors, log them and continue trying diff --git a/Mailman/configuration.py b/Mailman/configuration.py index bd76e5fca..b280dfce2 100644 --- a/Mailman/configuration.py +++ b/Mailman/configuration.py @@ -28,14 +28,14 @@ from Mailman.languages import LanguageManager _missing = object() DEFAULT_QRUNNERS = ( - 'Arch', - 'Bounce', - 'Command', - 'Incoming', - 'News', - 'Outgoing', - 'Retry', - 'Virgin', + '.archive.ArchiveRunner', + '.bounce.BounceRunner', + '.command.CommandRunner', + '.incoming.IncomingRunner', + '.news.NewsRunner', + '.outgoing.OutgoingRunner', + '.retry.RetryRunner', + '.virgin.VirginRunner', ) @@ -107,9 +107,9 @@ class Configuration(object): sys.exit(-1) # Based on values possibly set in mailman.cfg, add additional qrunners. if ns['USE_MAILDIR']: - self.add_qrunner('Maildir') + self.add_qrunner('maildir') if ns['USE_LMTP']: - self.add_qrunner('LMTP') + self.add_qrunner('lmtp') # Pull out the defaults. VAR_DIR = os.path.abspath(ns['VAR_DIR']) # Now that we've loaded all the configuration files we're going to @@ -207,7 +207,8 @@ class Configuration(object): E.g. 'HTTP' or 'LMTP'. count is the number of qrunner slices to create, by default, 1. """ - name += 'Runner' + if name.startswith('.'): + name = 'Mailman.queue' + name self.qrunners[name] = count def del_qrunner(self, name): @@ -215,7 +216,8 @@ class Configuration(object): name is the qrunner name and it must not include the 'Runner' suffix. """ - name += 'Runner' + if name.startswith('.'): + name = 'Mailman.queue' + name self.qrunners.pop(name) @property diff --git a/Mailman/docs/acknowledge.txt b/Mailman/docs/acknowledge.txt index 3e2088f10..88d9d6251 100644 --- a/Mailman/docs/acknowledge.txt +++ b/Mailman/docs/acknowledge.txt @@ -20,8 +20,8 @@ acknowledgment. >>> # Ensure that the virgin queue is empty, since we'll be checking this >>> # for new auto-response messages. - >>> from Mailman.Queue.sbcache import get_switchboard - >>> virginq = get_switchboard(config.VIRGINQUEUE_DIR) + >>> from Mailman.queue import Switchboard + >>> virginq = Switchboard(config.VIRGINQUEUE_DIR) >>> virginq.files [] diff --git a/Mailman/docs/antispam.txt b/Mailman/docs/antispam.txt index cdbda4efd..b6717c797 100644 --- a/Mailman/docs/antispam.txt +++ b/Mailman/docs/antispam.txt @@ -11,7 +11,7 @@ measures. >>> from Mailman.Handlers.SpamDetect import process >>> from Mailman.Message import Message - >>> from Mailman.Queue.Switchboard import Switchboard + >>> from Mailman.queue import Switchboard >>> from Mailman.configuration import config >>> from Mailman.database import flush >>> from email import message_from_string diff --git a/Mailman/docs/archives.txt b/Mailman/docs/archives.txt index e7c84a8ed..a02fdf802 100644 --- a/Mailman/docs/archives.txt +++ b/Mailman/docs/archives.txt @@ -9,7 +9,7 @@ processes. >>> from Mailman.Handlers.ToArchive import process >>> from Mailman.Message import Message - >>> from Mailman.Queue.Switchboard import Switchboard + >>> from Mailman.queue import Switchboard >>> from Mailman.configuration import config >>> from Mailman.database import flush >>> from email import message_from_string diff --git a/Mailman/docs/bounces.txt b/Mailman/docs/bounces.txt index cfc7aa49e..7195a082f 100644 --- a/Mailman/docs/bounces.txt +++ b/Mailman/docs/bounces.txt @@ -35,7 +35,7 @@ Bounce a message by passing in the original message, and an optional error message. The bounced message ends up in the virgin queue, awaiting sending to the original messageauthor. - >>> from Mailman.Queue.Switchboard import Switchboard + >>> from Mailman.queue import Switchboard >>> switchboard = Switchboard(config.VIRGINQUEUE_DIR) >>> from Mailman.app.bounces import bounce_message >>> bounce_message(mlist, msg) diff --git a/Mailman/docs/digests.txt b/Mailman/docs/digests.txt index 5eb2e7364..4d787221d 100644 --- a/Mailman/docs/digests.txt +++ b/Mailman/docs/digests.txt @@ -8,7 +8,7 @@ digests, although only two are currently supported: MIME digests and RFC 1153 >>> from Mailman.Handlers.ToDigest import process >>> from Mailman.Message import Message - >>> from Mailman.Queue.Switchboard import Switchboard + >>> from Mailman.queue import Switchboard >>> from Mailman.configuration import config >>> from Mailman.database import flush >>> from email import message_from_string diff --git a/Mailman/docs/hold.txt b/Mailman/docs/hold.txt index 3aeb56934..f8934cf6f 100644 --- a/Mailman/docs/hold.txt +++ b/Mailman/docs/hold.txt @@ -8,7 +8,7 @@ are held when they meet any of a number of criteria. >>> import os >>> import errno >>> from Mailman.Handlers.Hold import process - >>> from Mailman.Queue.Switchboard import Switchboard + >>> from Mailman.queue import Switchboard >>> from Mailman.configuration import config >>> from Mailman.database import flush >>> mlist = config.db.list_manager.create('_xtest@example.com') diff --git a/Mailman/docs/message.txt b/Mailman/docs/message.txt index f0f8c7739..601b871bf 100644 --- a/Mailman/docs/message.txt +++ b/Mailman/docs/message.txt @@ -31,7 +31,7 @@ address, an optional subject, optional body text, and optional language. The message will end up in the virgin queue. - >>> from Mailman.Queue.Switchboard import Switchboard + >>> from Mailman.queue import Switchboard >>> switchboard = Switchboard(config.VIRGINQUEUE_DIR) >>> len(switchboard.files) 1 diff --git a/Mailman/docs/news-runner.txt b/Mailman/docs/news-runner.txt index 8189d218d..ecc32570f 100644 --- a/Mailman/docs/news-runner.txt +++ b/Mailman/docs/news-runner.txt @@ -10,7 +10,7 @@ was originally written to gate to Usenet, which has its own rules). >>> from Mailman.Message import Message >>> from Mailman.configuration import config >>> from Mailman.database import flush - >>> from Mailman.Queue.NewsRunner import prepare_message + >>> from Mailman.queue.news import prepare_message >>> mlist = config.db.list_manager.create('_xtest@example.com') >>> mlist.linked_newsgroup = 'comp.lang.python' >>> flush() diff --git a/Mailman/docs/nntp.txt b/Mailman/docs/nntp.txt index e169f575a..9748a06ca 100644 --- a/Mailman/docs/nntp.txt +++ b/Mailman/docs/nntp.txt @@ -7,7 +7,7 @@ NNTP is to Usenet as IP is to the web, it's more general than that. >>> from Mailman.Handlers.ToUsenet import process >>> from Mailman.Message import Message - >>> from Mailman.Queue.Switchboard import Switchboard + >>> from Mailman.queue import Switchboard >>> from Mailman.configuration import config >>> from Mailman.database import flush >>> from email import message_from_string diff --git a/Mailman/docs/outgoing.txt b/Mailman/docs/outgoing.txt index edbde280c..c1ecffb00 100644 --- a/Mailman/docs/outgoing.txt +++ b/Mailman/docs/outgoing.txt @@ -11,7 +11,7 @@ headers for unambigous bounce processing. >>> from Mailman.Handlers.ToOutgoing import process >>> from Mailman.Message import Message - >>> from Mailman.Queue.Switchboard import Switchboard + >>> from Mailman.queue import Switchboard >>> from Mailman.configuration import config >>> from Mailman.database import flush >>> from email import message_from_string diff --git a/Mailman/docs/registration.txt b/Mailman/docs/registration.txt index a98a320af..219a0cef5 100644 --- a/Mailman/docs/registration.txt +++ b/Mailman/docs/registration.txt @@ -133,7 +133,7 @@ Verification by email There is also a verification email sitting in the virgin queue now. This message is sent to the user in order to verify the registered address. - >>> from Mailman.Queue.Switchboard import Switchboard + >>> from Mailman.queue import Switchboard >>> switchboard = Switchboard(config.VIRGINQUEUE_DIR) >>> len(switchboard.files) 1 diff --git a/Mailman/docs/replybot.txt b/Mailman/docs/replybot.txt index 0b1980fde..8ca131bf8 100644 --- a/Mailman/docs/replybot.txt +++ b/Mailman/docs/replybot.txt @@ -18,8 +18,8 @@ message or the amount of time since the last auto-response. >>> # Ensure that the virgin queue is empty, since we'll be checking this >>> # for new auto-response messages. - >>> from Mailman.Queue.sbcache import get_switchboard - >>> virginq = get_switchboard(config.VIRGINQUEUE_DIR) + >>> from Mailman.queue import Switchboard + >>> virginq = Switchboard(config.VIRGINQUEUE_DIR) >>> virginq.files [] diff --git a/Mailman/docs/requests.txt b/Mailman/docs/requests.txt index 249cab952..00128c358 100644 --- a/Mailman/docs/requests.txt +++ b/Mailman/docs/requests.txt @@ -19,8 +19,8 @@ Here is a helper function for printing out held requests. And another helper for displaying messages in the virgin queue. - >>> from Mailman.Queue.sbcache import get_switchboard - >>> virginq = get_switchboard(config.VIRGINQUEUE_DIR) + >>> from Mailman.queue import Switchboard + >>> virginq = Switchboard(config.VIRGINQUEUE_DIR) >>> def dequeue(whichq=None, expected_count=1): ... if whichq is None: ... whichq = virginq @@ -317,7 +317,7 @@ indicates that the message has been approved. >>> flush() >>> moderator.handle_message(mlist, id_3, Action.accept) >>> flush() - >>> inq = get_switchboard(config.INQUEUE_DIR) + >>> inq = Switchboard(config.INQUEUE_DIR) >>> qmsg, qdata = dequeue(inq) >>> print qmsg.as_string() From: aperson@example.org diff --git a/Mailman/docs/runner.txt b/Mailman/docs/runner.txt index fc5299c2a..75ec90758 100644 --- a/Mailman/docs/runner.txt +++ b/Mailman/docs/runner.txt @@ -17,8 +17,7 @@ continuously in a loop until the .stop() method is called. >>> import os >>> from email import message_from_string >>> from Mailman.Message import Message - >>> from Mailman.Queue.Runner import Runner - >>> from Mailman.Queue.Switchboard import Switchboard + >>> from Mailman.queue import Runner, Switchboard >>> from Mailman.configuration import config >>> from Mailman.database import flush >>> mlist = config.db.list_manager.create('_xtest@example.com') diff --git a/Mailman/docs/switchboard.txt b/Mailman/docs/switchboard.txt index 267eeffe3..ce8f277ed 100644 --- a/Mailman/docs/switchboard.txt +++ b/Mailman/docs/switchboard.txt @@ -6,7 +6,7 @@ instance of a switchboard is responsible for one queue directory. >>> from email import message_from_string >>> from Mailman.Message import Message - >>> from Mailman.Queue.Switchboard import Switchboard + >>> from Mailman.queue import Switchboard >>> msg = message_from_string("""\ ... From: aperson@example.com ... To: _xtest@example.com diff --git a/Mailman/docs/tagger.txt b/Mailman/docs/tagger.txt index f220cdcc6..a6b2d8ca8 100644 --- a/Mailman/docs/tagger.txt +++ b/Mailman/docs/tagger.txt @@ -10,7 +10,7 @@ expressions. The message then gets tagged with the topic names of each hit. >>> from Mailman.Handlers.Tagger import process >>> from Mailman.Message import Message - >>> from Mailman.Queue.Switchboard import Switchboard + >>> from Mailman.queue import Switchboard >>> from Mailman.configuration import config >>> from Mailman.database import flush >>> from email import message_from_string diff --git a/Mailman/interfaces/runner.py b/Mailman/interfaces/runner.py new file mode 100644 index 000000000..84da0a7e3 --- /dev/null +++ b/Mailman/interfaces/runner.py @@ -0,0 +1,31 @@ +# Copyright (C) 2007 by the Free Software Foundation, Inc. +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, +# USA. + +"""Interface for queue runners.""" + +from zope.interface import Interface, Attribute + + + +class IRunner(Interface): + """The queue runner.""" + + def run(): + """Start the queue runner.""" + + def stop(): + """Stop the queue runner on the next iteration through the loop.""" diff --git a/Mailman/Queue/Runner.py b/Mailman/queue/Runner.py index 958e11e6c..958e11e6c 100644 --- a/Mailman/Queue/Runner.py +++ b/Mailman/queue/Runner.py diff --git a/Mailman/Queue/Switchboard.py b/Mailman/queue/Switchboard.py index 2c8672f57..2c8672f57 100644 --- a/Mailman/Queue/Switchboard.py +++ b/Mailman/queue/Switchboard.py diff --git a/Mailman/queue/__init__.py b/Mailman/queue/__init__.py new file mode 100644 index 000000000..94f747245 --- /dev/null +++ b/Mailman/queue/__init__.py @@ -0,0 +1,402 @@ +# Copyright (C) 2001-2007 by the Free Software Foundation, Inc. +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, +# USA. + +"""Queing and dequeuing message/metadata pickle files. + +Messages are represented as email.message.Message objects (or an instance ofa +subclass). Metadata is represented as a Python dictionary. For every +message/metadata pair in a queue, a single file containing two pickles is +written. First, the message is written to the pickle, then the metadata +dictionary is written. +""" + +from __future__ import with_statement +__metaclass__ = type + +import os +import sha +import time +import email +import errno +import cPickle +import logging +import marshal +import traceback + +from zope.interface import implements + +from Mailman import i18n +from Mailman import Message +from Mailman import Utils +from Mailman.configuration import config +from Mailman.interfaces import IRunner, ISwitchboard + +# 20 bytes of all bits set, maximum sha.digest() value +shamax = 0xffffffffffffffffffffffffffffffffffffffffL + +# Small increment to add to time in case two entries have the same time. This +# prevents skipping one of two entries with the same time until the next pass. +DELTA = .0001 + +log = logging.getLogger('mailman.error') + + + +class Switchboard: + implements(ISwitchboard) + + def __init__(self, whichq, slice=None, numslices=1, recover=False): + self._whichq = whichq + # Create the directory if it doesn't yet exist. + Utils.makedirs(self._whichq, 0770) + # Fast track for no slices + self._lower = None + self._upper = None + # BAW: test performance and end-cases of this algorithm + if numslices <> 1: + self._lower = ((shamax + 1) * slice) / numslices + self._upper = (((shamax + 1) * (slice + 1)) / numslices) - 1 + if recover: + self.recover_backup_files() + + @property + def queue_directory(self): + return self._whichq + + def enqueue(self, _msg, _metadata=None, **_kws): + if _metadata is None: + _metadata = {} + # Calculate the SHA hexdigest of the message to get a unique base + # filename. We're also going to use the digest as a hash into the set + # of parallel qrunner processes. + data = _metadata.copy() + data.update(_kws) + listname = data.get('listname', '--nolist--') + # Get some data for the input to the sha hash. + now = time.time() + if not data.get('_plaintext'): + protocol = 1 + msgsave = cPickle.dumps(_msg, protocol) + else: + protocol = 0 + msgsave = cPickle.dumps(str(_msg), protocol) + # listname is unicode but the input to the hash function must be an + # 8-bit string (eventually, a bytes object). + hashfood = msgsave + listname.encode('utf-8') + repr(now) + # Encode the current time into the file name for FIFO sorting. The + # file name consists of two parts separated by a '+': the received + # time for this message (i.e. when it first showed up on this system) + # and the sha hex digest. + rcvtime = data.setdefault('received_time', now) + filebase = repr(rcvtime) + '+' + sha.new(hashfood).hexdigest() + filename = os.path.join(self._whichq, filebase + '.pck') + tmpfile = filename + '.tmp' + # Always add the metadata schema version number + data['version'] = config.QFILE_SCHEMA_VERSION + # Filter out volatile entries + for k in data: + if k.startswith('_'): + del data[k] + # We have to tell the dequeue() method whether to parse the message + # object or not. + data['_parsemsg'] = (protocol == 0) + # Write to the pickle file the message object and metadata. + with open(tmpfile, 'w') as fp: + fp.write(msgsave) + cPickle.dump(data, fp, protocol) + fp.flush() + os.fsync(fp.fileno()) + os.rename(tmpfile, filename) + return filebase + + def dequeue(self, filebase): + # Calculate the filename from the given filebase. + filename = os.path.join(self._whichq, filebase + '.pck') + backfile = os.path.join(self._whichq, filebase + '.bak') + # Read the message object and metadata. + with open(filename) as fp: + # Move the file to the backup file name for processing. If this + # process crashes uncleanly the .bak file will be used to + # re-instate the .pck file in order to try again. XXX what if + # something caused Python to constantly crash? Is it possible + # that we'd end up mail bombing recipients or crushing the + # archiver? How would we defend against that? + os.rename(filename, backfile) + msg = cPickle.load(fp) + data = cPickle.load(fp) + if data.get('_parsemsg'): + msg = email.message_from_string(msg, Message.Message) + return msg, data + + def finish(self, filebase, preserve=False): + bakfile = os.path.join(self._whichq, filebase + '.bak') + try: + if preserve: + psvfile = os.path.join(config.SHUNTQUEUE_DIR, + filebase + '.psv') + # Create the directory if it doesn't yet exist. + Utils.makedirs(config.SHUNTQUEUE_DIR, 0770) + os.rename(bakfile, psvfile) + else: + os.unlink(bakfile) + except EnvironmentError, e: + log.exception('Failed to unlink/preserve backup file: %s', bakfile) + + @property + def files(self): + return self.get_files() + + def get_files(self, extension='.pck'): + times = {} + lower = self._lower + upper = self._upper + for f in os.listdir(self._whichq): + # By ignoring anything that doesn't end in .pck, we ignore + # tempfiles and avoid a race condition. + filebase, ext = os.path.splitext(f) + if ext <> extension: + continue + when, digest = filebase.split('+', 1) + # Throw out any files which don't match our bitrange. BAW: test + # performance and end-cases of this algorithm. MAS: both + # comparisons need to be <= to get complete range. + if lower is None or (lower <= long(digest, 16) <= upper): + key = float(when) + while key in times: + key += DELTA + times[key] = filebase + # FIFO sort + return [times[key] for key in sorted(times)] + + def recover_backup_files(self): + # Move all .bak files in our slice to .pck. It's impossible for both + # to exist at the same time, so the move is enough to ensure that our + # normal dequeuing process will handle them. + for filebase in self.get_files('.bak'): + src = os.path.join(self._whichq, filebase + '.bak') + dst = os.path.join(self._whichq, filebase + '.pck') + os.rename(src, dst) + + + +class Runner: + implements(IRunner) + + QDIR = None + SLEEPTIME = config.QRUNNER_SLEEP_TIME + + def __init__(self, slice=None, numslices=1): + self._kids = {} + # Create our own switchboard. Don't use the switchboard cache because + # we want to provide slice and numslice arguments. + self._switchboard = Switchboard(self.QDIR, slice, numslices, True) + # Create the shunt switchboard + self._shunt = Switchboard(config.SHUNTQUEUE_DIR) + self._stop = False + + def __repr__(self): + return '<%s at %s>' % (self.__class__.__name__, id(self)) + + def stop(self): + self._stop = True + + def run(self): + # Start the main loop for this queue runner. + try: + try: + while True: + # Once through the loop that processes all the files in + # the queue directory. + filecnt = self._oneloop() + # Do the periodic work for the subclass. BAW: this + # shouldn't be called here. There should be one more + # _doperiodic() call at the end of the _oneloop() loop. + self._doperiodic() + # If the stop flag is set, we're done. + if self._stop: + break + # Give the runner an opportunity to snooze for a while, + # but pass it the file count so it can decide whether to + # do more work now or not. + self._snooze(filecnt) + except KeyboardInterrupt: + pass + finally: + # We've broken out of our main loop, so we want to reap all the + # subprocesses we've created and do any other necessary cleanups. + self._cleanup() + + def _oneloop(self): + # First, list all the files in our queue directory. + # Switchboard.files() is guaranteed to hand us the files in FIFO + # order. Return an integer count of the number of files that were + # available for this qrunner to process. + files = self._switchboard.files + for filebase in files: + try: + # Ask the switchboard for the message and metadata objects + # associated with this filebase. + msg, msgdata = self._switchboard.dequeue(filebase) + except Exception, e: + # This used to just catch email.Errors.MessageParseError, + # but other problems can occur in message parsing, e.g. + # ValueError, and exceptions can occur in unpickling too. + # We don't want the runner to die, so we just log and skip + # this entry, but preserve it for analysis. + self._log(e) + log.error('Skipping and preserving unparseable message: %s', + filebase) + self._switchboard.finish(filebase, preserve=True) + continue + try: + self._onefile(msg, msgdata) + self._switchboard.finish(filebase) + except Exception, e: + # All runners that implement _dispose() must guarantee that + # exceptions are caught and dealt with properly. Still, there + # may be a bug in the infrastructure, and we do not want those + # to cause messages to be lost. Any uncaught exceptions will + # cause the message to be stored in the shunt queue for human + # intervention. + self._log(e) + # Put a marker in the metadata for unshunting + msgdata['whichq'] = self._switchboard.queue_directory + # It is possible that shunting can throw an exception, e.g. a + # permissions problem or a MemoryError due to a really large + # message. Try to be graceful. + try: + new_filebase = self._shunt.enqueue(msg, msgdata) + log.error('SHUNTING: %s', new_filebase) + self._switchboard.finish(filebase) + except Exception, e: + # The message wasn't successfully shunted. Log the + # exception and try to preserve the original queue entry + # for possible analysis. + self._log(e) + log.error('SHUNTING FAILED, preserving original entry: %s', + filebase) + self._switchboard.finish(filebase, preserve=True) + # Other work we want to do each time through the loop + Utils.reap(self._kids, once=True) + self._doperiodic() + if self._shortcircuit(): + break + return len(files) + + def _onefile(self, msg, msgdata): + # Do some common sanity checking on the message metadata. It's got to + # be destined for a particular mailing list. This switchboard is used + # to shunt off badly formatted messages. We don't want to just trash + # them because they may be fixable with human intervention. Just get + # them out of our site though. + # + # Find out which mailing list this message is destined for. + listname = msgdata.get('listname') + mlist = config.db.list_manager.get(listname) + if not mlist: + log.error('Dequeuing message destined for missing list: %s', + listname) + self._shunt.enqueue(msg, msgdata) + return + # Now process this message, keeping track of any subprocesses that may + # have been spawned. We'll reap those later. + # + # We also want to set up the language context for this message. The + # context will be the preferred language for the user if a member of + # the list, or the list's preferred language. However, we must take + # special care to reset the defaults, otherwise subsequent messages + # may be translated incorrectly. BAW: I'm not sure I like this + # approach, but I can't think of anything better right now. + sender = msg.get_sender() + member = mlist.members.get_member(sender) + if member: + lang = member.preferred_language + else: + lang = mlist.preferred_language + with i18n.using_language(lang): + msgdata['lang'] = lang + keepqueued = self._dispose(mlist, msg, msgdata) + # Keep tabs on any child processes that got spawned. + kids = msgdata.get('_kids') + if kids: + self._kids.update(kids) + if keepqueued: + self._switchboard.enqueue(msg, msgdata) + + def _log(self, exc): + log.error('Uncaught runner exception: %s', exc) + s = StringIO() + traceback.print_exc(file=s) + log.error('%s', s.getvalue()) + + # + # Subclasses can override these methods. + # + def _cleanup(self): + """Clean up upon exit from the main processing loop. + + Called when the Runner's main loop is stopped, this should perform + any necessary resource deallocation. Its return value is irrelevant. + """ + Utils.reap(self._kids) + + def _dispose(self, mlist, msg, msgdata): + """Dispose of a single message destined for a mailing list. + + Called for each message that the Runner is responsible for, this is + the primary overridable method for processing each message. + Subclasses, must provide implementation for this method. + + mlist is the IMailingList instance this message is destined for. + + msg is the Message object representing the message. + + msgdata is a dictionary of message metadata. + """ + raise NotImplementedError + + def _doperiodic(self): + """Do some processing `every once in a while'. + + Called every once in a while both from the Runner's main loop, and + from the Runner's hash slice processing loop. You can do whatever + special periodic processing you want here, and the return value is + irrelevant. + """ + pass + + def _snooze(self, filecnt): + """Sleep for a little while. + + filecnt is the number of messages in the queue the last time through. + Sub-runners can decide to continue to do work, or sleep for a while + based on this value. By default, we only snooze if there was nothing + to do last time around. + """ + if filecnt or float(self.SLEEPTIME) <= 0: + return + time.sleep(float(self.SLEEPTIME)) + + def _shortcircuit(self): + """Return a true value if the individual file processing loop should + exit before it's finished processing each message in the current slice + of hash space. A false value tells _oneloop() to continue processing + until the current snapshot of hash space is exhausted. + + You could, for example, implement a throttling algorithm here. + """ + return self._stop diff --git a/Mailman/Queue/ArchRunner.py b/Mailman/queue/archive.py index 266c2ad14..c6565e11d 100644 --- a/Mailman/Queue/ArchRunner.py +++ b/Mailman/queue/archive.py @@ -21,12 +21,12 @@ import time from email.Utils import parsedate_tz, mktime_tz, formatdate from Mailman import LockFile -from Mailman.Queue.Runner import Runner from Mailman.configuration import config +from Mailman.queue import Runner -class ArchRunner(Runner): +class ArchiveRunner(Runner): QDIR = config.ARCHQUEUE_DIR def _dispose(self, mlist, msg, msgdata): diff --git a/Mailman/Queue/BounceRunner.py b/Mailman/queue/bounce.py index 6ce7ef393..361b01fac 100644 --- a/Mailman/Queue/BounceRunner.py +++ b/Mailman/queue/bounce.py @@ -31,10 +31,9 @@ from Mailman import LockFile from Mailman import Utils from Mailman.Bouncers import BouncerAPI from Mailman.Message import UserNotification -from Mailman.Queue.Runner import Runner -from Mailman.Queue.sbcache import get_switchboard from Mailman.configuration import config from Mailman.i18n import _ +from Mailman.queue import Runner, Switchboard COMMASPACE = ', ' @@ -164,7 +163,7 @@ class BounceRunner(Runner, BounceMixin): def _dispose(self, mlist, msg, msgdata): # Make sure we have the most up-to-date state mlist.Load() - outq = get_switchboard(config.OUTQUEUE_DIR) + outq = Switchboard(config.OUTQUEUE_DIR) # There are a few possibilities here: # # - the message could have been VERP'd in which case, we know exactly diff --git a/Mailman/Queue/CommandRunner.py b/Mailman/queue/command.py index 2df28e312..a67e757f3 100644 --- a/Mailman/Queue/CommandRunner.py +++ b/Mailman/queue/command.py @@ -35,10 +35,10 @@ from Mailman import LockFile from Mailman import Message from Mailman import Utils from Mailman.Handlers import Replybot -from Mailman.Queue.Runner import Runner from Mailman.app.replybot import autorespond_to_sender from Mailman.configuration import config from Mailman.i18n import _ +from Mailman.queue import Runner NL = '\n' diff --git a/Mailman/Queue/HTTPRunner.py b/Mailman/queue/http.py index 12b5bb607..16614b1bd 100644 --- a/Mailman/Queue/HTTPRunner.py +++ b/Mailman/queue/http.py @@ -25,8 +25,8 @@ from cStringIO import StringIO from wsgiref.simple_server import make_server, WSGIRequestHandler from Mailman.Cgi.wsgi_app import mailman_app -from Mailman.Queue.Runner import Runner from Mailman.configuration import config +from Mailman.queue import Runner hlog = logging.getLogger('mailman.http') qlog = logging.getLogger('mailman.qrunner') diff --git a/Mailman/Queue/IncomingRunner.py b/Mailman/queue/incoming.py index fa5b3f694..05ab924e6 100644 --- a/Mailman/Queue/IncomingRunner.py +++ b/Mailman/queue/incoming.py @@ -103,8 +103,8 @@ from cStringIO import StringIO from Mailman import Errors from Mailman import LockFile -from Mailman.Queue.Runner import Runner from Mailman.configuration import config +from Mailman.queue import Runner log = logging.getLogger('mailman.error') vlog = logging.getLogger('mailman.vette') diff --git a/Mailman/Queue/LMTPRunner.py b/Mailman/queue/lmtp.py index 81c912653..0660f2e38 100644 --- a/Mailman/Queue/LMTPRunner.py +++ b/Mailman/queue/lmtp.py @@ -49,9 +49,8 @@ import asyncore from email.utils import parseaddr from Mailman.Message import Message -from Mailman.Queue.Runner import Runner -from Mailman.Queue.sbcache import get_switchboard from Mailman.configuration import config +from Mailman.runner import Runner, Switchboard elog = logging.getLogger('mailman.error') qlog = logging.getLogger('mailman.qrunner') @@ -147,29 +146,29 @@ class LMTPRunner(Runner, smtpd.SMTPServer): # sub-queue, and if so, enqueue it. msgdata = dict(listname=listname) if subq in ('bounces', 'admin'): - queue = get_switchboard(config.BOUNCEQUEUE_DIR) + queue = Switchboard(config.BOUNCEQUEUE_DIR) elif subq == 'confirm': msgdata['toconfirm'] = True - queue = get_switchboard(config.CMDQUEUE_DIR) + queue = Switchboard(config.CMDQUEUE_DIR) elif subq in ('join', 'subscribe'): msgdata['tojoin'] = True - queue = get_switchboard(config.CMDQUEUE_DIR) + queue = Switchboard(config.CMDQUEUE_DIR) elif subq in ('leave', 'unsubscribe'): msgdata['toleave'] = True - queue = get_switchboard(config.CMDQUEUE_DIR) + queue = Switchboard(config.CMDQUEUE_DIR) elif subq == 'owner': msgdata.update({ 'toowner' : True, 'envsender' : config.SITE_OWNER_ADDRESS, 'pipeline' : config.OWNER_PIPELINE, }) - queue = get_switchboard(config.INQUEUE_DIR) + queue = Switchboard(config.INQUEUE_DIR) elif subq is None: msgdata['tolist'] = True - queue = get_switchboard(config.INQUEUE_DIR) + queue = Switchboard(config.INQUEUE_DIR) elif subq == 'request': msgdata['torequest'] = True - queue = get_switchboard(config.CMDQUEUE_DIR) + queue = Switchboard(config.CMDQUEUE_DIR) else: elog.error('Unknown sub-queue: %s', subq) status.append(ERR_550) diff --git a/Mailman/Queue/MaildirRunner.py b/Mailman/queue/maildir.py index ff193bf22..d36adf6e9 100644 --- a/Mailman/Queue/MaildirRunner.py +++ b/Mailman/queue/maildir.py @@ -57,9 +57,8 @@ from email.Parser import Parser from email.Utils import parseaddr from Mailman.Message import Message -from Mailman.Queue.Runner import Runner -from Mailman.Queue.sbcache import get_switchboard from Mailman.configuration import config +from Mailman.queue import Runner log = logging.getLogger('mailman.error') @@ -153,29 +152,29 @@ class MaildirRunner(Runner): msgdata = {'listname': listname} # -admin is deprecated if subq in ('bounces', 'admin'): - queue = get_switchboard(config.BOUNCEQUEUE_DIR) + queue = Switchboard(config.BOUNCEQUEUE_DIR) elif subq == 'confirm': msgdata['toconfirm'] = 1 - queue = get_switchboard(config.CMDQUEUE_DIR) + queue = Switchboard(config.CMDQUEUE_DIR) elif subq in ('join', 'subscribe'): msgdata['tojoin'] = 1 - queue = get_switchboard(config.CMDQUEUE_DIR) + queue = Switchboard(config.CMDQUEUE_DIR) elif subq in ('leave', 'unsubscribe'): msgdata['toleave'] = 1 - queue = get_switchboard(config.CMDQUEUE_DIR) + queue = Switchboard(config.CMDQUEUE_DIR) elif subq == 'owner': msgdata.update({ 'toowner': True, 'envsender': config.SITE_OWNER_ADDRESS, 'pipeline': config.OWNER_PIPELINE, }) - queue = get_switchboard(config.INQUEUE_DIR) + queue = Switchboard(config.INQUEUE_DIR) elif subq is None: msgdata['tolist'] = 1 - queue = get_switchboard(config.INQUEUE_DIR) + queue = Switchboard(config.INQUEUE_DIR) elif subq == 'request': msgdata['torequest'] = 1 - queue = get_switchboard(config.CMDQUEUE_DIR) + queue = Switchboard(config.CMDQUEUE_DIR) else: log.error('Unknown sub-queue: %s', subq) os.rename(dstname, xdstname) diff --git a/Mailman/Queue/NewsRunner.py b/Mailman/queue/news.py index 709ec4e17..1ed58dc0c 100644 --- a/Mailman/Queue/NewsRunner.py +++ b/Mailman/queue/news.py @@ -29,8 +29,8 @@ from email.utils import getaddresses, make_msgid COMMASPACE = ', ' from Mailman import Utils -from Mailman.Queue.Runner import Runner from Mailman.configuration import config +from Mailman.queue import Runner log = logging.getLogger('mailman.error') diff --git a/Mailman/Queue/OutgoingRunner.py b/Mailman/queue/outgoing.py index 1dd780ca4..a1e64096d 100644 --- a/Mailman/Queue/OutgoingRunner.py +++ b/Mailman/queue/outgoing.py @@ -28,10 +28,9 @@ import logging from Mailman import Errors from Mailman import LockFile from Mailman import Message -from Mailman.Queue.BounceRunner import BounceMixin -from Mailman.Queue.Runner import Runner -from Mailman.Queue.Switchboard import Switchboard from Mailman.configuration import config +from Mailman.queue import Runner, Switchboard +from Mailman.queue.bounce import BounceMixin # This controls how often _doperiodic() will try to deal with deferred # permanent failures. It is a count of calls to _doperiodic() diff --git a/Mailman/Queue/RetryRunner.py b/Mailman/queue/retry.py index 33e9459b5..35accf262 100644 --- a/Mailman/Queue/RetryRunner.py +++ b/Mailman/queue/retry.py @@ -17,9 +17,8 @@ import time -from Mailman.Queue.Runner import Runner -from Mailman.Queue.Switchboard import Switchboard from Mailman.configuration import config +from Mailman.queue import Runner, Switchboard diff --git a/Mailman/Queue/sbcache.py b/Mailman/queue/sbcache.py index 8689bf083..8689bf083 100644 --- a/Mailman/Queue/sbcache.py +++ b/Mailman/queue/sbcache.py diff --git a/Mailman/Queue/__init__.py b/Mailman/queue/tests/__init__.py index e69de29bb..e69de29bb 100644 --- a/Mailman/Queue/__init__.py +++ b/Mailman/queue/tests/__init__.py diff --git a/Mailman/Queue/VirginRunner.py b/Mailman/queue/virgin.py index 982d98919..e26c0f190 100644 --- a/Mailman/Queue/VirginRunner.py +++ b/Mailman/queue/virgin.py @@ -23,9 +23,9 @@ to go through some minimal processing before they can be sent out to the recipient. """ -from Mailman.Queue.IncomingRunner import IncomingRunner -from Mailman.Queue.Runner import Runner from Mailman.configuration import config +from Mailman.queue import Runner +from Mailman.queue.incoming import IncomingRunner |
