summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBarry Warsaw2008-09-29 21:48:19 -0400
committerBarry Warsaw2008-09-29 21:48:19 -0400
commitaf67970b37a00941d532de574d0cd9c05b207d69 (patch)
treed1af18d6361ea5f22cc768e9c306f22023b6db56
parent20a97a4163774212ad9f16c5a2e3abcbf3ecf918 (diff)
downloadmailman-af67970b37a00941d532de574d0cd9c05b207d69.tar.gz
mailman-af67970b37a00941d532de574d0cd9c05b207d69.tar.zst
mailman-af67970b37a00941d532de574d0cd9c05b207d69.zip
-rw-r--r--mailman/Utils.py24
-rw-r--r--mailman/bin/qrunner.py4
-rw-r--r--mailman/interfaces/runner.py79
-rw-r--r--mailman/queue/__init__.py187
-rw-r--r--mailman/queue/bounce.py14
-rw-r--r--mailman/queue/docs/runner.txt2
-rw-r--r--mailman/queue/http.py2
-rw-r--r--mailman/queue/maildir.py4
-rw-r--r--mailman/queue/outgoing.py12
-rw-r--r--mailman/testing/helpers.py2
10 files changed, 181 insertions, 149 deletions
diff --git a/mailman/Utils.py b/mailman/Utils.py
index c0d2620b3..b9e34d5cd 100644
--- a/mailman/Utils.py
+++ b/mailman/Utils.py
@@ -563,30 +563,6 @@ def GetRequestURI(fallback=None, escape=True):
-# Wait on a dictionary of child pids
-def reap(kids, func=None, once=False):
- while kids:
- if func:
- func()
- try:
- pid, status = os.waitpid(-1, os.WNOHANG)
- except OSError, e:
- # If the child procs had a bug we might have no children
- if e.errno <> errno.ECHILD:
- raise
- kids.clear()
- break
- if pid <> 0:
- try:
- del kids[pid]
- except KeyError:
- # Huh? How can this happen?
- pass
- if once:
- break
-
-
-
def makedirs(path, mode=02775):
try:
omask = os.umask(0)
diff --git a/mailman/bin/qrunner.py b/mailman/bin/qrunner.py
index 33c17f91d..422809fa3 100644
--- a/mailman/bin/qrunner.py
+++ b/mailman/bin/qrunner.py
@@ -149,9 +149,9 @@ def make_qrunner(name, slice, range, once=False):
raise
qrclass = getattr(sys.modules[modulename], classname)
if once:
- # Subclass to hack in the setting of the stop flag in _doperiodic()
+ # Subclass to hack in the setting of the stop flag in _do_periodic()
class Once(qrclass):
- def _doperiodic(self):
+ def _do_periodic(self):
self.stop()
qrunner = Once(slice, range)
else:
diff --git a/mailman/interfaces/runner.py b/mailman/interfaces/runner.py
index d279c5e7a..28b15968c 100644
--- a/mailman/interfaces/runner.py
+++ b/mailman/interfaces/runner.py
@@ -29,3 +29,82 @@ class IRunner(Interface):
def stop():
"""Stop the queue runner on the next iteration through the loop."""
+
+ QDIR = Attribute('The queue directory. Overridden in subclasses.')
+
+ SLEEPTIME = Attribute("""\
+ The number of seconds this queue runner will sleep between iterations
+ through the main loop. If given, overrides
+ `config.QRUNNER_SLEEP_TIME`.
+ """)
+
+ def _one_iteration():
+ """The work done in one iteration of the main loop.
+
+ Can be overridden by subclasses.
+
+ :return: The number of files still left to process.
+ :rtype: int
+ """
+
+ def _process_one_file(msg, msgdata):
+ """Process one queue file.
+
+ :param msg: The message object.
+ :type msg: `email.message.Message`
+ :param msgdata: The message metadata.
+ :type msgdata: dict
+ """
+
+ def _clean_up():
+ """Clean up upon exit from the main processing loop.
+
+ Called when the queue runner's main loop is stopped, this should
+ perform any necessary resource deallocation.
+ """
+
+ def _dispose(mlist, msg, msgdata):
+ """Dispose of a single message destined for a mailing list.
+
+ Called for each message that the queue runner is responsible for, this
+ is the primary overridable method for processing each message.
+ Subclasses must provide an implementation for this method.
+
+ :param mlist: The mailing list this message is destined for.
+ :type mlist: `IMailingList`
+ :param msg: The message being processed.
+ :type msg: `email.message.Message`
+ :param msgdata: The message metadata.
+ :type msgdata: dict
+ :return: True if the message should continue to be queued, False if the
+ message should be deleted automatically.
+ :rtype: bool
+ """
+
+ def _do_periodic():
+ """Do some arbitrary periodic processing.
+
+ Called every once in a while both from the queue runner's main loop,
+ and from the runner's hash slice processing loop. You can do whatever
+ special periodic processing you want here.
+ """
+
+ def _snooze(filecnt):
+ """Sleep for a little while.
+
+ :param filecnt: The number of messages in the queue the last time
+ through. Queue runners can decide to continue to do work, or
+ sleep for a while based on this value. By default, the base queue
+ runner only snoozes when there was nothing to do last time around.
+ :type filecnt: int
+ """
+
+ def _short_circuit():
+ """Should processing be short-circuited?
+
+ :return: True if the file processing loop should exit before it's
+ finished processing each message in the current slice of hash
+ space. False tells _one_iteration() to continue processing until
+ the current snapshot of hash space is exhausted.
+ :rtype: bool
+ """
diff --git a/mailman/queue/__init__.py b/mailman/queue/__init__.py
index 351fa0bd2..290f53978 100644
--- a/mailman/queue/__init__.py
+++ b/mailman/queue/__init__.py
@@ -69,6 +69,19 @@ class Switchboard:
implements(ISwitchboard)
def __init__(self, whichq, slice=None, numslices=1, recover=False):
+ """Create a switchboard object.
+
+ :param whichq: The queue directory.
+ :type whichq: str
+ :param slice: The slice number for this switchboard, or None. If not
+ None, it must be in the range [0..`numslices`).
+ :type slice: int or None
+ :param numslices: The total number of slices to split this queue
+ directory into. It must be a power of 2.
+ :type numslices: int
+ :param recover: True if backup files should be recovered.
+ :type recover: bool
+ """
self._whichq = whichq
# Create the directory if it doesn't yet exist.
Utils.makedirs(self._whichq, 0770)
@@ -84,9 +97,11 @@ class Switchboard:
@property
def queue_directory(self):
+ """See `ISwitchboard`."""
return self._whichq
def enqueue(self, _msg, _metadata=None, **_kws):
+ """See `ISwitchboard`."""
if _metadata is None:
_metadata = {}
# Calculate the SHA hexdigest of the message to get a unique base
@@ -133,6 +148,7 @@ class Switchboard:
return filebase
def dequeue(self, filebase):
+ """See `ISwitchboard`."""
# Calculate the filename from the given filebase.
filename = os.path.join(self._whichq, filebase + '.pck')
backfile = os.path.join(self._whichq, filebase + '.bak')
@@ -174,9 +190,11 @@ class Switchboard:
@property
def files(self):
+ """See `ISwitchboard`."""
return self.get_files()
def get_files(self, extension='.pck'):
+ """See `ISwitchboard`."""
times = {}
lower = self._lower
upper = self._upper
@@ -199,6 +217,7 @@ class Switchboard:
return [times[key] for key in sorted(times)]
def recover_backup_files(self):
+ """See `ISwitchboard`."""
# Move all .bak files in our slice to .pck. It's impossible for both
# to exist at the same time, so the move is enough to ensure that our
# normal dequeuing process will handle them.
@@ -216,7 +235,15 @@ class Runner:
SLEEPTIME = None
def __init__(self, slice=None, numslices=1):
- self._kids = {}
+ """Create a queue runner.
+
+ :param slice: The slice number for this queue runner. This is passed
+ directly to the underlying `ISwitchboard` object.
+ :type slice: int or None
+ :param numslices: The number of slices for this queue. Must be a
+ power of 2.
+ :type numslices: int
+ """
# Create our own switchboard. Don't use the switchboard cache because
# we want to provide slice and numslice arguments.
self._switchboard = Switchboard(self.QDIR, slice, numslices, True)
@@ -230,54 +257,50 @@ class Runner:
return '<%s at %s>' % (self.__class__.__name__, id(self))
def stop(self):
+ """See `IRunner`."""
self._stop = True
def run(self):
+ """See `IRunner`."""
# Start the main loop for this queue runner.
try:
- try:
- while True:
- # Once through the loop that processes all the files in
- # the queue directory.
- filecnt = self._oneloop()
- # Do the periodic work for the subclass. BAW: this
- # shouldn't be called here. There should be one more
- # _doperiodic() call at the end of the _oneloop() loop.
- self._doperiodic()
- # If the stop flag is set, we're done.
- if self._stop:
- break
- # Give the runner an opportunity to snooze for a while,
- # but pass it the file count so it can decide whether to
- # do more work now or not.
- self._snooze(filecnt)
- except KeyboardInterrupt:
- pass
+ while True:
+ # Once through the loop that processes all the files in the
+ # queue directory.
+ filecnt = self._one_iteration()
+ # Do the periodic work for the subclass.
+ self._do_periodic()
+ # If the stop flag is set, we're done.
+ if self._stop:
+ break
+ # Give the runner an opportunity to snooze for a while, but
+ # pass it the file count so it can decide whether to do more
+ # work now or not.
+ self._snooze(filecnt)
+ except KeyboardInterrupt:
+ pass
finally:
- # We've broken out of our main loop, so we want to reap all the
- # subprocesses we've created and do any other necessary cleanups.
- self._cleanup()
+ self._clean_up()
- def _oneloop(self):
+ def _one_iteration(self):
+ """See `IRunner`."""
me = self.__class__.__name__
dlog.debug('[%s] starting oneloop', me)
- # First, list all the files in our queue directory.
- # Switchboard.files() is guaranteed to hand us the files in FIFO
- # order. Return an integer count of the number of files that were
- # available for this qrunner to process.
+ # List all the files in our queue directory. The switchboard is
+ # guaranteed to hand us the files in FIFO order.
files = self._switchboard.files
for filebase in files:
dlog.debug('[%s] processing filebase: %s', me, filebase)
try:
# Ask the switchboard for the message and metadata objects
- # associated with this filebase.
+ # associated with this queue file.
msg, msgdata = self._switchboard.dequeue(filebase)
except Exception, e:
- # This used to just catch email.Errors.MessageParseError,
- # but other problems can occur in message parsing, e.g.
- # ValueError, and exceptions can occur in unpickling too.
- # We don't want the runner to die, so we just log and skip
- # this entry, but preserve it for analysis.
+ # This used to just catch email.Errors.MessageParseError, but
+ # other problems can occur in message parsing, e.g.
+ # ValueError, and exceptions can occur in unpickling too. We
+ # don't want the runner to die, so we just log and skip this
+ # entry, but preserve it for analysis.
self._log(e)
elog.error('Skipping and preserving unparseable message: %s',
filebase)
@@ -286,7 +309,7 @@ class Runner:
continue
try:
dlog.debug('[%s] processing onefile', me)
- self._onefile(msg, msgdata)
+ self._process_one_file(msg, msgdata)
dlog.debug('[%s] finishing filebase: %s', me, filebase)
self._switchboard.finish(filebase)
except Exception, e:
@@ -297,7 +320,7 @@ class Runner:
# cause the message to be stored in the shunt queue for human
# intervention.
self._log(e)
- # Put a marker in the metadata for unshunting
+ # Put a marker in the metadata for unshunting.
msgdata['whichq'] = self._switchboard.queue_directory
# It is possible that shunting can throw an exception, e.g. a
# permissions problem or a MemoryError due to a really large
@@ -317,12 +340,10 @@ class Runner:
self._switchboard.finish(filebase, preserve=True)
config.db.abort()
# Other work we want to do each time through the loop.
- dlog.debug('[%s] reaping', me)
- Utils.reap(self._kids, once=True)
dlog.debug('[%s] doing periodic', me)
- self._doperiodic()
+ self._do_periodic()
dlog.debug('[%s] checking short circuit', me)
- if self._shortcircuit():
+ if self._short_curcuit():
dlog.debug('[%s] short circuiting', me)
break
dlog.debug('[%s] commiting', me)
@@ -330,43 +351,36 @@ class Runner:
dlog.debug('[%s] ending oneloop: %s', me, len(files))
return len(files)
- def _onefile(self, msg, msgdata):
+ def _process_one_file(self, msg, msgdata):
+ """See `IRunner`."""
# Do some common sanity checking on the message metadata. It's got to
# be destined for a particular mailing list. This switchboard is used
# to shunt off badly formatted messages. We don't want to just trash
# them because they may be fixable with human intervention. Just get
- # them out of our site though.
+ # them out of our sight.
#
# Find out which mailing list this message is destined for.
listname = msgdata.get('listname')
mlist = config.db.list_manager.get(listname)
- if not mlist:
+ if mlist is None:
elog.error('Dequeuing message destined for missing list: %s',
listname)
self._shunt.enqueue(msg, msgdata)
return
- # Now process this message, keeping track of any subprocesses that may
- # have been spawned. We'll reap those later.
- #
- # We also want to set up the language context for this message. The
- # context will be the preferred language for the user if a member of
- # the list, or the list's preferred language. However, we must take
+ # Now process this message. We also want to set up the language
+ # context for this message. The context will be the preferred
+ # language for the user if the sender is a member of the list, or it
+ # will be the list's preferred language. However, we must take
# special care to reset the defaults, otherwise subsequent messages
- # may be translated incorrectly. BAW: I'm not sure I like this
- # approach, but I can't think of anything better right now.
+ # may be translated incorrectly.
sender = msg.get_sender()
member = mlist.members.get_member(sender)
- if member:
- lang = member.preferred_language
- else:
- lang = mlist.preferred_language
- with i18n.using_language(lang):
- msgdata['lang'] = lang
+ language = (member.preferred_language
+ if member is not None
+ else mlist.preferred_language)
+ with i18n.using_language(language):
+ msgdata['lang'] = language
keepqueued = self._dispose(mlist, msg, msgdata)
- # Keep tabs on any child processes that got spawned.
- kids = msgdata.get('_kids')
- if kids:
- self._kids.update(kids)
if keepqueued:
self._switchboard.enqueue(msg, msgdata)
@@ -376,60 +390,23 @@ class Runner:
traceback.print_exc(file=s)
elog.error('%s', s.getvalue())
- #
- # Subclasses can override these methods.
- #
- def _cleanup(self):
- """Clean up upon exit from the main processing loop.
-
- Called when the Runner's main loop is stopped, this should perform
- any necessary resource deallocation. Its return value is irrelevant.
- """
- Utils.reap(self._kids)
+ def _clean_up(self):
+ """See `IRunner`."""
def _dispose(self, mlist, msg, msgdata):
- """Dispose of a single message destined for a mailing list.
-
- Called for each message that the Runner is responsible for, this is
- the primary overridable method for processing each message.
- Subclasses, must provide implementation for this method.
-
- mlist is the IMailingList instance this message is destined for.
-
- msg is the Message object representing the message.
-
- msgdata is a dictionary of message metadata.
- """
+ """See `IRunner`."""
raise NotImplementedError
- def _doperiodic(self):
- """Do some processing `every once in a while'.
-
- Called every once in a while both from the Runner's main loop, and
- from the Runner's hash slice processing loop. You can do whatever
- special periodic processing you want here, and the return value is
- irrelevant.
- """
+ def _do_periodic(self):
+ """See `IRunner`."""
pass
def _snooze(self, filecnt):
- """Sleep for a little while.
-
- filecnt is the number of messages in the queue the last time through.
- Sub-runners can decide to continue to do work, or sleep for a while
- based on this value. By default, we only snooze if there was nothing
- to do last time around.
- """
+ """See `IRunner`."""
if filecnt or float(self.SLEEPTIME) <= 0:
return
time.sleep(float(self.SLEEPTIME))
- def _shortcircuit(self):
- """Return a true value if the individual file processing loop should
- exit before it's finished processing each message in the current slice
- of hash space. A false value tells _oneloop() to continue processing
- until the current snapshot of hash space is exhausted.
-
- You could, for example, implement a throttling algorithm here.
- """
+ def _short_curcuit(self):
+ """See `IRunner`."""
return self._stop
diff --git a/mailman/queue/bounce.py b/mailman/queue/bounce.py
index cfd86be60..0c5788174 100644
--- a/mailman/queue/bounce.py
+++ b/mailman/queue/bounce.py
@@ -51,7 +51,7 @@ class BounceMixin:
#
# today is itself a 3-tuple of (year, month, day)
#
- # Every once in a while (see _doperiodic()), the bounce runner cracks
+ # Every once in a while (see _do_periodic()), the bounce runner cracks
# open the file, reads all the records and registers all the bounces.
# Then it truncates the file and continues on. We don't need to lock
# the bounce event file because bounce qrunners are single threaded
@@ -123,11 +123,11 @@ class BounceMixin:
os.unlink(self._bounce_events_file)
self._bouncecnt = 0
- def _cleanup(self):
+ def _clean_up(self):
if self._bouncecnt > 0:
self._register_bounces()
- def _doperiodic(self):
+ def _do_periodic(self):
now = datetime.datetime.now()
if self._nextaction > now or self._bouncecnt == 0:
return
@@ -218,11 +218,11 @@ class BounceRunner(Runner, BounceMixin):
addrs = filter(None, addrs)
self._queue_bounces(mlist.fqdn_listname, addrs, msg)
- _doperiodic = BounceMixin._doperiodic
+ _do_periodic = BounceMixin._do_periodic
- def _cleanup(self):
- BounceMixin._cleanup(self)
- Runner._cleanup(self)
+ def _clean_up(self):
+ BounceMixin._clean_up(self)
+ Runner._clean_up(self)
diff --git a/mailman/queue/docs/runner.txt b/mailman/queue/docs/runner.txt
index e95e20ecd..a9e17370b 100644
--- a/mailman/queue/docs/runner.txt
+++ b/mailman/queue/docs/runner.txt
@@ -34,7 +34,7 @@ This is about as simple as a qrunner can be.
... self.msgdata = msgdata
... return False
...
- ... def _doperiodic(self):
+ ... def _do_periodic(self):
... self.stop()
...
... def _snooze(self, filecnt):
diff --git a/mailman/queue/http.py b/mailman/queue/http.py
index bbbef51df..cb6940b29 100644
--- a/mailman/queue/http.py
+++ b/mailman/queue/http.py
@@ -37,7 +37,7 @@ class HTTPRunner(Runner):
def __init__(self, slice=None, numslices=1):
pass
- def _cleanup(self):
+ def _clean_up(self):
pass
diff --git a/mailman/queue/maildir.py b/mailman/queue/maildir.py
index 4a6c0f250..71bac67dc 100644
--- a/mailman/queue/maildir.py
+++ b/mailman/queue/maildir.py
@@ -98,7 +98,7 @@ class MaildirRunner(Runner):
self._cur = os.path.join(config.MAILDIR_DIR, 'cur')
self._parser = Parser(Message)
- def _oneloop(self):
+ def _one_iteration(self):
# Refresh this each time through the list.
listnames = list(config.list_manager.names)
# Cruise through all the files currently in the new/ directory
@@ -185,5 +185,5 @@ class MaildirRunner(Runner):
os.rename(dstname, xdstname)
log.error('%s', e)
- def _cleanup(self):
+ def _clean_up(self):
pass
diff --git a/mailman/queue/outgoing.py b/mailman/queue/outgoing.py
index 624368930..3ab67eaad 100644
--- a/mailman/queue/outgoing.py
+++ b/mailman/queue/outgoing.py
@@ -32,8 +32,8 @@ from mailman.core import errors
from mailman.queue import Runner, Switchboard
from mailman.queue.bounce import BounceMixin
-# This controls how often _doperiodic() will try to deal with deferred
-# permanent failures. It is a count of calls to _doperiodic()
+# This controls how often _do_periodic() will try to deal with deferred
+# permanent failures. It is a count of calls to _do_periodic()
DEAL_WITH_PERMFAILURES_EVERY = 10
log = logging.getLogger('mailman.error')
@@ -123,8 +123,8 @@ class OutgoingRunner(Runner, BounceMixin):
# We've successfully completed handling of this message
return False
- _doperiodic = BounceMixin._doperiodic
+ _do_periodic = BounceMixin._do_periodic
- def _cleanup(self):
- BounceMixin._cleanup(self)
- Runner._cleanup(self)
+ def _clean_up(self):
+ BounceMixin._clean_up(self)
+ Runner._clean_up(self)
diff --git a/mailman/testing/helpers.py b/mailman/testing/helpers.py
index dd241ddba..5bd019ab7 100644
--- a/mailman/testing/helpers.py
+++ b/mailman/testing/helpers.py
@@ -63,7 +63,7 @@ def make_testable_runner(runner_class):
class EmptyingRunner(runner_class):
"""Stop processing when the queue is empty."""
- def _doperiodic(self):
+ def _do_periodic(self):
"""Stop when the queue is empty."""
self._stop = (len(self._switchboard.files) == 0)