summaryrefslogtreecommitdiff
path: root/Mailman/Queue
diff options
context:
space:
mode:
authorbwarsaw2006-07-16 21:54:12 +0000
committerbwarsaw2006-07-16 21:54:12 +0000
commit2863de361a5a597006d7ce57dc58f9c538021855 (patch)
treeee552e311c055eb336c11e9dc782ec1112ff4220 /Mailman/Queue
parent337beba04cf052acb9f33cc4f1862b2841924973 (diff)
downloadmailman-2863de361a5a597006d7ce57dc58f9c538021855.tar.gz
mailman-2863de361a5a597006d7ce57dc58f9c538021855.tar.zst
mailman-2863de361a5a597006d7ce57dc58f9c538021855.zip
Added robustness to Switchboards and Runners so that if a runner crashes
uncleanly (e.g. segfaults the Python interpreter), messages being processed will not be lost. The vulnerability, ideas, and patches are credited to Richard Barrett and Mark Sapiro. Their original work was modified by Barry for this commit and any bugs are his fault. The basic idea is that instead of unlinking a .pck file in dequeue(), the file is renamed to a .bak file. The Switchboard grows a finish() method which then unlinks the .bak file. That class's constructor also grows a 'restore' argument (defaulting to false), which when true moves all .bak files it finds in its hash space to .pck, thereby restoring a file lost while "in flight". This relies on the fact that even with multiple qrunners, exactly one process will be responsible for one hash space slice, so it's never possible (under normal operation) for a .bak file to be renamed to .pck by some other process. Test cases for both the new Switchboard behavior and the use of that by Runner subclasses has been added. There are two things to watch out for, either of which may require some additional changes. There is some small potential to duplicate messages in various queues, if say 'mailmanctl' were improperly started more than once by a site admin. This usually won't happen unless an admin is overly eager with the mailmanctl -s switch, so we can chalk this one up to operator error. I'm not sure what more we can do about that. There's also a possibility that if we're processing a message that continually causes the Python interpreter to crash, we could end up duplicating messages endlessly. This is especially troublesome for the Outgoing runner which could conceivably cause a mail flood. I consider this the more critical issue to defend against, probably by adding a numbering scheme to the .bak file names and refusing to restore a .bak file more than say 3 times without human intervention.
Diffstat (limited to 'Mailman/Queue')
-rw-r--r--Mailman/Queue/Makefile.in21
-rw-r--r--Mailman/Queue/Runner.py10
-rw-r--r--Mailman/Queue/Switchboard.py38
-rw-r--r--Mailman/Queue/tests/Makefile.in71
-rw-r--r--Mailman/Queue/tests/__init__.py0
-rw-r--r--Mailman/Queue/tests/test_runners.py236
6 files changed, 366 insertions, 10 deletions
diff --git a/Mailman/Queue/Makefile.in b/Mailman/Queue/Makefile.in
index 179f3d97b..99e74abea 100644
--- a/Mailman/Queue/Makefile.in
+++ b/Mailman/Queue/Makefile.in
@@ -41,6 +41,7 @@ PACKAGEDIR= $(prefix)/Mailman/Queue
SHELL= /bin/sh
MODULES= *.py
+SUBDIRS= tests
# Modes for directories and executables created by the install
# process. Default to group-writable directories but
@@ -54,17 +55,37 @@ INSTALL_PROGRAM=$(INSTALL) -m $(EXEMODE)
# Rules
all:
+ for d in $(SUBDIRS); \
+ do \
+ (cd $$d; $(MAKE)); \
+ done
install:
for f in $(MODULES); \
do \
$(INSTALL) -m $(FILEMODE) $(srcdir)/$$f $(DESTDIR)$(PACKAGEDIR); \
done
+ for d in $(SUBDIRS); \
+ do \
+ (cd $$d; $(MAKE) DESTDIR=$(DESTDIR) install); \
+ done
finish:
+ @for d in $(SUBDIRS); \
+ do \
+ (cd $$d; $(MAKE) DESTDIR=$(DESTDIR) finish); \
+ done
clean:
+ for d in $(SUBDIRS); \
+ do \
+ (cd $$d; $(MAKE) clean); \
+ done
distclean:
-rm *.pyc
-rm Makefile
+ for d in $(SUBDIRS); \
+ do \
+ (cd $$d; $(MAKE) distclean); \
+ done
diff --git a/Mailman/Queue/Runner.py b/Mailman/Queue/Runner.py
index bdb811a3a..263940135 100644
--- a/Mailman/Queue/Runner.py
+++ b/Mailman/Queue/Runner.py
@@ -19,8 +19,8 @@
import time
import weakref
-import traceback
import logging
+import traceback
import email.Errors
from cStringIO import StringIO
@@ -44,7 +44,7 @@ class Runner:
self._kids = {}
# Create our own switchboard. Don't use the switchboard cache because
# we want to provide slice and numslice arguments.
- self._switchboard = Switchboard(self.QDIR, slice, numslices)
+ self._switchboard = Switchboard(self.QDIR, slice, numslices, True)
# Create the shunt switchboard
self._shunt = Switchboard(config.SHUNTQUEUE_DIR)
self._stop = False
@@ -104,6 +104,7 @@ class Runner:
continue
try:
self._onefile(msg, msgdata)
+ self._switchboard.finish(filebase)
except Exception, e:
# All runners that implement _dispose() must guarantee that
# exceptions are caught and dealt with properly. Still, there
@@ -114,8 +115,9 @@ class Runner:
self._log(e)
# Put a marker in the metadata for unshunting
msgdata['whichq'] = self._switchboard.whichq()
- filebase = self._shunt.enqueue(msg, msgdata)
- log.error('SHUNTING: %s', filebase)
+ new_filebase = self._shunt.enqueue(msg, msgdata)
+ log.error('SHUNTING: %s', new_filebase)
+ self._switchboard.finish(filebase)
# Other work we want to do each time through the loop
Utils.reap(self._kids, once=True)
self._doperiodic()
diff --git a/Mailman/Queue/Switchboard.py b/Mailman/Queue/Switchboard.py
index 4741580e7..028eeb6ee 100644
--- a/Mailman/Queue/Switchboard.py
+++ b/Mailman/Queue/Switchboard.py
@@ -39,6 +39,7 @@ import time
import email
import errno
import cPickle
+import logging
import marshal
from Mailman import Message
@@ -59,7 +60,7 @@ DELTA = .0001
class Switchboard:
- def __init__(self, whichq, slice=None, numslices=1):
+ def __init__(self, whichq, slice=None, numslices=1, recover=False):
self.__whichq = whichq
# Create the directory if it doesn't yet exist.
# FIXME
@@ -78,6 +79,8 @@ class Switchboard:
if numslices <> 1:
self.__lower = ((shamax+1) * slice) / numslices
self.__upper = (((shamax+1) * (slice+1)) / numslices) - 1
+ if recover:
+ self.recover_backup_files()
def whichq(self):
return self.__whichq
@@ -135,9 +138,16 @@ class Switchboard:
def dequeue(self, filebase):
# Calculate the filename from the given filebase.
filename = os.path.join(self.__whichq, filebase + '.pck')
+ backfile = os.path.join(self.__whichq, filebase + '.bak')
# Read the message object and metadata.
fp = open(filename)
- os.unlink(filename)
+ # Move the file to the backup file name for processing. If this
+ # process crashes uncleanly the .bak file will be used to re-instate
+ # the .pck file in order to try again. XXX what if something caused
+ # Python to constantly crash? Is it possible that we'd end up mail
+ # bombing recipients or crushing the archiver? How would we defend
+ # against that?
+ os.rename(filename, backfile)
try:
msg = cPickle.load(fp)
data = cPickle.load(fp)
@@ -147,26 +157,42 @@ class Switchboard:
msg = email.message_from_string(msg, Message.Message)
return msg, data
- def files(self):
+ def finish(self, filebase):
+ bakfile = os.path.join(self.__whichq, filebase + '.bak')
+ try:
+ os.unlink(bakfile)
+ except EnvironmentError, e:
+ log.exception('Failed to unlink backup file: %s', bakfile)
+
+ def files(self, extension='.pck'):
times = {}
lower = self.__lower
upper = self.__upper
for f in os.listdir(self.__whichq):
# By ignoring anything that doesn't end in .pck, we ignore
# tempfiles and avoid a race condition.
- if not f.endswith('.pck'):
+ filebase, ext = os.path.splitext(f)
+ if ext <> extension:
continue
- filebase = os.path.splitext(f)[0]
when, digest = filebase.split('+')
# Throw out any files which don't match our bitrange. BAW: test
# performance and end-cases of this algorithm. MAS: both
# comparisons need to be <= to get complete range.
if lower is None or (lower <= long(digest, 16) <= upper):
key = float(when)
- while times.has_key(key):
+ while key in times:
key += DELTA
times[key] = filebase
# FIFO sort
keys = times.keys()
keys.sort()
return [times[k] for k in keys]
+
+ def recover_backup_files(self):
+ # Move all .bak files in our slice to .pck. It's impossible for both
+ # to exist at the same time, so the move is enough to ensure that our
+ # normal dequeuing process will handle them.
+ for filebase in self.files('.bak'):
+ src = os.path.join(self.__whichq, filebase + '.bak')
+ dst = os.path.join(self.__whichq, filebase + '.pck')
+ os.rename(src, dst)
diff --git a/Mailman/Queue/tests/Makefile.in b/Mailman/Queue/tests/Makefile.in
new file mode 100644
index 000000000..fdeae5d86
--- /dev/null
+++ b/Mailman/Queue/tests/Makefile.in
@@ -0,0 +1,71 @@
+# Copyright (C) 2006 by the Free Software Foundation, Inc.
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
+# USA.
+
+# NOTE: Makefile.in is converted into Makefile by the configure script
+# in the parent directory. Once configure has run, you can recreate
+# the Makefile by running just config.status.
+
+# Variables set by configure
+
+VPATH= @srcdir@
+srcdir= @srcdir@
+bindir= @bindir@
+prefix= @prefix@
+exec_prefix= @exec_prefix@
+DESTDIR=
+
+CC= @CC@
+CHMOD= @CHMOD@
+INSTALL= @INSTALL@
+
+DEFS= @DEFS@
+
+# Customizable but not set by configure
+
+OPT= @OPT@
+CFLAGS= $(OPT) $(DEFS)
+PACKAGEDIR= $(prefix)/Mailman/Queue/tests
+SHELL= /bin/sh
+
+MODULES= *.py
+
+# Modes for directories and executables created by the install
+# process. Default to group-writable directories but
+# user-only-writable for executables.
+DIRMODE= 775
+EXEMODE= 755
+FILEMODE= 644
+INSTALL_PROGRAM=$(INSTALL) -m $(EXEMODE)
+
+
+# Rules
+
+all:
+
+install:
+ for f in $(MODULES); \
+ do \
+ $(INSTALL) -m $(FILEMODE) $(srcdir)/$$f $(DESTDIR)$(PACKAGEDIR); \
+ done
+
+finish:
+
+clean:
+
+distclean:
+ -rm *.pyc
+ -rm Makefile
diff --git a/Mailman/Queue/tests/__init__.py b/Mailman/Queue/tests/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/Mailman/Queue/tests/__init__.py
diff --git a/Mailman/Queue/tests/test_runners.py b/Mailman/Queue/tests/test_runners.py
new file mode 100644
index 000000000..48dd15db2
--- /dev/null
+++ b/Mailman/Queue/tests/test_runners.py
@@ -0,0 +1,236 @@
+# Copyright (C) 2001-2006 by the Free Software Foundation, Inc.
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
+# USA.
+
+"""Unit tests for the various Mailman qrunner modules."""
+
+import os
+import email
+import shutil
+import tempfile
+import unittest
+
+from Mailman.Message import Message
+from Mailman.Queue.NewsRunner import prepare_message
+from Mailman.Queue.Runner import Runner
+from Mailman.Queue.Switchboard import Switchboard
+from Mailman.testing.base import TestBase
+
+
+
+class TestPrepMessage(TestBase):
+ def test_remove_unacceptables(self):
+ eq = self.assertEqual
+ msg = email.message_from_string("""\
+From: aperson@dom.ain
+To: _xtest@dom.ain
+NNTP-Posting-Host: news.dom.ain
+NNTP-Posting-Date: today
+X-Trace: blah blah
+X-Complaints-To: abuse@dom.ain
+Xref: blah blah
+Xref: blah blah
+Date-Received: yesterday
+Posted: tomorrow
+Posting-Version: 99.99
+Relay-Version: 88.88
+Received: blah blah
+
+A message
+""")
+ msgdata = {}
+ prepare_message(self._mlist, msg, msgdata)
+ eq(msgdata.get('prepped'), 1)
+ eq(msg['from'], 'aperson@dom.ain')
+ eq(msg['to'], '_xtest@dom.ain')
+ eq(msg['nntp-posting-host'], None)
+ eq(msg['nntp-posting-date'], None)
+ eq(msg['x-trace'], None)
+ eq(msg['x-complaints-to'], None)
+ eq(msg['xref'], None)
+ eq(msg['date-received'], None)
+ eq(msg['posted'], None)
+ eq(msg['posting-version'], None)
+ eq(msg['relay-version'], None)
+ eq(msg['received'], None)
+
+ def test_munge_duplicates_no_duplicates(self):
+ eq = self.assertEqual
+ msg = email.message_from_string("""\
+From: aperson@dom.ain
+To: _xtest@dom.ain
+Cc: someother@dom.ain
+Content-Transfer-Encoding: yes
+
+A message
+""")
+ msgdata = {}
+ prepare_message(self._mlist, msg, msgdata)
+ eq(msgdata.get('prepped'), 1)
+ eq(msg['from'], 'aperson@dom.ain')
+ eq(msg['to'], '_xtest@dom.ain')
+ eq(msg['cc'], 'someother@dom.ain')
+ eq(msg['content-transfer-encoding'], 'yes')
+
+ def test_munge_duplicates(self):
+ eq = self.assertEqual
+ msg = email.message_from_string("""\
+From: aperson@dom.ain
+To: _xtest@dom.ain
+To: two@dom.ain
+Cc: three@dom.ain
+Cc: four@dom.ain
+Cc: five@dom.ain
+Content-Transfer-Encoding: yes
+Content-Transfer-Encoding: no
+Content-Transfer-Encoding: maybe
+
+A message
+""")
+ msgdata = {}
+ prepare_message(self._mlist, msg, msgdata)
+ eq(msgdata.get('prepped'), 1)
+ eq(msg.get_all('from'), ['aperson@dom.ain'])
+ eq(msg.get_all('to'), ['_xtest@dom.ain'])
+ eq(msg.get_all('cc'), ['three@dom.ain'])
+ eq(msg.get_all('content-transfer-encoding'), ['yes'])
+ eq(msg.get_all('x-original-to'), ['two@dom.ain'])
+ eq(msg.get_all('x-original-cc'), ['four@dom.ain', 'five@dom.ain'])
+ eq(msg.get_all('x-original-content-transfer-encoding'),
+ ['no', 'maybe'])
+
+
+
+class TestSwitchboard(TestBase):
+ def setUp(self):
+ TestBase.setUp(self)
+ self._tmpdir = tempfile.mkdtemp()
+ self._msg = email.message_from_string("""\
+From: aperson@dom.ain
+To: _xtest@dom.ain
+
+A test message.
+""")
+ self._sb = Switchboard(self._tmpdir)
+
+ def tearDown(self):
+ shutil.rmtree(self._tmpdir, True)
+ TestBase.tearDown(self)
+
+ def _count_qfiles(self):
+ files = {}
+ for qfile in os.listdir(self._tmpdir):
+ root, ext = os.path.splitext(qfile)
+ files[ext] = files.get(ext, 0) + 1
+ return files
+
+ def test_whichq(self):
+ self.assertEqual(self._sb.whichq(), self._tmpdir)
+
+ def test_enqueue(self):
+ eq = self.assertEqual
+ self._sb.enqueue(self._msg, listname='_xtest@dom.ain')
+ files = self._count_qfiles()
+ eq(len(files), 1)
+ eq(files['.pck'], 1)
+
+ def test_dequeue(self):
+ eq = self.assertEqual
+ self._sb.enqueue(self._msg, listname='_xtest@dom.ain', foo='yes')
+ count = 0
+ for filebase in self._sb.files():
+ msg, data = self._sb.dequeue(filebase)
+ self._sb.finish(filebase)
+ count += 1
+ eq(msg['from'], self._msg['from'])
+ eq(msg['to'], self._msg['to'])
+ eq(data['foo'], 'yes')
+ eq(count, 1)
+ files = self._count_qfiles()
+ eq(len(files), 0)
+
+ def test_bakfile(self):
+ eq = self.assertEqual
+ self._sb.enqueue(self._msg, listname='_xtest@dom.ain')
+ self._sb.enqueue(self._msg, listname='_xtest@dom.ain')
+ self._sb.enqueue(self._msg, listname='_xtest@dom.ain')
+ for filebase in self._sb.files():
+ self._sb.dequeue(filebase)
+ files = self._count_qfiles()
+ eq(len(files), 1)
+ eq(files['.bak'], 3)
+
+ def test_recover(self):
+ eq = self.assertEqual
+ self._sb.enqueue(self._msg, listname='_xtest@dom.ain')
+ for filebase in self._sb.files():
+ self._sb.dequeue(filebase)
+ # Not calling sb.finish() leaves .bak files
+ sb2 = Switchboard(self._tmpdir, recover=True)
+ files = self._count_qfiles()
+ eq(len(files), 1)
+ eq(files['.pck'], 1)
+
+
+
+class TestableRunner(Runner):
+ def _dispose(self, mlist, msg, msgdata):
+ self.msg = msg
+ self.data = msgdata
+ return False
+
+ def _doperiodic(self):
+ self.stop()
+
+ def _snooze(self, filecnt):
+ return
+
+
+class TestRunner(TestBase):
+ def setUp(self):
+ TestBase.setUp(self)
+ self._tmpdir = tempfile.mkdtemp()
+ self._msg = email.message_from_string("""\
+From: aperson@dom.ain
+To: _xtest@dom.ain
+
+A test message.
+""", Message)
+ class MyRunner(TestableRunner):
+ QDIR = self._tmpdir
+ self._runner = MyRunner()
+
+ def tearDown(self):
+ shutil.rmtree(self._tmpdir, True)
+ TestBase.tearDown(self)
+
+ def test_run_loop(self):
+ eq = self.assertEqual
+ sb = Switchboard(self._tmpdir)
+ sb.enqueue(self._msg, listname='_xtest@example.com', foo='yes')
+ self._runner.run()
+ eq(self._runner.msg['from'], self._msg['from'])
+ eq(self._runner.msg['to'], self._msg['to'])
+ eq(self._runner.data['foo'], 'yes')
+
+
+
+def test_suite():
+ suite = unittest.TestSuite()
+ suite.addTest(unittest.makeSuite(TestPrepMessage))
+ suite.addTest(unittest.makeSuite(TestSwitchboard))
+ suite.addTest(unittest.makeSuite(TestRunner))
+ return suite