-rw-r--r--  Mailman/Queue/Runner.py  199
1 file changed, 112 insertions, 87 deletions
diff --git a/Mailman/Queue/Runner.py b/Mailman/Queue/Runner.py
index bac76d762..23b16cfbf 100644
--- a/Mailman/Queue/Runner.py
+++ b/Mailman/Queue/Runner.py
@@ -1,6 +1,4 @@
-#! /usr/bin/env python
-#
-# Copyright (C) 1998,1999,2000 by the Free Software Foundation, Inc.
+# Copyright (C) 1998,1999,2000,2001 by the Free Software Foundation, Inc.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
@@ -19,36 +17,124 @@
"""Generic queue runner class.
"""
-import os
-import marshal
import random
import time
+import traceback
from Mailman import mm_cfg
from Mailman import Utils
from Mailman import Errors
from Mailman import MailList
-from Mailman import Message
-from Mailman import LockFile
+
+from Mailman.pythonlib.StringIO import StringIO
+from Mailman.Queue.Switchboard import Switchboard
from Mailman.Logging.Syslog import syslog
class Runner:
- def __init__(self, qdir, cachelists=1):
+ def __init__(self, qdir, slice=None, numslices=1, cachelists=1):
self._qdir = qdir
self._kids = {}
self._cachelists = cachelists
- self._lock = LockFile.LockFile(os.path.join(qdir, 'qrunner.lock'),
- lifetime = mm_cfg.QRUNNER_LOCK_LIFETIME)
+ # Create our own switchboard. Don't use the switchboard cache because
+ # we want to provide slice and numslices arguments.
+ self._switchboard = Switchboard(qdir, slice, numslices)
+ # Create the shunt switchboard
+ self._shunt = Switchboard(mm_cfg.SHUNTQUEUE_DIR)
+ self._stop = 0
+
+ def stop(self):
+ self._stop = 1
+
+ def run(self):
+ # Start the main loop for this queue runner.
+ try:
+ try:
+ while 1:
+ # Make one pass over all the files currently in the queue
+ # directory.
+ filecnt = self.__oneloop()
+ # Do the periodic work for the subclass.
+ self._doperiodic()
+ # If the stop flag is set, we're done.
+ if self._stop:
+ break
+ # If there were no files to process, then we'll simply
+ # sleep for a little while and expect some to show up.
+ if filecnt == 0:
+ self._snooze()
+ except KeyboardInterrupt:
+ pass
+ finally:
+ # We've broken out of our main loop, so we want to reap all the
+ # subprocesses we've created and do any other necessary cleanups.
+ self._cleanup()
- def _dequeue(self, filebase):
- os.unlink(filebase + '.db')
- os.unlink(filebase + '.msg')
+ def __oneloop(self):
+ # First, list all the files in our queue directory, and randomize them
+ # for better coverage even at resource limits.
+ files = self._switchboard.files()
+ random.shuffle(files)
+ for filebase in files:
+ # Ask the switchboard for the message and metadata objects
+ # associated with this filebase.
+ msg, msgdata = self._switchboard.dequeue(filebase)
+ # Now that we've dequeued the message, we want to be incredibly
+ # anal about making sure that no uncaught exception could cause us
+ # to lose the message. All runners that implement _dispose() must
+ # guarantee that exceptions are caught and dealt with properly.
+ # Still, there may be a bug in the infrastructure, and we do not
+ # want those to cause messages to be lost. Any uncaught
+ # exceptions will cause the message to be stored in the shunt
+ # queue for human intervention.
+ try:
+ self.__onefile(msg, msgdata, filebase)
+ except Exception, e:
+ self._log(e)
+ self._shunt.enqueue(msg, msgdata)
+ # Other work we want to do each time through the loop
+ Utils.reap(self._kids, once=1)
+ self._doperiodic()
+ return len(files)
+ def __onefile(self, msg, msgdata, filebase):
+ # Do some common sanity checking on the message metadata. It's got to
+ # be destined for a particular mailing list. Badly formatted messages
+ # get moved to the shunt switchboard; we don't want to just trash them
+ # because they may be fixable with human intervention, but we do want
+ # them out of our sight for now.
+ #
+ # Find out which mailing list this message is destined for.
+ listname = msgdata.get('listname')
+ if not listname:
+ syslog('qrunner', 'qfile metadata specifies no list: %s' %
+ filebase)
+ self._shunt.enqueue(msg, msgdata)
+ return
+ mlist = self._open_list(listname)
+ if not mlist:
+ syslog('qrunner',
+ 'Dequeuing message destined for missing list: %s' %
+ filebase)
+ self._shunt.enqueue(msg, msgdata)
+ return
+ # Now process this message, keeping track of any subprocesses that may
+ # have been spawned. We'll reap those later.
+ keepqueued = self._dispose(mlist, msg, msgdata)
+ kids = msgdata.get('_kids')
+ if kids:
+ self._kids.update(kids)
+ if keepqueued:
+ self._switchboard.enqueue(msg, msgdata)
+
_listcache = {}
def _open_list(self, listname, lockp=1):
+ # Cache the opening of the list object given its name. This is
+ # probably only a moderate win because when a list is locked, all its
+ # attributes are re-read from the config.db file. This may help more
+ # when there's a real backing database.
if self._cachelists:
mlist = self._listcache.get(listname)
else:
@@ -63,87 +149,26 @@ class Runner:
return None
return mlist
- def _start(self):
- self._msgcount = 0
- self._t0 = time.time()
- try:
- self._lock.lock(timeout=0.5)
- except LockFile.TimeOutError:
- # Some other qrunner process is running, which is fine.
- syslog('qrunner', 'Could not acquire %s lock' %
- self.__class__)
- return 0
- return 1
+ def _log(self, exc):
+ syslog('qrunner', 'Uncaught runner exception: %s' % exc)
+ s = StringIO()
+ traceback.print_exc(file=s)
+ syslog('qrunner', s.getvalue())
+ #
+ # Subclasses must override _dispose(); they may also override
+ # _cleanup() and _doperiodic()
+ #
def _cleanup(self):
Utils.reap(self._kids)
self._listcache.clear()
- def _onefile(self, filebase):
- msgfp = dbfp = None
- try:
- dbfp = open(filebase + '.db')
- msgdata = marshal.load(dbfp)
- dbfp.close()
- dbfp = None
- msgfp = open(filebase + '.msg')
- # re-establish the file base for re-queuing
- msg = Message.Message(msgfp, filebase=msgdata.get('filebase'))
- msgfp.close()
- msgfp = None
- except (EOFError, ValueError, TypeError, IOError), e:
- # For some reason we had trouble getting all the information out
- # of the queued files. log this and move on (we figure it's a
- # temporary problem)
- syslog('qrunner',
- 'Exception reading qfiles: %s\n%s' % (filebase, e))
- if msgfp:
- msgfp.close()
- if dbfp:
- dbfp.close()
- return
- keepqueued = self._dispose_message(msg, msgdata)
- # Did the delivery generate child processes?
- kids = msgdata.get('_kids')
- if kids:
- self._kids.update(kids)
- del msgdata['_kids']
- if not keepqueued:
- # We're done with this message
- self._dequeue(filebase)
-
- def _dispose_message(self, msg, msgdata):
+ def _dispose(self, mlist, msg, msgdata):
raise NotImplementedError
def _doperiodic(self):
- if mm_cfg.QRUNNER_MAX_MESSAGES is not None and \
- self._msgcount > mm_cfg.QRUNNER_MAX_MESSAGES:
- return 0
- if mm_cfg.QRUNNER_PROCESS_LIFETIME is not None and \
- (time.time() - self._t0) > mm_cfg.QRUNNER_PROCESS_LIFETIME:
- return 0
- self._msgcount += 1
- return 1
+ pass
- def run(self):
- # Give us the absolute path to all the unique filebase file names in
- # the current directory.
- files = []
- for file in os.listdir(self._qdir):
- root, ext = os.path.splitext(file)
- if ext == '.db':
- files.append(os.path.join(self._qdir, root))
- # Randomize this list so we're more likely to touch them all
- # eventually, even if we're hitting resource limits.
- random.shuffle(files)
- # initialize the resource counters
- okaytostart = self._start()
- if not okaytostart:
+ def _snooze(self):
+ if mm_cfg.QRUNNER_SLEEP_TIME <= 0:
return
- for filebase in files:
- keepgoing = self._doperiodic()
- if not keepgoing:
- break
- self._onefile(filebase)
- # clean up after ourselves
- self._cleanup()
+ time.sleep(mm_cfg.QRUNNER_SLEEP_TIME)
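
For illustration, here is a minimal sketch of how a concrete runner might be built on the new base class. Only _dispose() needs to be provided; dequeuing, shunting of uncaught exceptions, child reaping, and sleeping all come from Runner. The LoggingRunner name, the _dispose() body, and the mm_cfg.INQUEUE_DIR constant are assumptions for the example, not part of this change.

    # Hypothetical example -- not part of this commit.
    from Mailman import mm_cfg
    from Mailman.Queue.Runner import Runner
    from Mailman.Logging.Syslog import syslog

    class LoggingRunner(Runner):
        def _dispose(self, mlist, msg, msgdata):
            # Handle one message destined for one mailing list.  Return
            # false when the message is fully handled, or true to leave
            # it queued for another pass through the loop.
            syslog('qrunner', 'saw a message for list: %s' %
                   msgdata.get('listname'))
            return 0

    if __name__ == '__main__':
        # Assumes a queue directory constant such as mm_cfg.INQUEUE_DIR;
        # any directory managed by a Switchboard would do.  With the
        # default slice=None, numslices=1 this one process handles the
        # whole queue; cooperating processes could each pass a distinct
        # slice in range(numslices) to split the work.
        runner = LoggingRunner(mm_cfg.INQUEUE_DIR)
        runner.run()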
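
The per-message and per-lifetime limits that the old _doperiodic() enforced (QRUNNER_MAX_MESSAGES and QRUNNER_PROCESS_LIFETIME) could be recreated under the new design by a subclass that overrides _doperiodic() and calls stop(). A sketch, assuming it is combined with a _dispose() implementation such as the one above; the class name and counting scheme are illustrative.

    # Hypothetical example -- not part of this commit.
    import time
    from Mailman import mm_cfg
    from Mailman.Queue.Runner import Runner

    class SelfLimitingRunner(Runner):
        def __init__(self, qdir, slice=None, numslices=1):
            Runner.__init__(self, qdir, slice, numslices)
            self._count = 0
            self._t0 = time.time()

        def _doperiodic(self):
            # The base class calls this once per processed file and once
            # per pass over the queue.  Setting the stop flag makes run()
            # exit at its next check, after the current pass completes.
            self._count += 1
            if (mm_cfg.QRUNNER_MAX_MESSAGES is not None and
                    self._count > mm_cfg.QRUNNER_MAX_MESSAGES):
                self.stop()
            if (mm_cfg.QRUNNER_PROCESS_LIFETIME is not None and
                    time.time() - self._t0 > mm_cfg.QRUNNER_PROCESS_LIFETIME):
                self.stop()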
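
Because the shunt queue is just another Switchboard, messages parked there by the exception handler can be requeued with the same files()/dequeue()/enqueue() calls used above. A hypothetical helper; the unshunt name and toqueue argument are illustrative.

    # Hypothetical example -- not part of this commit.
    from Mailman import mm_cfg
    from Mailman.Queue.Switchboard import Switchboard

    def unshunt(toqueue):
        # Move every shunted message back into the given working queue
        # directory for another delivery attempt.
        shunt = Switchboard(mm_cfg.SHUNTQUEUE_DIR)
        working = Switchboard(toqueue)
        for filebase in shunt.files():
            msg, msgdata = shunt.dequeue(filebase)
            working.enqueue(msg, msgdata)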