diff options
Diffstat (limited to 'Mailman/Queue/Runner.py')
| -rw-r--r-- | Mailman/Queue/Runner.py | 244 |
1 files changed, 0 insertions, 244 deletions
diff --git a/Mailman/Queue/Runner.py b/Mailman/Queue/Runner.py deleted file mode 100644 index 958e11e6c..000000000 --- a/Mailman/Queue/Runner.py +++ /dev/null @@ -1,244 +0,0 @@ -# Copyright (C) 1998-2007 by the Free Software Foundation, Inc. -# -# This program is free software; you can redistribute it and/or -# modify it under the terms of the GNU General Public License -# as published by the Free Software Foundation; either version 2 -# of the License, or (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, -# USA. - -"""Generic queue runner class.""" - -from __future__ import with_statement - -import time -import weakref -import logging -import traceback -import email.Errors - -from cStringIO import StringIO - -from Mailman import Errors -from Mailman import Utils -from Mailman import i18n -from Mailman.Queue.Switchboard import Switchboard -from Mailman.configuration import config - -log = logging.getLogger('mailman.error') - - - -class Runner: - QDIR = None - SLEEPTIME = config.QRUNNER_SLEEP_TIME - - def __init__(self, slice=None, numslices=1): - self._kids = {} - # Create our own switchboard. Don't use the switchboard cache because - # we want to provide slice and numslice arguments. - self._switchboard = Switchboard(self.QDIR, slice, numslices, True) - # Create the shunt switchboard - self._shunt = Switchboard(config.SHUNTQUEUE_DIR) - self._stop = False - - def __repr__(self): - return '<%s at %s>' % (self.__class__.__name__, id(self)) - - def stop(self): - self._stop = True - - def run(self): - # Start the main loop for this queue runner. - try: - try: - while True: - # Once through the loop that processes all the files in - # the queue directory. - filecnt = self._oneloop() - # Do the periodic work for the subclass. BAW: this - # shouldn't be called here. There should be one more - # _doperiodic() call at the end of the _oneloop() loop. - self._doperiodic() - # If the stop flag is set, we're done. - if self._stop: - break - # Give the runner an opportunity to snooze for a while, - # but pass it the file count so it can decide whether to - # do more work now or not. - self._snooze(filecnt) - except KeyboardInterrupt: - pass - finally: - # We've broken out of our main loop, so we want to reap all the - # subprocesses we've created and do any other necessary cleanups. - self._cleanup() - - def _oneloop(self): - # First, list all the files in our queue directory. - # Switchboard.files() is guaranteed to hand us the files in FIFO - # order. Return an integer count of the number of files that were - # available for this qrunner to process. - files = self._switchboard.files - for filebase in files: - try: - # Ask the switchboard for the message and metadata objects - # associated with this filebase. - msg, msgdata = self._switchboard.dequeue(filebase) - except Exception, e: - # This used to just catch email.Errors.MessageParseError, - # but other problems can occur in message parsing, e.g. - # ValueError, and exceptions can occur in unpickling too. - # We don't want the runner to die, so we just log and skip - # this entry, but preserve it for analysis. - self._log(e) - log.error('Skipping and preserving unparseable message: %s', - filebase) - self._switchboard.finish(filebase, preserve=True) - continue - try: - self._onefile(msg, msgdata) - self._switchboard.finish(filebase) - except Exception, e: - # All runners that implement _dispose() must guarantee that - # exceptions are caught and dealt with properly. Still, there - # may be a bug in the infrastructure, and we do not want those - # to cause messages to be lost. Any uncaught exceptions will - # cause the message to be stored in the shunt queue for human - # intervention. - self._log(e) - # Put a marker in the metadata for unshunting - msgdata['whichq'] = self._switchboard.queue_directory - # It is possible that shunting can throw an exception, e.g. a - # permissions problem or a MemoryError due to a really large - # message. Try to be graceful. - try: - new_filebase = self._shunt.enqueue(msg, msgdata) - log.error('SHUNTING: %s', new_filebase) - self._switchboard.finish(filebase) - except Exception, e: - # The message wasn't successfully shunted. Log the - # exception and try to preserve the original queue entry - # for possible analysis. - self._log(e) - log.error('SHUNTING FAILED, preserving original entry: %s', - filebase) - self._switchboard.finish(filebase, preserve=True) - # Other work we want to do each time through the loop - Utils.reap(self._kids, once=True) - self._doperiodic() - if self._shortcircuit(): - break - return len(files) - - def _onefile(self, msg, msgdata): - # Do some common sanity checking on the message metadata. It's got to - # be destined for a particular mailing list. This switchboard is used - # to shunt off badly formatted messages. We don't want to just trash - # them because they may be fixable with human intervention. Just get - # them out of our site though. - # - # Find out which mailing list this message is destined for. - listname = msgdata.get('listname') - mlist = config.db.list_manager.get(listname) - if not mlist: - log.error('Dequeuing message destined for missing list: %s', - listname) - self._shunt.enqueue(msg, msgdata) - return - # Now process this message, keeping track of any subprocesses that may - # have been spawned. We'll reap those later. - # - # We also want to set up the language context for this message. The - # context will be the preferred language for the user if a member of - # the list, or the list's preferred language. However, we must take - # special care to reset the defaults, otherwise subsequent messages - # may be translated incorrectly. BAW: I'm not sure I like this - # approach, but I can't think of anything better right now. - sender = msg.get_sender() - member = mlist.members.get_member(sender) - if member: - lang = member.preferred_language - else: - lang = mlist.preferred_language - with i18n.using_language(lang): - msgdata['lang'] = lang - keepqueued = self._dispose(mlist, msg, msgdata) - # Keep tabs on any child processes that got spawned. - kids = msgdata.get('_kids') - if kids: - self._kids.update(kids) - if keepqueued: - self._switchboard.enqueue(msg, msgdata) - - def _log(self, exc): - log.error('Uncaught runner exception: %s', exc) - s = StringIO() - traceback.print_exc(file=s) - log.error('%s', s.getvalue()) - - # - # Subclasses can override these methods. - # - def _cleanup(self): - """Clean up upon exit from the main processing loop. - - Called when the Runner's main loop is stopped, this should perform - any necessary resource deallocation. Its return value is irrelevant. - """ - Utils.reap(self._kids) - - def _dispose(self, mlist, msg, msgdata): - """Dispose of a single message destined for a mailing list. - - Called for each message that the Runner is responsible for, this is - the primary overridable method for processing each message. - Subclasses, must provide implementation for this method. - - mlist is the IMailingList instance this message is destined for. - - msg is the Message object representing the message. - - msgdata is a dictionary of message metadata. - """ - raise NotImplementedError - - def _doperiodic(self): - """Do some processing `every once in a while'. - - Called every once in a while both from the Runner's main loop, and - from the Runner's hash slice processing loop. You can do whatever - special periodic processing you want here, and the return value is - irrelevant. - """ - pass - - def _snooze(self, filecnt): - """Sleep for a little while. - - filecnt is the number of messages in the queue the last time through. - Sub-runners can decide to continue to do work, or sleep for a while - based on this value. By default, we only snooze if there was nothing - to do last time around. - """ - if filecnt or float(self.SLEEPTIME) <= 0: - return - time.sleep(float(self.SLEEPTIME)) - - def _shortcircuit(self): - """Return a true value if the individual file processing loop should - exit before it's finished processing each message in the current slice - of hash space. A false value tells _oneloop() to continue processing - until the current snapshot of hash space is exhausted. - - You could, for example, implement a throttling algorithm here. - """ - return self._stop |
