diff options
| author | Barry Warsaw | 2008-09-29 21:48:19 -0400 |
|---|---|---|
| committer | Barry Warsaw | 2008-09-29 21:48:19 -0400 |
| commit | af67970b37a00941d532de574d0cd9c05b207d69 (patch) | |
| tree | d1af18d6361ea5f22cc768e9c306f22023b6db56 | |
| parent | 20a97a4163774212ad9f16c5a2e3abcbf3ecf918 (diff) | |
| download | mailman-af67970b37a00941d532de574d0cd9c05b207d69.tar.gz mailman-af67970b37a00941d532de574d0cd9c05b207d69.tar.zst mailman-af67970b37a00941d532de574d0cd9c05b207d69.zip | |
| -rw-r--r-- | mailman/Utils.py | 24 | ||||
| -rw-r--r-- | mailman/bin/qrunner.py | 4 | ||||
| -rw-r--r-- | mailman/interfaces/runner.py | 79 | ||||
| -rw-r--r-- | mailman/queue/__init__.py | 187 | ||||
| -rw-r--r-- | mailman/queue/bounce.py | 14 | ||||
| -rw-r--r-- | mailman/queue/docs/runner.txt | 2 | ||||
| -rw-r--r-- | mailman/queue/http.py | 2 | ||||
| -rw-r--r-- | mailman/queue/maildir.py | 4 | ||||
| -rw-r--r-- | mailman/queue/outgoing.py | 12 | ||||
| -rw-r--r-- | mailman/testing/helpers.py | 2 |
10 files changed, 181 insertions, 149 deletions
diff --git a/mailman/Utils.py b/mailman/Utils.py index c0d2620b3..b9e34d5cd 100644 --- a/mailman/Utils.py +++ b/mailman/Utils.py @@ -563,30 +563,6 @@ def GetRequestURI(fallback=None, escape=True): -# Wait on a dictionary of child pids -def reap(kids, func=None, once=False): - while kids: - if func: - func() - try: - pid, status = os.waitpid(-1, os.WNOHANG) - except OSError, e: - # If the child procs had a bug we might have no children - if e.errno <> errno.ECHILD: - raise - kids.clear() - break - if pid <> 0: - try: - del kids[pid] - except KeyError: - # Huh? How can this happen? - pass - if once: - break - - - def makedirs(path, mode=02775): try: omask = os.umask(0) diff --git a/mailman/bin/qrunner.py b/mailman/bin/qrunner.py index 33c17f91d..422809fa3 100644 --- a/mailman/bin/qrunner.py +++ b/mailman/bin/qrunner.py @@ -149,9 +149,9 @@ def make_qrunner(name, slice, range, once=False): raise qrclass = getattr(sys.modules[modulename], classname) if once: - # Subclass to hack in the setting of the stop flag in _doperiodic() + # Subclass to hack in the setting of the stop flag in _do_periodic() class Once(qrclass): - def _doperiodic(self): + def _do_periodic(self): self.stop() qrunner = Once(slice, range) else: diff --git a/mailman/interfaces/runner.py b/mailman/interfaces/runner.py index d279c5e7a..28b15968c 100644 --- a/mailman/interfaces/runner.py +++ b/mailman/interfaces/runner.py @@ -29,3 +29,82 @@ class IRunner(Interface): def stop(): """Stop the queue runner on the next iteration through the loop.""" + + QDIR = Attribute('The queue directory. Overridden in subclasses.') + + SLEEPTIME = Attribute("""\ + The number of seconds this queue runner will sleep between iterations + through the main loop. If given, overrides + `config.QRUNNER_SLEEP_TIME` + """) + + def _one_iteration(): + """The work done in one iteration of the main loop. + + Can be overridden by subclasses. + + :return: The number of files still left to process. + :rtype: int + """ + + def _process_one_file(msg, msgdata): + """Process one queue file. + + :param msg: The message object. + :type msg: `email.message.Message` + :param msgdata: The message metadata. + :type msgdata: dict + """ + + def _clean_up(): + """Clean up upon exit from the main processing loop. + + Called when the queue runner's main loop is stopped, this should + perform any necessary resource deallocation. + """ + + def _dispose(mlist, msg, msgdata): + """Dispose of a single message destined for a mailing list. + + Called for each message that the queue runner is responsible for, this + is the primary overridable method for processing each message. + Subclasses, must provide implementation for this method. + + :param mlist: The mailing list this message is destined for. + :type mlist: `IMailingList` + :param msg: The message being processed. + :type msg: `email.message.Message` + :param msgdata: The message metadata. + :type msgdata: dict + :return: True if the message should continue to be queue, False if the + message should be deleted automatically. + :rtype: bool + """ + + def _do_periodic(): + """Do some arbitrary periodic processing. + + Called every once in a while both from the queue runner's main loop, + and from the runner's hash slice processing loop. You can do whatever + special periodic processing you want here. + """ + + def _snooze(filecnt): + """Sleep for a little while. + + :param filecnt: The number of messages in the queue the last time + through. Queue runners can decide to continue to do work, or + sleep for a while based on this value. By default, the base queue + runner only snoozes when there was nothing to do last time around. + :type filecnt: int + """ + + def _short_circuit(): + """Should processing be short-circuited? + + :return: True if the file processing loop should exit before it's + finished processing each message in the current slice of hash + space. False tells _one_iteration() to continue processing until + the current snapshot of hash space is exhausted. + :rtype: bool + """ diff --git a/mailman/queue/__init__.py b/mailman/queue/__init__.py index 351fa0bd2..290f53978 100644 --- a/mailman/queue/__init__.py +++ b/mailman/queue/__init__.py @@ -69,6 +69,19 @@ class Switchboard: implements(ISwitchboard) def __init__(self, whichq, slice=None, numslices=1, recover=False): + """Create a switchboard object. + + :param whichq: The queue directory. + :type whichq: str + :param slice: The slice number for this switchboard, or None. If not + None, it must be [0..`numslices`). + :type slice: int or None + :param numslices: The total number of slices to split this queue + directory into. It must be a power of 2. + :type numslices: int + :param recover: True if backup files should be recovered. + :type recover: bool + """ self._whichq = whichq # Create the directory if it doesn't yet exist. Utils.makedirs(self._whichq, 0770) @@ -84,9 +97,11 @@ class Switchboard: @property def queue_directory(self): + """See `ISwitchboard`.""" return self._whichq def enqueue(self, _msg, _metadata=None, **_kws): + """See `ISwitchboard`.""" if _metadata is None: _metadata = {} # Calculate the SHA hexdigest of the message to get a unique base @@ -133,6 +148,7 @@ class Switchboard: return filebase def dequeue(self, filebase): + """See `ISwitchboard`.""" # Calculate the filename from the given filebase. filename = os.path.join(self._whichq, filebase + '.pck') backfile = os.path.join(self._whichq, filebase + '.bak') @@ -174,9 +190,11 @@ class Switchboard: @property def files(self): + """See `ISwitchboard`.""" return self.get_files() def get_files(self, extension='.pck'): + """See `ISwitchboard`.""" times = {} lower = self._lower upper = self._upper @@ -199,6 +217,7 @@ class Switchboard: return [times[key] for key in sorted(times)] def recover_backup_files(self): + """See `ISwitchboard`.""" # Move all .bak files in our slice to .pck. It's impossible for both # to exist at the same time, so the move is enough to ensure that our # normal dequeuing process will handle them. @@ -216,7 +235,15 @@ class Runner: SLEEPTIME = None def __init__(self, slice=None, numslices=1): - self._kids = {} + """Create a queue runner. + + :param slice: The slice number for this queue runner. This is passed + directly to the underlying `ISwitchboard` object. + :type slice: int or None + :param numslices: The number of slices for this queue. Must be a + power of 2. + :type numslices: int + """ # Create our own switchboard. Don't use the switchboard cache because # we want to provide slice and numslice arguments. self._switchboard = Switchboard(self.QDIR, slice, numslices, True) @@ -230,54 +257,50 @@ class Runner: return '<%s at %s>' % (self.__class__.__name__, id(self)) def stop(self): + """See `IRunner`.""" self._stop = True def run(self): + """See `IRunner`.""" # Start the main loop for this queue runner. try: - try: - while True: - # Once through the loop that processes all the files in - # the queue directory. - filecnt = self._oneloop() - # Do the periodic work for the subclass. BAW: this - # shouldn't be called here. There should be one more - # _doperiodic() call at the end of the _oneloop() loop. - self._doperiodic() - # If the stop flag is set, we're done. - if self._stop: - break - # Give the runner an opportunity to snooze for a while, - # but pass it the file count so it can decide whether to - # do more work now or not. - self._snooze(filecnt) - except KeyboardInterrupt: - pass + while True: + # Once through the loop that processes all the files in the + # queue directory. + filecnt = self._one_iteration() + # Do the periodic work for the subclass. + self._do_periodic() + # If the stop flag is set, we're done. + if self._stop: + break + # Give the runner an opportunity to snooze for a while, but + # pass it the file count so it can decide whether to do more + # work now or not. + self._snooze(filecnt) + except KeyboardInterrupt: + pass finally: - # We've broken out of our main loop, so we want to reap all the - # subprocesses we've created and do any other necessary cleanups. - self._cleanup() + self._clean_up() - def _oneloop(self): + def _one_iteration(self): + """See `IRunner`.""" me = self.__class__.__name__ dlog.debug('[%s] starting oneloop', me) - # First, list all the files in our queue directory. - # Switchboard.files() is guaranteed to hand us the files in FIFO - # order. Return an integer count of the number of files that were - # available for this qrunner to process. + # List all the files in our queue directory. The switchboard is + # guaranteed to hand us the files in FIFO order. files = self._switchboard.files for filebase in files: dlog.debug('[%s] processing filebase: %s', me, filebase) try: # Ask the switchboard for the message and metadata objects - # associated with this filebase. + # associated with this queue file. msg, msgdata = self._switchboard.dequeue(filebase) except Exception, e: - # This used to just catch email.Errors.MessageParseError, - # but other problems can occur in message parsing, e.g. - # ValueError, and exceptions can occur in unpickling too. - # We don't want the runner to die, so we just log and skip - # this entry, but preserve it for analysis. + # This used to just catch email.Errors.MessageParseError, but + # other problems can occur in message parsing, e.g. + # ValueError, and exceptions can occur in unpickling too. We + # don't want the runner to die, so we just log and skip this + # entry, but preserve it for analysis. self._log(e) elog.error('Skipping and preserving unparseable message: %s', filebase) @@ -286,7 +309,7 @@ class Runner: continue try: dlog.debug('[%s] processing onefile', me) - self._onefile(msg, msgdata) + self._process_one_file(msg, msgdata) dlog.debug('[%s] finishing filebase: %s', me, filebase) self._switchboard.finish(filebase) except Exception, e: @@ -297,7 +320,7 @@ class Runner: # cause the message to be stored in the shunt queue for human # intervention. self._log(e) - # Put a marker in the metadata for unshunting + # Put a marker in the metadata for unshunting. msgdata['whichq'] = self._switchboard.queue_directory # It is possible that shunting can throw an exception, e.g. a # permissions problem or a MemoryError due to a really large @@ -317,12 +340,10 @@ class Runner: self._switchboard.finish(filebase, preserve=True) config.db.abort() # Other work we want to do each time through the loop. - dlog.debug('[%s] reaping', me) - Utils.reap(self._kids, once=True) dlog.debug('[%s] doing periodic', me) - self._doperiodic() + self._do_periodic() dlog.debug('[%s] checking short circuit', me) - if self._shortcircuit(): + if self._short_curcuit(): dlog.debug('[%s] short circuiting', me) break dlog.debug('[%s] commiting', me) @@ -330,43 +351,36 @@ class Runner: dlog.debug('[%s] ending oneloop: %s', me, len(files)) return len(files) - def _onefile(self, msg, msgdata): + def _process_one_file(self, msg, msgdata): + """See `IRunner`.""" # Do some common sanity checking on the message metadata. It's got to # be destined for a particular mailing list. This switchboard is used # to shunt off badly formatted messages. We don't want to just trash # them because they may be fixable with human intervention. Just get - # them out of our site though. + # them out of our sight. # # Find out which mailing list this message is destined for. listname = msgdata.get('listname') mlist = config.db.list_manager.get(listname) - if not mlist: + if mlist is None: elog.error('Dequeuing message destined for missing list: %s', listname) self._shunt.enqueue(msg, msgdata) return - # Now process this message, keeping track of any subprocesses that may - # have been spawned. We'll reap those later. - # - # We also want to set up the language context for this message. The - # context will be the preferred language for the user if a member of - # the list, or the list's preferred language. However, we must take + # Now process this message. We also want to set up the language + # context for this message. The context will be the preferred + # language for the user if the sender is a member of the list, or it + # will be the list's preferred language. However, we must take # special care to reset the defaults, otherwise subsequent messages - # may be translated incorrectly. BAW: I'm not sure I like this - # approach, but I can't think of anything better right now. + # may be translated incorrectly. sender = msg.get_sender() member = mlist.members.get_member(sender) - if member: - lang = member.preferred_language - else: - lang = mlist.preferred_language - with i18n.using_language(lang): - msgdata['lang'] = lang + language = (member.preferred_language + if member is not None + else mlist.preferred_language) + with i18n.using_language(language): + msgdata['lang'] = language keepqueued = self._dispose(mlist, msg, msgdata) - # Keep tabs on any child processes that got spawned. - kids = msgdata.get('_kids') - if kids: - self._kids.update(kids) if keepqueued: self._switchboard.enqueue(msg, msgdata) @@ -376,60 +390,23 @@ class Runner: traceback.print_exc(file=s) elog.error('%s', s.getvalue()) - # - # Subclasses can override these methods. - # - def _cleanup(self): - """Clean up upon exit from the main processing loop. - - Called when the Runner's main loop is stopped, this should perform - any necessary resource deallocation. Its return value is irrelevant. - """ - Utils.reap(self._kids) + def _clean_up(self): + """See `IRunner`.""" def _dispose(self, mlist, msg, msgdata): - """Dispose of a single message destined for a mailing list. - - Called for each message that the Runner is responsible for, this is - the primary overridable method for processing each message. - Subclasses, must provide implementation for this method. - - mlist is the IMailingList instance this message is destined for. - - msg is the Message object representing the message. - - msgdata is a dictionary of message metadata. - """ + """See `IRunner`.""" raise NotImplementedError - def _doperiodic(self): - """Do some processing `every once in a while'. - - Called every once in a while both from the Runner's main loop, and - from the Runner's hash slice processing loop. You can do whatever - special periodic processing you want here, and the return value is - irrelevant. - """ + def _do_periodic(self): + """See `IRunner`.""" pass def _snooze(self, filecnt): - """Sleep for a little while. - - filecnt is the number of messages in the queue the last time through. - Sub-runners can decide to continue to do work, or sleep for a while - based on this value. By default, we only snooze if there was nothing - to do last time around. - """ + """See `IRunner`.""" if filecnt or float(self.SLEEPTIME) <= 0: return time.sleep(float(self.SLEEPTIME)) - def _shortcircuit(self): - """Return a true value if the individual file processing loop should - exit before it's finished processing each message in the current slice - of hash space. A false value tells _oneloop() to continue processing - until the current snapshot of hash space is exhausted. - - You could, for example, implement a throttling algorithm here. - """ + def _short_curcuit(self): + """See `IRunner`.""" return self._stop diff --git a/mailman/queue/bounce.py b/mailman/queue/bounce.py index cfd86be60..0c5788174 100644 --- a/mailman/queue/bounce.py +++ b/mailman/queue/bounce.py @@ -51,7 +51,7 @@ class BounceMixin: # # today is itself a 3-tuple of (year, month, day) # - # Every once in a while (see _doperiodic()), the bounce runner cracks + # Every once in a while (see _do_periodic()), the bounce runner cracks # open the file, reads all the records and registers all the bounces. # Then it truncates the file and continues on. We don't need to lock # the bounce event file because bounce qrunners are single threaded @@ -123,11 +123,11 @@ class BounceMixin: os.unlink(self._bounce_events_file) self._bouncecnt = 0 - def _cleanup(self): + def _clean_up(self): if self._bouncecnt > 0: self._register_bounces() - def _doperiodic(self): + def _do_periodic(self): now = datetime.datetime.now() if self._nextaction > now or self._bouncecnt == 0: return @@ -218,11 +218,11 @@ class BounceRunner(Runner, BounceMixin): addrs = filter(None, addrs) self._queue_bounces(mlist.fqdn_listname, addrs, msg) - _doperiodic = BounceMixin._doperiodic + _do_periodic = BounceMixin._do_periodic - def _cleanup(self): - BounceMixin._cleanup(self) - Runner._cleanup(self) + def _clean_up(self): + BounceMixin._clean_up(self) + Runner._clean_up(self) diff --git a/mailman/queue/docs/runner.txt b/mailman/queue/docs/runner.txt index e95e20ecd..a9e17370b 100644 --- a/mailman/queue/docs/runner.txt +++ b/mailman/queue/docs/runner.txt @@ -34,7 +34,7 @@ This is about as simple as a qrunner can be. ... self.msgdata = msgdata ... return False ... - ... def _doperiodic(self): + ... def _do_periodic(self): ... self.stop() ... ... def _snooze(self, filecnt): diff --git a/mailman/queue/http.py b/mailman/queue/http.py index bbbef51df..cb6940b29 100644 --- a/mailman/queue/http.py +++ b/mailman/queue/http.py @@ -37,7 +37,7 @@ class HTTPRunner(Runner): def __init__(self, slice=None, numslices=1): pass - def _cleanup(self): + def _clean_up(self): pass diff --git a/mailman/queue/maildir.py b/mailman/queue/maildir.py index 4a6c0f250..71bac67dc 100644 --- a/mailman/queue/maildir.py +++ b/mailman/queue/maildir.py @@ -98,7 +98,7 @@ class MaildirRunner(Runner): self._cur = os.path.join(config.MAILDIR_DIR, 'cur') self._parser = Parser(Message) - def _oneloop(self): + def _one_iteration(self): # Refresh this each time through the list. listnames = list(config.list_manager.names) # Cruise through all the files currently in the new/ directory @@ -185,5 +185,5 @@ class MaildirRunner(Runner): os.rename(dstname, xdstname) log.error('%s', e) - def _cleanup(self): + def _clean_up(self): pass diff --git a/mailman/queue/outgoing.py b/mailman/queue/outgoing.py index 624368930..3ab67eaad 100644 --- a/mailman/queue/outgoing.py +++ b/mailman/queue/outgoing.py @@ -32,8 +32,8 @@ from mailman.core import errors from mailman.queue import Runner, Switchboard from mailman.queue.bounce import BounceMixin -# This controls how often _doperiodic() will try to deal with deferred -# permanent failures. It is a count of calls to _doperiodic() +# This controls how often _do_periodic() will try to deal with deferred +# permanent failures. It is a count of calls to _do_periodic() DEAL_WITH_PERMFAILURES_EVERY = 10 log = logging.getLogger('mailman.error') @@ -123,8 +123,8 @@ class OutgoingRunner(Runner, BounceMixin): # We've successfully completed handling of this message return False - _doperiodic = BounceMixin._doperiodic + _do_periodic = BounceMixin._do_periodic - def _cleanup(self): - BounceMixin._cleanup(self) - Runner._cleanup(self) + def _clean_up(self): + BounceMixin._clean_up(self) + Runner._clean_up(self) diff --git a/mailman/testing/helpers.py b/mailman/testing/helpers.py index dd241ddba..5bd019ab7 100644 --- a/mailman/testing/helpers.py +++ b/mailman/testing/helpers.py @@ -63,7 +63,7 @@ def make_testable_runner(runner_class): class EmptyingRunner(runner_class): """Stop processing when the queue is empty.""" - def _doperiodic(self): + def _do_periodic(self): """Stop when the queue is empty.""" self._stop = (len(self._switchboard.files) == 0) |
