summaryrefslogtreecommitdiff
path: root/mailman/queue/__init__.py
diff options
context:
space:
mode:
Diffstat (limited to 'mailman/queue/__init__.py')
-rw-r--r--mailman/queue/__init__.py187
1 files changed, 82 insertions, 105 deletions
diff --git a/mailman/queue/__init__.py b/mailman/queue/__init__.py
index 351fa0bd2..290f53978 100644
--- a/mailman/queue/__init__.py
+++ b/mailman/queue/__init__.py
@@ -69,6 +69,19 @@ class Switchboard:
implements(ISwitchboard)
def __init__(self, whichq, slice=None, numslices=1, recover=False):
+ """Create a switchboard object.
+
+ :param whichq: The queue directory.
+ :type whichq: str
+ :param slice: The slice number for this switchboard, or None. If not
+ None, it must be [0..`numslices`).
+ :type slice: int or None
+ :param numslices: The total number of slices to split this queue
+ directory into. It must be a power of 2.
+ :type numslices: int
+ :param recover: True if backup files should be recovered.
+ :type recover: bool
+ """
self._whichq = whichq
# Create the directory if it doesn't yet exist.
Utils.makedirs(self._whichq, 0770)
@@ -84,9 +97,11 @@ class Switchboard:
@property
def queue_directory(self):
+ """See `ISwitchboard`."""
return self._whichq
def enqueue(self, _msg, _metadata=None, **_kws):
+ """See `ISwitchboard`."""
if _metadata is None:
_metadata = {}
# Calculate the SHA hexdigest of the message to get a unique base
@@ -133,6 +148,7 @@ class Switchboard:
return filebase
def dequeue(self, filebase):
+ """See `ISwitchboard`."""
# Calculate the filename from the given filebase.
filename = os.path.join(self._whichq, filebase + '.pck')
backfile = os.path.join(self._whichq, filebase + '.bak')
@@ -174,9 +190,11 @@ class Switchboard:
@property
def files(self):
+ """See `ISwitchboard`."""
return self.get_files()
def get_files(self, extension='.pck'):
+ """See `ISwitchboard`."""
times = {}
lower = self._lower
upper = self._upper
@@ -199,6 +217,7 @@ class Switchboard:
return [times[key] for key in sorted(times)]
def recover_backup_files(self):
+ """See `ISwitchboard`."""
# Move all .bak files in our slice to .pck. It's impossible for both
# to exist at the same time, so the move is enough to ensure that our
# normal dequeuing process will handle them.
@@ -216,7 +235,15 @@ class Runner:
SLEEPTIME = None
def __init__(self, slice=None, numslices=1):
- self._kids = {}
+ """Create a queue runner.
+
+ :param slice: The slice number for this queue runner. This is passed
+ directly to the underlying `ISwitchboard` object.
+ :type slice: int or None
+ :param numslices: The number of slices for this queue. Must be a
+ power of 2.
+ :type numslices: int
+ """
# Create our own switchboard. Don't use the switchboard cache because
# we want to provide slice and numslice arguments.
self._switchboard = Switchboard(self.QDIR, slice, numslices, True)
@@ -230,54 +257,50 @@ class Runner:
return '<%s at %s>' % (self.__class__.__name__, id(self))
def stop(self):
+ """See `IRunner`."""
self._stop = True
def run(self):
+ """See `IRunner`."""
# Start the main loop for this queue runner.
try:
- try:
- while True:
- # Once through the loop that processes all the files in
- # the queue directory.
- filecnt = self._oneloop()
- # Do the periodic work for the subclass. BAW: this
- # shouldn't be called here. There should be one more
- # _doperiodic() call at the end of the _oneloop() loop.
- self._doperiodic()
- # If the stop flag is set, we're done.
- if self._stop:
- break
- # Give the runner an opportunity to snooze for a while,
- # but pass it the file count so it can decide whether to
- # do more work now or not.
- self._snooze(filecnt)
- except KeyboardInterrupt:
- pass
+ while True:
+ # Once through the loop that processes all the files in the
+ # queue directory.
+ filecnt = self._one_iteration()
+ # Do the periodic work for the subclass.
+ self._do_periodic()
+ # If the stop flag is set, we're done.
+ if self._stop:
+ break
+ # Give the runner an opportunity to snooze for a while, but
+ # pass it the file count so it can decide whether to do more
+ # work now or not.
+ self._snooze(filecnt)
+ except KeyboardInterrupt:
+ pass
finally:
- # We've broken out of our main loop, so we want to reap all the
- # subprocesses we've created and do any other necessary cleanups.
- self._cleanup()
+ self._clean_up()
- def _oneloop(self):
+ def _one_iteration(self):
+ """See `IRunner`."""
me = self.__class__.__name__
dlog.debug('[%s] starting oneloop', me)
- # First, list all the files in our queue directory.
- # Switchboard.files() is guaranteed to hand us the files in FIFO
- # order. Return an integer count of the number of files that were
- # available for this qrunner to process.
+ # List all the files in our queue directory. The switchboard is
+ # guaranteed to hand us the files in FIFO order.
files = self._switchboard.files
for filebase in files:
dlog.debug('[%s] processing filebase: %s', me, filebase)
try:
# Ask the switchboard for the message and metadata objects
- # associated with this filebase.
+ # associated with this queue file.
msg, msgdata = self._switchboard.dequeue(filebase)
except Exception, e:
- # This used to just catch email.Errors.MessageParseError,
- # but other problems can occur in message parsing, e.g.
- # ValueError, and exceptions can occur in unpickling too.
- # We don't want the runner to die, so we just log and skip
- # this entry, but preserve it for analysis.
+ # This used to just catch email.Errors.MessageParseError, but
+ # other problems can occur in message parsing, e.g.
+ # ValueError, and exceptions can occur in unpickling too. We
+ # don't want the runner to die, so we just log and skip this
+ # entry, but preserve it for analysis.
self._log(e)
elog.error('Skipping and preserving unparseable message: %s',
filebase)
@@ -286,7 +309,7 @@ class Runner:
continue
try:
dlog.debug('[%s] processing onefile', me)
- self._onefile(msg, msgdata)
+ self._process_one_file(msg, msgdata)
dlog.debug('[%s] finishing filebase: %s', me, filebase)
self._switchboard.finish(filebase)
except Exception, e:
@@ -297,7 +320,7 @@ class Runner:
# cause the message to be stored in the shunt queue for human
# intervention.
self._log(e)
- # Put a marker in the metadata for unshunting
+ # Put a marker in the metadata for unshunting.
msgdata['whichq'] = self._switchboard.queue_directory
# It is possible that shunting can throw an exception, e.g. a
# permissions problem or a MemoryError due to a really large
@@ -317,12 +340,10 @@ class Runner:
self._switchboard.finish(filebase, preserve=True)
config.db.abort()
# Other work we want to do each time through the loop.
- dlog.debug('[%s] reaping', me)
- Utils.reap(self._kids, once=True)
dlog.debug('[%s] doing periodic', me)
- self._doperiodic()
+ self._do_periodic()
dlog.debug('[%s] checking short circuit', me)
- if self._shortcircuit():
+ if self._short_curcuit():
dlog.debug('[%s] short circuiting', me)
break
dlog.debug('[%s] commiting', me)
@@ -330,43 +351,36 @@ class Runner:
dlog.debug('[%s] ending oneloop: %s', me, len(files))
return len(files)
- def _onefile(self, msg, msgdata):
+ def _process_one_file(self, msg, msgdata):
+ """See `IRunner`."""
# Do some common sanity checking on the message metadata. It's got to
# be destined for a particular mailing list. This switchboard is used
# to shunt off badly formatted messages. We don't want to just trash
# them because they may be fixable with human intervention. Just get
- # them out of our site though.
+ # them out of our sight.
#
# Find out which mailing list this message is destined for.
listname = msgdata.get('listname')
mlist = config.db.list_manager.get(listname)
- if not mlist:
+ if mlist is None:
elog.error('Dequeuing message destined for missing list: %s',
listname)
self._shunt.enqueue(msg, msgdata)
return
- # Now process this message, keeping track of any subprocesses that may
- # have been spawned. We'll reap those later.
- #
- # We also want to set up the language context for this message. The
- # context will be the preferred language for the user if a member of
- # the list, or the list's preferred language. However, we must take
+ # Now process this message. We also want to set up the language
+ # context for this message. The context will be the preferred
+ # language for the user if the sender is a member of the list, or it
+ # will be the list's preferred language. However, we must take
# special care to reset the defaults, otherwise subsequent messages
- # may be translated incorrectly. BAW: I'm not sure I like this
- # approach, but I can't think of anything better right now.
+ # may be translated incorrectly.
sender = msg.get_sender()
member = mlist.members.get_member(sender)
- if member:
- lang = member.preferred_language
- else:
- lang = mlist.preferred_language
- with i18n.using_language(lang):
- msgdata['lang'] = lang
+ language = (member.preferred_language
+ if member is not None
+ else mlist.preferred_language)
+ with i18n.using_language(language):
+ msgdata['lang'] = language
keepqueued = self._dispose(mlist, msg, msgdata)
- # Keep tabs on any child processes that got spawned.
- kids = msgdata.get('_kids')
- if kids:
- self._kids.update(kids)
if keepqueued:
self._switchboard.enqueue(msg, msgdata)
@@ -376,60 +390,23 @@ class Runner:
traceback.print_exc(file=s)
elog.error('%s', s.getvalue())
- #
- # Subclasses can override these methods.
- #
- def _cleanup(self):
- """Clean up upon exit from the main processing loop.
-
- Called when the Runner's main loop is stopped, this should perform
- any necessary resource deallocation. Its return value is irrelevant.
- """
- Utils.reap(self._kids)
+ def _clean_up(self):
+ """See `IRunner`."""
def _dispose(self, mlist, msg, msgdata):
- """Dispose of a single message destined for a mailing list.
-
- Called for each message that the Runner is responsible for, this is
- the primary overridable method for processing each message.
- Subclasses, must provide implementation for this method.
-
- mlist is the IMailingList instance this message is destined for.
-
- msg is the Message object representing the message.
-
- msgdata is a dictionary of message metadata.
- """
+ """See `IRunner`."""
raise NotImplementedError
- def _doperiodic(self):
- """Do some processing `every once in a while'.
-
- Called every once in a while both from the Runner's main loop, and
- from the Runner's hash slice processing loop. You can do whatever
- special periodic processing you want here, and the return value is
- irrelevant.
- """
+ def _do_periodic(self):
+ """See `IRunner`."""
pass
def _snooze(self, filecnt):
- """Sleep for a little while.
-
- filecnt is the number of messages in the queue the last time through.
- Sub-runners can decide to continue to do work, or sleep for a while
- based on this value. By default, we only snooze if there was nothing
- to do last time around.
- """
+ """See `IRunner`."""
if filecnt or float(self.SLEEPTIME) <= 0:
return
time.sleep(float(self.SLEEPTIME))
- def _shortcircuit(self):
- """Return a true value if the individual file processing loop should
- exit before it's finished processing each message in the current slice
- of hash space. A false value tells _oneloop() to continue processing
- until the current snapshot of hash space is exhausted.
-
- You could, for example, implement a throttling algorithm here.
- """
+ def _short_curcuit(self):
+ """See `IRunner`."""
return self._stop