diff options
Diffstat (limited to 'Mailman/Queue/Runner.py')
| -rw-r--r-- | Mailman/Queue/Runner.py | 199 |
1 files changed, 112 insertions, 87 deletions
diff --git a/Mailman/Queue/Runner.py b/Mailman/Queue/Runner.py index bac76d762..23b16cfbf 100644 --- a/Mailman/Queue/Runner.py +++ b/Mailman/Queue/Runner.py @@ -1,6 +1,4 @@ -#! /usr/bin/env python -# -# Copyright (C) 1998,1999,2000 by the Free Software Foundation, Inc. +# Copyright (C) 1998,1999,2000,2001 by the Free Software Foundation, Inc. # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License @@ -19,36 +17,124 @@ """Generic queue runner class. """ -import os -import marshal import random import time +import traceback from Mailman import mm_cfg from Mailman import Utils from Mailman import Errors from Mailman import MailList -from Mailman import Message -from Mailman import LockFile + +from Mailman.pythonlib.StringIO import StringIO +from Mailman.Queue.Switchboard import Switchboard from Mailman.Logging.Syslog import syslog class Runner: - def __init__(self, qdir, cachelists=1): + def __init__(self, qdir, slice=None, numslices=1, cachelists=1): self._qdir = qdir self._kids = {} self._cachelists = cachelists - self._lock = LockFile.LockFile(os.path.join(qdir, 'qrunner.lock'), - lifetime = mm_cfg.QRUNNER_LOCK_LIFETIME) + # Create our own switchboard. Don't use the switchboard cache because + # we want to provide slice and numslice arguments. + self._switchboard = Switchboard(qdir, slice, numslices) + # Create the shunt switchboard + self._shunt = Switchboard(mm_cfg.SHUNTQUEUE_DIR) + self._stop = 0 + + def stop(self): + self._stop = 1 + + def run(self): + # Start the main loop for this queue runner. + try: + try: + while 1: + # Once through the loop that processes all the files in + # the queue directory. + filecnt = self.__oneloop() + # Do the periodic work for the subclass. + self._doperiodic() + # If the stop flag is set, we're done. + if self._stop: + break + # If there were no files to process, then we'll simply + # sleep for a little while and expect some to show up. + if filecnt == 0: + self._snooze() + except KeyboardInterrupt: + pass + finally: + # We've broken out of our main loop, so we want to reap all the + # subprocesses we've created and do any other necessary cleanups. + self._cleanup() - def _dequeue(self, filebase): - os.unlink(filebase + '.db') - os.unlink(filebase + '.msg') + def __oneloop(self): + # First, list all the files in our queue directory, and randomize them + # for better coverage even at resource limits. + files = self._switchboard.files() + random.shuffle(files) + for filebase in files: + # Ask the switchboard for the message and metadata objects + # associated with this filebase. + msg, msgdata = self._switchboard.dequeue(filebase) + # Now that we've dequeued the message, we want to be incredibly + # anal about making sure that no uncaught exception could cause us + # to lose the message. All runners that implement _dispose() must + # guarantee that exceptions are caught and dealt with properly. + # Still, there may be a bug in the infrastructure, and we do not + # want those to cause messages to be lost. Any uncaught + # exceptions will cause the message to be stored in the shunt + # queue for human intervention. + try: + self.__onefile(msg, msgdata) + except Exception, e: + self._log(e) + self._shunt.enqueue(msg, msgdata) + # Other work we want to do each time through the loop + Utils.reap(self._kids, once=1) + self._doperiodic() + return len(files) + def __onefile(self, msg, msgdata): + # Do some common sanity checking on the message metadata. It's got to + # be destined for a particular mailing list. This switchboard is used + # to shunt off badly formatted messages. We don't want to just trash + # them because they may be fixable with human intervention. Just get + # them out of our site though. + # + # Find out which mailing list this message is destined for. + listname = msgdata.get('listname') + if not listname: + syslog('qrunner', 'qfile metadata specifies no list: %s' % + filebase) + self._shunt.enqueue(msg, metadata) + return + mlist = self._open_list(listname) + if not mlist: + syslog('qrunner', + 'Dequeuing message destined for missing list: %s' % + filebase) + self._shunt.enqueue(msg, metadata) + return + # Now process this message, keeping track of any subprocesses that may + # have been spawned. We'll reap those later. + keepqueued = self._dispose(mlist, msg, msgdata) + kids = msgdata.get('_kids') + if kids: + self._kids.update(kids) + if keepqueued: + self._switchboard.enqueue(msg, msgdata) + _listcache = {} def _open_list(self, listname, lockp=1): + # Cache the opening of the list object given its name. The probably + # is only a moderate win because when a list is locked, all its + # attributes are re-read from the config.db file. This may help more + # when there's a real backing database. if self._cachelists: mlist = self._listcache.get(listname) else: @@ -63,87 +149,26 @@ class Runner: return None return mlist - def _start(self): - self._msgcount = 0 - self._t0 = time.time() - try: - self._lock.lock(timeout=0.5) - except LockFile.TimeOutError: - # Some other qrunner process is running, which is fine. - syslog('qrunner', 'Could not acquire %s lock' % - self.__class__) - return 0 - return 1 + def _log(self, exc): + syslog('qrunner', 'Uncaught runner exception: %s' % exc) + s = StringIO() + traceback.print_exc(file=s) + syslog('qrunner', s.getvalue()) + # + # Subclasses can override _cleanup(), _dispose(), and _doperiodic() + # def _cleanup(self): Utils.reap(self._kids) self._listcache.clear() - def _onefile(self, filebase): - msgfp = dbfp = None - try: - dbfp = open(filebase + '.db') - msgdata = marshal.load(dbfp) - dbfp.close() - dbfp = None - msgfp = open(filebase + '.msg') - # re-establish the file base for re-queuing - msg = Message.Message(msgfp, filebase=msgdata.get('filebase')) - msgfp.close() - msgfp = None - except (EOFError, ValueError, TypeError, IOError), e: - # For some reason we had trouble getting all the information out - # of the queued files. log this and move on (we figure it's a - # temporary problem) - syslog('qrunner', - 'Exception reading qfiles: %s\n%s' % (filebase, e)) - if msgfp: - msgfp.close() - if dbfp: - dbfp.close() - return - keepqueued = self._dispose_message(msg, msgdata) - # Did the delivery generate child processes? - kids = msgdata.get('_kids') - if kids: - self._kids.update(kids) - del msgdata['_kids'] - if not keepqueued: - # We're done with this message - self._dequeue(filebase) - - def _dispose_message(self, msg, msgdata): + def _dispose(self, mlist, msg, msgdata): raise UnimplementedError def _doperiodic(self): - if mm_cfg.QRUNNER_MAX_MESSAGES is not None and \ - self._msgcount > mm_cfg.QRUNNER_MAX_MESSAGES: - return 0 - if mm_cfg.QRUNNER_PROCESS_LIFETIME is not None and \ - (time.time() - self._t0) > mm_cfg.QRUNNER_PROCESS_LIFETIME: - return 0 - self._msgcount += 1 - return 1 + pass - def run(self): - # Give us the absolute path to all the unique filebase file names in - # the current directory. - files = [] - for file in os.listdir(self._qdir): - root, ext = os.path.splitext(file) - if ext == '.db': - files.append(os.path.join(self._qdir, root)) - # Randomize this list so we're more likely to touch them all - # eventually, even if we're hitting resource limits. - random.shuffle(files) - # initialize the resource counters - okaytostart = self._start() - if not okaytostart: + def _snooze(self): + if mm_cfg.QRUNNER_SLEEP_TIME <= 0: return - for filebase in files: - keepgoing = self._doperiodic() - if not keepgoing: - break - self._onefile(filebase) - # clean up after ourselves - self._cleanup() + time.sleep(mm_cfg.QRUNNER_SLEEP_TIME) |
