diff options
Diffstat (limited to 'mailman/queue/__init__.py')
| -rw-r--r-- | mailman/queue/__init__.py | 187 |
1 files changed, 82 insertions, 105 deletions
diff --git a/mailman/queue/__init__.py b/mailman/queue/__init__.py index 351fa0bd2..290f53978 100644 --- a/mailman/queue/__init__.py +++ b/mailman/queue/__init__.py @@ -69,6 +69,19 @@ class Switchboard: implements(ISwitchboard) def __init__(self, whichq, slice=None, numslices=1, recover=False): + """Create a switchboard object. + + :param whichq: The queue directory. + :type whichq: str + :param slice: The slice number for this switchboard, or None. If not + None, it must be [0..`numslices`). + :type slice: int or None + :param numslices: The total number of slices to split this queue + directory into. It must be a power of 2. + :type numslices: int + :param recover: True if backup files should be recovered. + :type recover: bool + """ self._whichq = whichq # Create the directory if it doesn't yet exist. Utils.makedirs(self._whichq, 0770) @@ -84,9 +97,11 @@ class Switchboard: @property def queue_directory(self): + """See `ISwitchboard`.""" return self._whichq def enqueue(self, _msg, _metadata=None, **_kws): + """See `ISwitchboard`.""" if _metadata is None: _metadata = {} # Calculate the SHA hexdigest of the message to get a unique base @@ -133,6 +148,7 @@ class Switchboard: return filebase def dequeue(self, filebase): + """See `ISwitchboard`.""" # Calculate the filename from the given filebase. filename = os.path.join(self._whichq, filebase + '.pck') backfile = os.path.join(self._whichq, filebase + '.bak') @@ -174,9 +190,11 @@ class Switchboard: @property def files(self): + """See `ISwitchboard`.""" return self.get_files() def get_files(self, extension='.pck'): + """See `ISwitchboard`.""" times = {} lower = self._lower upper = self._upper @@ -199,6 +217,7 @@ class Switchboard: return [times[key] for key in sorted(times)] def recover_backup_files(self): + """See `ISwitchboard`.""" # Move all .bak files in our slice to .pck. It's impossible for both # to exist at the same time, so the move is enough to ensure that our # normal dequeuing process will handle them. @@ -216,7 +235,15 @@ class Runner: SLEEPTIME = None def __init__(self, slice=None, numslices=1): - self._kids = {} + """Create a queue runner. + + :param slice: The slice number for this queue runner. This is passed + directly to the underlying `ISwitchboard` object. + :type slice: int or None + :param numslices: The number of slices for this queue. Must be a + power of 2. + :type numslices: int + """ # Create our own switchboard. Don't use the switchboard cache because # we want to provide slice and numslice arguments. self._switchboard = Switchboard(self.QDIR, slice, numslices, True) @@ -230,54 +257,50 @@ class Runner: return '<%s at %s>' % (self.__class__.__name__, id(self)) def stop(self): + """See `IRunner`.""" self._stop = True def run(self): + """See `IRunner`.""" # Start the main loop for this queue runner. try: - try: - while True: - # Once through the loop that processes all the files in - # the queue directory. - filecnt = self._oneloop() - # Do the periodic work for the subclass. BAW: this - # shouldn't be called here. There should be one more - # _doperiodic() call at the end of the _oneloop() loop. - self._doperiodic() - # If the stop flag is set, we're done. - if self._stop: - break - # Give the runner an opportunity to snooze for a while, - # but pass it the file count so it can decide whether to - # do more work now or not. - self._snooze(filecnt) - except KeyboardInterrupt: - pass + while True: + # Once through the loop that processes all the files in the + # queue directory. + filecnt = self._one_iteration() + # Do the periodic work for the subclass. + self._do_periodic() + # If the stop flag is set, we're done. + if self._stop: + break + # Give the runner an opportunity to snooze for a while, but + # pass it the file count so it can decide whether to do more + # work now or not. + self._snooze(filecnt) + except KeyboardInterrupt: + pass finally: - # We've broken out of our main loop, so we want to reap all the - # subprocesses we've created and do any other necessary cleanups. - self._cleanup() + self._clean_up() - def _oneloop(self): + def _one_iteration(self): + """See `IRunner`.""" me = self.__class__.__name__ dlog.debug('[%s] starting oneloop', me) - # First, list all the files in our queue directory. - # Switchboard.files() is guaranteed to hand us the files in FIFO - # order. Return an integer count of the number of files that were - # available for this qrunner to process. + # List all the files in our queue directory. The switchboard is + # guaranteed to hand us the files in FIFO order. files = self._switchboard.files for filebase in files: dlog.debug('[%s] processing filebase: %s', me, filebase) try: # Ask the switchboard for the message and metadata objects - # associated with this filebase. + # associated with this queue file. msg, msgdata = self._switchboard.dequeue(filebase) except Exception, e: - # This used to just catch email.Errors.MessageParseError, - # but other problems can occur in message parsing, e.g. - # ValueError, and exceptions can occur in unpickling too. - # We don't want the runner to die, so we just log and skip - # this entry, but preserve it for analysis. + # This used to just catch email.Errors.MessageParseError, but + # other problems can occur in message parsing, e.g. + # ValueError, and exceptions can occur in unpickling too. We + # don't want the runner to die, so we just log and skip this + # entry, but preserve it for analysis. self._log(e) elog.error('Skipping and preserving unparseable message: %s', filebase) @@ -286,7 +309,7 @@ class Runner: continue try: dlog.debug('[%s] processing onefile', me) - self._onefile(msg, msgdata) + self._process_one_file(msg, msgdata) dlog.debug('[%s] finishing filebase: %s', me, filebase) self._switchboard.finish(filebase) except Exception, e: @@ -297,7 +320,7 @@ class Runner: # cause the message to be stored in the shunt queue for human # intervention. self._log(e) - # Put a marker in the metadata for unshunting + # Put a marker in the metadata for unshunting. msgdata['whichq'] = self._switchboard.queue_directory # It is possible that shunting can throw an exception, e.g. a # permissions problem or a MemoryError due to a really large @@ -317,12 +340,10 @@ class Runner: self._switchboard.finish(filebase, preserve=True) config.db.abort() # Other work we want to do each time through the loop. - dlog.debug('[%s] reaping', me) - Utils.reap(self._kids, once=True) dlog.debug('[%s] doing periodic', me) - self._doperiodic() + self._do_periodic() dlog.debug('[%s] checking short circuit', me) - if self._shortcircuit(): + if self._short_curcuit(): dlog.debug('[%s] short circuiting', me) break dlog.debug('[%s] commiting', me) @@ -330,43 +351,36 @@ class Runner: dlog.debug('[%s] ending oneloop: %s', me, len(files)) return len(files) - def _onefile(self, msg, msgdata): + def _process_one_file(self, msg, msgdata): + """See `IRunner`.""" # Do some common sanity checking on the message metadata. It's got to # be destined for a particular mailing list. This switchboard is used # to shunt off badly formatted messages. We don't want to just trash # them because they may be fixable with human intervention. Just get - # them out of our site though. + # them out of our sight. # # Find out which mailing list this message is destined for. listname = msgdata.get('listname') mlist = config.db.list_manager.get(listname) - if not mlist: + if mlist is None: elog.error('Dequeuing message destined for missing list: %s', listname) self._shunt.enqueue(msg, msgdata) return - # Now process this message, keeping track of any subprocesses that may - # have been spawned. We'll reap those later. - # - # We also want to set up the language context for this message. The - # context will be the preferred language for the user if a member of - # the list, or the list's preferred language. However, we must take + # Now process this message. We also want to set up the language + # context for this message. The context will be the preferred + # language for the user if the sender is a member of the list, or it + # will be the list's preferred language. However, we must take # special care to reset the defaults, otherwise subsequent messages - # may be translated incorrectly. BAW: I'm not sure I like this - # approach, but I can't think of anything better right now. + # may be translated incorrectly. sender = msg.get_sender() member = mlist.members.get_member(sender) - if member: - lang = member.preferred_language - else: - lang = mlist.preferred_language - with i18n.using_language(lang): - msgdata['lang'] = lang + language = (member.preferred_language + if member is not None + else mlist.preferred_language) + with i18n.using_language(language): + msgdata['lang'] = language keepqueued = self._dispose(mlist, msg, msgdata) - # Keep tabs on any child processes that got spawned. - kids = msgdata.get('_kids') - if kids: - self._kids.update(kids) if keepqueued: self._switchboard.enqueue(msg, msgdata) @@ -376,60 +390,23 @@ class Runner: traceback.print_exc(file=s) elog.error('%s', s.getvalue()) - # - # Subclasses can override these methods. - # - def _cleanup(self): - """Clean up upon exit from the main processing loop. - - Called when the Runner's main loop is stopped, this should perform - any necessary resource deallocation. Its return value is irrelevant. - """ - Utils.reap(self._kids) + def _clean_up(self): + """See `IRunner`.""" def _dispose(self, mlist, msg, msgdata): - """Dispose of a single message destined for a mailing list. - - Called for each message that the Runner is responsible for, this is - the primary overridable method for processing each message. - Subclasses, must provide implementation for this method. - - mlist is the IMailingList instance this message is destined for. - - msg is the Message object representing the message. - - msgdata is a dictionary of message metadata. - """ + """See `IRunner`.""" raise NotImplementedError - def _doperiodic(self): - """Do some processing `every once in a while'. - - Called every once in a while both from the Runner's main loop, and - from the Runner's hash slice processing loop. You can do whatever - special periodic processing you want here, and the return value is - irrelevant. - """ + def _do_periodic(self): + """See `IRunner`.""" pass def _snooze(self, filecnt): - """Sleep for a little while. - - filecnt is the number of messages in the queue the last time through. - Sub-runners can decide to continue to do work, or sleep for a while - based on this value. By default, we only snooze if there was nothing - to do last time around. - """ + """See `IRunner`.""" if filecnt or float(self.SLEEPTIME) <= 0: return time.sleep(float(self.SLEEPTIME)) - def _shortcircuit(self): - """Return a true value if the individual file processing loop should - exit before it's finished processing each message in the current slice - of hash space. A false value tells _oneloop() to continue processing - until the current snapshot of hash space is exhausted. - - You could, for example, implement a throttling algorithm here. - """ + def _short_curcuit(self): + """See `IRunner`.""" return self._stop |
