diff options
| author | bwarsaw | 2006-07-16 21:54:12 +0000 |
|---|---|---|
| committer | bwarsaw | 2006-07-16 21:54:12 +0000 |
| commit | 2863de361a5a597006d7ce57dc58f9c538021855 (patch) | |
| tree | ee552e311c055eb336c11e9dc782ec1112ff4220 /Mailman/Queue | |
| parent | 337beba04cf052acb9f33cc4f1862b2841924973 (diff) | |
| download | mailman-2863de361a5a597006d7ce57dc58f9c538021855.tar.gz mailman-2863de361a5a597006d7ce57dc58f9c538021855.tar.zst mailman-2863de361a5a597006d7ce57dc58f9c538021855.zip | |
Added robustness to Switchboards and Runners so that if a runner crashes
uncleanly (e.g. segfaults the Python interpreter), messages being processed
will not be lost.
The vulnerability, ideas, and patches are credited to Richard Barrett and Mark
Sapiro. Their original work was modified by Barry for this commit and any
bugs are his fault.
The basic idea is that instead of unlinking a .pck file in dequeue(), the file
is renamed to a .bak file. The Switchboard grows a finish() method which then
unlinks the .bak file. That class's constructor also grows a 'restore'
argument (defaulting to false), which when true moves all .bak files it finds
in its hash space to .pck, thereby restoring a file lost while "in flight".
This relies on the fact that even with multiple qrunners, exactly one process
will be responsible for one hash space slice, so it's never possible (under
normal operation) for a .bak file to be renamed to .pck by some other process.
Test cases for both the new Switchboard behavior and the use of that by Runner
subclasses has been added.
There are two things to watch out for, either of which may require some
additional changes. There is some small potential to duplicate messages in
various queues, if say 'mailmanctl' were improperly started more than once by
a site admin. This usually won't happen unless an admin is overly eager with
the mailmanctl -s switch, so we can chalk this one up to operator error. I'm
not sure what more we can do about that.
There's also a possibility that if we're processing a message that continually
causes the Python interpreter to crash, we could end up duplicating messages
endlessly. This is especially troublesome for the Outgoing runner which could
conceivably cause a mail flood. I consider this the more critical issue to
defend against, probably by adding a numbering scheme to the .bak file names
and refusing to restore a .bak file more than say 3 times without human
intervention.
Diffstat (limited to 'Mailman/Queue')
| -rw-r--r-- | Mailman/Queue/Makefile.in | 21 | ||||
| -rw-r--r-- | Mailman/Queue/Runner.py | 10 | ||||
| -rw-r--r-- | Mailman/Queue/Switchboard.py | 38 | ||||
| -rw-r--r-- | Mailman/Queue/tests/Makefile.in | 71 | ||||
| -rw-r--r-- | Mailman/Queue/tests/__init__.py | 0 | ||||
| -rw-r--r-- | Mailman/Queue/tests/test_runners.py | 236 |
6 files changed, 366 insertions, 10 deletions
diff --git a/Mailman/Queue/Makefile.in b/Mailman/Queue/Makefile.in index 179f3d97b..99e74abea 100644 --- a/Mailman/Queue/Makefile.in +++ b/Mailman/Queue/Makefile.in @@ -41,6 +41,7 @@ PACKAGEDIR= $(prefix)/Mailman/Queue SHELL= /bin/sh MODULES= *.py +SUBDIRS= tests # Modes for directories and executables created by the install # process. Default to group-writable directories but @@ -54,17 +55,37 @@ INSTALL_PROGRAM=$(INSTALL) -m $(EXEMODE) # Rules all: + for d in $(SUBDIRS); \ + do \ + (cd $$d; $(MAKE)); \ + done install: for f in $(MODULES); \ do \ $(INSTALL) -m $(FILEMODE) $(srcdir)/$$f $(DESTDIR)$(PACKAGEDIR); \ done + for d in $(SUBDIRS); \ + do \ + (cd $$d; $(MAKE) DESTDIR=$(DESTDIR) install); \ + done finish: + @for d in $(SUBDIRS); \ + do \ + (cd $$d; $(MAKE) DESTDIR=$(DESTDIR) finish); \ + done clean: + for d in $(SUBDIRS); \ + do \ + (cd $$d; $(MAKE) clean); \ + done distclean: -rm *.pyc -rm Makefile + for d in $(SUBDIRS); \ + do \ + (cd $$d; $(MAKE) distclean); \ + done diff --git a/Mailman/Queue/Runner.py b/Mailman/Queue/Runner.py index bdb811a3a..263940135 100644 --- a/Mailman/Queue/Runner.py +++ b/Mailman/Queue/Runner.py @@ -19,8 +19,8 @@ import time import weakref -import traceback import logging +import traceback import email.Errors from cStringIO import StringIO @@ -44,7 +44,7 @@ class Runner: self._kids = {} # Create our own switchboard. Don't use the switchboard cache because # we want to provide slice and numslice arguments. - self._switchboard = Switchboard(self.QDIR, slice, numslices) + self._switchboard = Switchboard(self.QDIR, slice, numslices, True) # Create the shunt switchboard self._shunt = Switchboard(config.SHUNTQUEUE_DIR) self._stop = False @@ -104,6 +104,7 @@ class Runner: continue try: self._onefile(msg, msgdata) + self._switchboard.finish(filebase) except Exception, e: # All runners that implement _dispose() must guarantee that # exceptions are caught and dealt with properly. Still, there @@ -114,8 +115,9 @@ class Runner: self._log(e) # Put a marker in the metadata for unshunting msgdata['whichq'] = self._switchboard.whichq() - filebase = self._shunt.enqueue(msg, msgdata) - log.error('SHUNTING: %s', filebase) + new_filebase = self._shunt.enqueue(msg, msgdata) + log.error('SHUNTING: %s', new_filebase) + self._switchboard.finish(filebase) # Other work we want to do each time through the loop Utils.reap(self._kids, once=True) self._doperiodic() diff --git a/Mailman/Queue/Switchboard.py b/Mailman/Queue/Switchboard.py index 4741580e7..028eeb6ee 100644 --- a/Mailman/Queue/Switchboard.py +++ b/Mailman/Queue/Switchboard.py @@ -39,6 +39,7 @@ import time import email import errno import cPickle +import logging import marshal from Mailman import Message @@ -59,7 +60,7 @@ DELTA = .0001 class Switchboard: - def __init__(self, whichq, slice=None, numslices=1): + def __init__(self, whichq, slice=None, numslices=1, recover=False): self.__whichq = whichq # Create the directory if it doesn't yet exist. # FIXME @@ -78,6 +79,8 @@ class Switchboard: if numslices <> 1: self.__lower = ((shamax+1) * slice) / numslices self.__upper = (((shamax+1) * (slice+1)) / numslices) - 1 + if recover: + self.recover_backup_files() def whichq(self): return self.__whichq @@ -135,9 +138,16 @@ class Switchboard: def dequeue(self, filebase): # Calculate the filename from the given filebase. filename = os.path.join(self.__whichq, filebase + '.pck') + backfile = os.path.join(self.__whichq, filebase + '.bak') # Read the message object and metadata. fp = open(filename) - os.unlink(filename) + # Move the file to the backup file name for processing. If this + # process crashes uncleanly the .bak file will be used to re-instate + # the .pck file in order to try again. XXX what if something caused + # Python to constantly crash? Is it possible that we'd end up mail + # bombing recipients or crushing the archiver? How would we defend + # against that? + os.rename(filename, backfile) try: msg = cPickle.load(fp) data = cPickle.load(fp) @@ -147,26 +157,42 @@ class Switchboard: msg = email.message_from_string(msg, Message.Message) return msg, data - def files(self): + def finish(self, filebase): + bakfile = os.path.join(self.__whichq, filebase + '.bak') + try: + os.unlink(bakfile) + except EnvironmentError, e: + log.exception('Failed to unlink backup file: %s', bakfile) + + def files(self, extension='.pck'): times = {} lower = self.__lower upper = self.__upper for f in os.listdir(self.__whichq): # By ignoring anything that doesn't end in .pck, we ignore # tempfiles and avoid a race condition. - if not f.endswith('.pck'): + filebase, ext = os.path.splitext(f) + if ext <> extension: continue - filebase = os.path.splitext(f)[0] when, digest = filebase.split('+') # Throw out any files which don't match our bitrange. BAW: test # performance and end-cases of this algorithm. MAS: both # comparisons need to be <= to get complete range. if lower is None or (lower <= long(digest, 16) <= upper): key = float(when) - while times.has_key(key): + while key in times: key += DELTA times[key] = filebase # FIFO sort keys = times.keys() keys.sort() return [times[k] for k in keys] + + def recover_backup_files(self): + # Move all .bak files in our slice to .pck. It's impossible for both + # to exist at the same time, so the move is enough to ensure that our + # normal dequeuing process will handle them. + for filebase in self.files('.bak'): + src = os.path.join(self.__whichq, filebase + '.bak') + dst = os.path.join(self.__whichq, filebase + '.pck') + os.rename(src, dst) diff --git a/Mailman/Queue/tests/Makefile.in b/Mailman/Queue/tests/Makefile.in new file mode 100644 index 000000000..fdeae5d86 --- /dev/null +++ b/Mailman/Queue/tests/Makefile.in @@ -0,0 +1,71 @@ +# Copyright (C) 2006 by the Free Software Foundation, Inc. +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, +# USA. + +# NOTE: Makefile.in is converted into Makefile by the configure script +# in the parent directory. Once configure has run, you can recreate +# the Makefile by running just config.status. + +# Variables set by configure + +VPATH= @srcdir@ +srcdir= @srcdir@ +bindir= @bindir@ +prefix= @prefix@ +exec_prefix= @exec_prefix@ +DESTDIR= + +CC= @CC@ +CHMOD= @CHMOD@ +INSTALL= @INSTALL@ + +DEFS= @DEFS@ + +# Customizable but not set by configure + +OPT= @OPT@ +CFLAGS= $(OPT) $(DEFS) +PACKAGEDIR= $(prefix)/Mailman/Queue/tests +SHELL= /bin/sh + +MODULES= *.py + +# Modes for directories and executables created by the install +# process. Default to group-writable directories but +# user-only-writable for executables. +DIRMODE= 775 +EXEMODE= 755 +FILEMODE= 644 +INSTALL_PROGRAM=$(INSTALL) -m $(EXEMODE) + + +# Rules + +all: + +install: + for f in $(MODULES); \ + do \ + $(INSTALL) -m $(FILEMODE) $(srcdir)/$$f $(DESTDIR)$(PACKAGEDIR); \ + done + +finish: + +clean: + +distclean: + -rm *.pyc + -rm Makefile diff --git a/Mailman/Queue/tests/__init__.py b/Mailman/Queue/tests/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/Mailman/Queue/tests/__init__.py diff --git a/Mailman/Queue/tests/test_runners.py b/Mailman/Queue/tests/test_runners.py new file mode 100644 index 000000000..48dd15db2 --- /dev/null +++ b/Mailman/Queue/tests/test_runners.py @@ -0,0 +1,236 @@ +# Copyright (C) 2001-2006 by the Free Software Foundation, Inc. +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, +# USA. + +"""Unit tests for the various Mailman qrunner modules.""" + +import os +import email +import shutil +import tempfile +import unittest + +from Mailman.Message import Message +from Mailman.Queue.NewsRunner import prepare_message +from Mailman.Queue.Runner import Runner +from Mailman.Queue.Switchboard import Switchboard +from Mailman.testing.base import TestBase + + + +class TestPrepMessage(TestBase): + def test_remove_unacceptables(self): + eq = self.assertEqual + msg = email.message_from_string("""\ +From: aperson@dom.ain +To: _xtest@dom.ain +NNTP-Posting-Host: news.dom.ain +NNTP-Posting-Date: today +X-Trace: blah blah +X-Complaints-To: abuse@dom.ain +Xref: blah blah +Xref: blah blah +Date-Received: yesterday +Posted: tomorrow +Posting-Version: 99.99 +Relay-Version: 88.88 +Received: blah blah + +A message +""") + msgdata = {} + prepare_message(self._mlist, msg, msgdata) + eq(msgdata.get('prepped'), 1) + eq(msg['from'], 'aperson@dom.ain') + eq(msg['to'], '_xtest@dom.ain') + eq(msg['nntp-posting-host'], None) + eq(msg['nntp-posting-date'], None) + eq(msg['x-trace'], None) + eq(msg['x-complaints-to'], None) + eq(msg['xref'], None) + eq(msg['date-received'], None) + eq(msg['posted'], None) + eq(msg['posting-version'], None) + eq(msg['relay-version'], None) + eq(msg['received'], None) + + def test_munge_duplicates_no_duplicates(self): + eq = self.assertEqual + msg = email.message_from_string("""\ +From: aperson@dom.ain +To: _xtest@dom.ain +Cc: someother@dom.ain +Content-Transfer-Encoding: yes + +A message +""") + msgdata = {} + prepare_message(self._mlist, msg, msgdata) + eq(msgdata.get('prepped'), 1) + eq(msg['from'], 'aperson@dom.ain') + eq(msg['to'], '_xtest@dom.ain') + eq(msg['cc'], 'someother@dom.ain') + eq(msg['content-transfer-encoding'], 'yes') + + def test_munge_duplicates(self): + eq = self.assertEqual + msg = email.message_from_string("""\ +From: aperson@dom.ain +To: _xtest@dom.ain +To: two@dom.ain +Cc: three@dom.ain +Cc: four@dom.ain +Cc: five@dom.ain +Content-Transfer-Encoding: yes +Content-Transfer-Encoding: no +Content-Transfer-Encoding: maybe + +A message +""") + msgdata = {} + prepare_message(self._mlist, msg, msgdata) + eq(msgdata.get('prepped'), 1) + eq(msg.get_all('from'), ['aperson@dom.ain']) + eq(msg.get_all('to'), ['_xtest@dom.ain']) + eq(msg.get_all('cc'), ['three@dom.ain']) + eq(msg.get_all('content-transfer-encoding'), ['yes']) + eq(msg.get_all('x-original-to'), ['two@dom.ain']) + eq(msg.get_all('x-original-cc'), ['four@dom.ain', 'five@dom.ain']) + eq(msg.get_all('x-original-content-transfer-encoding'), + ['no', 'maybe']) + + + +class TestSwitchboard(TestBase): + def setUp(self): + TestBase.setUp(self) + self._tmpdir = tempfile.mkdtemp() + self._msg = email.message_from_string("""\ +From: aperson@dom.ain +To: _xtest@dom.ain + +A test message. +""") + self._sb = Switchboard(self._tmpdir) + + def tearDown(self): + shutil.rmtree(self._tmpdir, True) + TestBase.tearDown(self) + + def _count_qfiles(self): + files = {} + for qfile in os.listdir(self._tmpdir): + root, ext = os.path.splitext(qfile) + files[ext] = files.get(ext, 0) + 1 + return files + + def test_whichq(self): + self.assertEqual(self._sb.whichq(), self._tmpdir) + + def test_enqueue(self): + eq = self.assertEqual + self._sb.enqueue(self._msg, listname='_xtest@dom.ain') + files = self._count_qfiles() + eq(len(files), 1) + eq(files['.pck'], 1) + + def test_dequeue(self): + eq = self.assertEqual + self._sb.enqueue(self._msg, listname='_xtest@dom.ain', foo='yes') + count = 0 + for filebase in self._sb.files(): + msg, data = self._sb.dequeue(filebase) + self._sb.finish(filebase) + count += 1 + eq(msg['from'], self._msg['from']) + eq(msg['to'], self._msg['to']) + eq(data['foo'], 'yes') + eq(count, 1) + files = self._count_qfiles() + eq(len(files), 0) + + def test_bakfile(self): + eq = self.assertEqual + self._sb.enqueue(self._msg, listname='_xtest@dom.ain') + self._sb.enqueue(self._msg, listname='_xtest@dom.ain') + self._sb.enqueue(self._msg, listname='_xtest@dom.ain') + for filebase in self._sb.files(): + self._sb.dequeue(filebase) + files = self._count_qfiles() + eq(len(files), 1) + eq(files['.bak'], 3) + + def test_recover(self): + eq = self.assertEqual + self._sb.enqueue(self._msg, listname='_xtest@dom.ain') + for filebase in self._sb.files(): + self._sb.dequeue(filebase) + # Not calling sb.finish() leaves .bak files + sb2 = Switchboard(self._tmpdir, recover=True) + files = self._count_qfiles() + eq(len(files), 1) + eq(files['.pck'], 1) + + + +class TestableRunner(Runner): + def _dispose(self, mlist, msg, msgdata): + self.msg = msg + self.data = msgdata + return False + + def _doperiodic(self): + self.stop() + + def _snooze(self, filecnt): + return + + +class TestRunner(TestBase): + def setUp(self): + TestBase.setUp(self) + self._tmpdir = tempfile.mkdtemp() + self._msg = email.message_from_string("""\ +From: aperson@dom.ain +To: _xtest@dom.ain + +A test message. +""", Message) + class MyRunner(TestableRunner): + QDIR = self._tmpdir + self._runner = MyRunner() + + def tearDown(self): + shutil.rmtree(self._tmpdir, True) + TestBase.tearDown(self) + + def test_run_loop(self): + eq = self.assertEqual + sb = Switchboard(self._tmpdir) + sb.enqueue(self._msg, listname='_xtest@example.com', foo='yes') + self._runner.run() + eq(self._runner.msg['from'], self._msg['from']) + eq(self._runner.msg['to'], self._msg['to']) + eq(self._runner.data['foo'], 'yes') + + + +def test_suite(): + suite = unittest.TestSuite() + suite.addTest(unittest.makeSuite(TestPrepMessage)) + suite.addTest(unittest.makeSuite(TestSwitchboard)) + suite.addTest(unittest.makeSuite(TestRunner)) + return suite |
