diff options
Diffstat (limited to 'Mailman/queue/Runner.py')
| -rw-r--r-- | Mailman/queue/Runner.py | 244 |
1 files changed, 244 insertions, 0 deletions
diff --git a/Mailman/queue/Runner.py b/Mailman/queue/Runner.py new file mode 100644 index 000000000..958e11e6c --- /dev/null +++ b/Mailman/queue/Runner.py @@ -0,0 +1,244 @@ +# Copyright (C) 1998-2007 by the Free Software Foundation, Inc. +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, +# USA. + +"""Generic queue runner class.""" + +from __future__ import with_statement + +import time +import weakref +import logging +import traceback +import email.Errors + +from cStringIO import StringIO + +from Mailman import Errors +from Mailman import Utils +from Mailman import i18n +from Mailman.Queue.Switchboard import Switchboard +from Mailman.configuration import config + +log = logging.getLogger('mailman.error') + + + +class Runner: + QDIR = None + SLEEPTIME = config.QRUNNER_SLEEP_TIME + + def __init__(self, slice=None, numslices=1): + self._kids = {} + # Create our own switchboard. Don't use the switchboard cache because + # we want to provide slice and numslice arguments. + self._switchboard = Switchboard(self.QDIR, slice, numslices, True) + # Create the shunt switchboard + self._shunt = Switchboard(config.SHUNTQUEUE_DIR) + self._stop = False + + def __repr__(self): + return '<%s at %s>' % (self.__class__.__name__, id(self)) + + def stop(self): + self._stop = True + + def run(self): + # Start the main loop for this queue runner. + try: + try: + while True: + # Once through the loop that processes all the files in + # the queue directory. + filecnt = self._oneloop() + # Do the periodic work for the subclass. BAW: this + # shouldn't be called here. There should be one more + # _doperiodic() call at the end of the _oneloop() loop. + self._doperiodic() + # If the stop flag is set, we're done. + if self._stop: + break + # Give the runner an opportunity to snooze for a while, + # but pass it the file count so it can decide whether to + # do more work now or not. + self._snooze(filecnt) + except KeyboardInterrupt: + pass + finally: + # We've broken out of our main loop, so we want to reap all the + # subprocesses we've created and do any other necessary cleanups. + self._cleanup() + + def _oneloop(self): + # First, list all the files in our queue directory. + # Switchboard.files() is guaranteed to hand us the files in FIFO + # order. Return an integer count of the number of files that were + # available for this qrunner to process. + files = self._switchboard.files + for filebase in files: + try: + # Ask the switchboard for the message and metadata objects + # associated with this filebase. + msg, msgdata = self._switchboard.dequeue(filebase) + except Exception, e: + # This used to just catch email.Errors.MessageParseError, + # but other problems can occur in message parsing, e.g. + # ValueError, and exceptions can occur in unpickling too. + # We don't want the runner to die, so we just log and skip + # this entry, but preserve it for analysis. + self._log(e) + log.error('Skipping and preserving unparseable message: %s', + filebase) + self._switchboard.finish(filebase, preserve=True) + continue + try: + self._onefile(msg, msgdata) + self._switchboard.finish(filebase) + except Exception, e: + # All runners that implement _dispose() must guarantee that + # exceptions are caught and dealt with properly. Still, there + # may be a bug in the infrastructure, and we do not want those + # to cause messages to be lost. Any uncaught exceptions will + # cause the message to be stored in the shunt queue for human + # intervention. + self._log(e) + # Put a marker in the metadata for unshunting + msgdata['whichq'] = self._switchboard.queue_directory + # It is possible that shunting can throw an exception, e.g. a + # permissions problem or a MemoryError due to a really large + # message. Try to be graceful. + try: + new_filebase = self._shunt.enqueue(msg, msgdata) + log.error('SHUNTING: %s', new_filebase) + self._switchboard.finish(filebase) + except Exception, e: + # The message wasn't successfully shunted. Log the + # exception and try to preserve the original queue entry + # for possible analysis. + self._log(e) + log.error('SHUNTING FAILED, preserving original entry: %s', + filebase) + self._switchboard.finish(filebase, preserve=True) + # Other work we want to do each time through the loop + Utils.reap(self._kids, once=True) + self._doperiodic() + if self._shortcircuit(): + break + return len(files) + + def _onefile(self, msg, msgdata): + # Do some common sanity checking on the message metadata. It's got to + # be destined for a particular mailing list. This switchboard is used + # to shunt off badly formatted messages. We don't want to just trash + # them because they may be fixable with human intervention. Just get + # them out of our site though. + # + # Find out which mailing list this message is destined for. + listname = msgdata.get('listname') + mlist = config.db.list_manager.get(listname) + if not mlist: + log.error('Dequeuing message destined for missing list: %s', + listname) + self._shunt.enqueue(msg, msgdata) + return + # Now process this message, keeping track of any subprocesses that may + # have been spawned. We'll reap those later. + # + # We also want to set up the language context for this message. The + # context will be the preferred language for the user if a member of + # the list, or the list's preferred language. However, we must take + # special care to reset the defaults, otherwise subsequent messages + # may be translated incorrectly. BAW: I'm not sure I like this + # approach, but I can't think of anything better right now. + sender = msg.get_sender() + member = mlist.members.get_member(sender) + if member: + lang = member.preferred_language + else: + lang = mlist.preferred_language + with i18n.using_language(lang): + msgdata['lang'] = lang + keepqueued = self._dispose(mlist, msg, msgdata) + # Keep tabs on any child processes that got spawned. + kids = msgdata.get('_kids') + if kids: + self._kids.update(kids) + if keepqueued: + self._switchboard.enqueue(msg, msgdata) + + def _log(self, exc): + log.error('Uncaught runner exception: %s', exc) + s = StringIO() + traceback.print_exc(file=s) + log.error('%s', s.getvalue()) + + # + # Subclasses can override these methods. + # + def _cleanup(self): + """Clean up upon exit from the main processing loop. + + Called when the Runner's main loop is stopped, this should perform + any necessary resource deallocation. Its return value is irrelevant. + """ + Utils.reap(self._kids) + + def _dispose(self, mlist, msg, msgdata): + """Dispose of a single message destined for a mailing list. + + Called for each message that the Runner is responsible for, this is + the primary overridable method for processing each message. + Subclasses, must provide implementation for this method. + + mlist is the IMailingList instance this message is destined for. + + msg is the Message object representing the message. + + msgdata is a dictionary of message metadata. + """ + raise NotImplementedError + + def _doperiodic(self): + """Do some processing `every once in a while'. + + Called every once in a while both from the Runner's main loop, and + from the Runner's hash slice processing loop. You can do whatever + special periodic processing you want here, and the return value is + irrelevant. + """ + pass + + def _snooze(self, filecnt): + """Sleep for a little while. + + filecnt is the number of messages in the queue the last time through. + Sub-runners can decide to continue to do work, or sleep for a while + based on this value. By default, we only snooze if there was nothing + to do last time around. + """ + if filecnt or float(self.SLEEPTIME) <= 0: + return + time.sleep(float(self.SLEEPTIME)) + + def _shortcircuit(self): + """Return a true value if the individual file processing loop should + exit before it's finished processing each message in the current slice + of hash space. A false value tells _oneloop() to continue processing + until the current snapshot of hash space is exhausted. + + You could, for example, implement a throttling algorithm here. + """ + return self._stop |
