summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--buildout.cfg2
-rw-r--r--mailman/queue/__init__.py53
-rw-r--r--mailman/queue/docs/switchboard.txt37
3 files changed, 77 insertions, 15 deletions
diff --git a/buildout.cfg b/buildout.cfg
index f502d24e9..d345bf142 100644
--- a/buildout.cfg
+++ b/buildout.cfg
@@ -5,7 +5,7 @@ parts =
test
unzip = true
# bzr branch lp:~barry/lazr.config/megamerge
-develop = . /Users/barry/projects/lazr/megamerge
+develop = . /home/barry/projects/lazr/megamerge
[interpreter]
recipe = zc.recipe.egg
diff --git a/mailman/queue/__init__.py b/mailman/queue/__init__.py
index f83fd46e9..65e31f6f3 100644
--- a/mailman/queue/__init__.py
+++ b/mailman/queue/__init__.py
@@ -62,6 +62,10 @@ shamax = 0xffffffffffffffffffffffffffffffffffffffffL
# prevents skipping one of two entries with the same time until the next pass.
DELTA = .0001
DOT = '.'
+# We count the number of times a file has been moved to .bak and recovered.
+# In order to prevent loops and a message flood, when the count reaches this
+# value, we move the file to the bad queue as a .psv.
+MAX_BAK_COUNT = 3
elog = logging.getLogger('mailman.error')
dlog = logging.getLogger('mailman.debug')
@@ -169,10 +173,7 @@ class Switchboard:
with open(filename) as fp:
# Move the file to the backup file name for processing. If this
# process crashes uncleanly the .bak file will be used to
- # re-instate the .pck file in order to try again. XXX what if
- # something caused Python to constantly crash? Is it possible
- # that we'd end up mail bombing recipients or crushing the
- # archiver? How would we defend against that?
+ # re-instate the .pck file in order to try again.
os.rename(filename, backfile)
msg = cPickle.load(fp)
data = cPickle.load(fp)
@@ -187,13 +188,12 @@ class Switchboard:
return msg, data
def finish(self, filebase, preserve=False):
+ """See `ISwitchboard`."""
bakfile = os.path.join(self.queue_directory, filebase + '.bak')
try:
if preserve:
- shunt_dir = config.switchboards['shunt'].queue_directory
- psvfile = os.path.join(shunt_dir, filebase + '.psv')
- # Create the directory if it doesn't yet exist.
- Utils.makedirs(shunt_dir, 0770)
+ bad_dir = config.switchboards['bad'].queue_directory
+ psvfile = os.path.join(bad_dir, filebase + '.psv')
os.rename(bakfile, psvfile)
else:
os.unlink(bakfile)
@@ -233,11 +233,44 @@ class Switchboard:
"""See `ISwitchboard`."""
# Move all .bak files in our slice to .pck. It's impossible for both
# to exist at the same time, so the move is enough to ensure that our
- # normal dequeuing process will handle them.
+ # normal dequeuing process will handle them. We keep count in
+ # _bak_count in the metadata of the number of times we recover this
+ # file. When the count reaches MAX_BAK_COUNT, we move the .bak file
+ # to a .psv file in the bad queue.
for filebase in self.get_files('.bak'):
src = os.path.join(self.queue_directory, filebase + '.bak')
dst = os.path.join(self.queue_directory, filebase + '.pck')
- os.rename(src, dst)
+ fp = open(src, 'rb+')
+ try:
+ try:
+ msg = cPickle.load(fp)
+ data_pos = fp.tell()
+ data = cPickle.load(fp)
+ except Exception, s:
+ # If unpickling throws any exception, just log and
+ # preserve this entry
+ elog.error('Unpickling .bak exception: %s\n'
+ 'Preserving file: %s', s, filebase)
+ self.finish(filebase, preserve=True)
+ else:
+ data['_bak_count'] = data.get('_bak_count', 0) + 1
+ fp.seek(data_pos)
+ if data.get('_parsemsg'):
+ protocol = 0
+ else:
+ protocol = 1
+ cPickle.dump(data, fp, protocol)
+ fp.truncate()
+ fp.flush()
+ os.fsync(fp.fileno())
+ if data['_bak_count'] >= MAX_BAK_COUNT:
+ elog.error('.bak file max count, preserving file: %s',
+ filebase)
+ self.finish(filebase, preserve=True)
+ else:
+ os.rename(src, dst)
+ finally:
+ fp.close()
diff --git a/mailman/queue/docs/switchboard.txt b/mailman/queue/docs/switchboard.txt
index 7baee7b54..741d435e1 100644
--- a/mailman/queue/docs/switchboard.txt
+++ b/mailman/queue/docs/switchboard.txt
@@ -22,9 +22,11 @@ Create a switchboard by giving its queue directory.
Here's a helper function for ensuring things work correctly.
- >>> def check_qfiles():
+ >>> def check_qfiles(directory=None):
+ ... if directory is None:
+ ... directory = queue_directory
... files = {}
- ... for qfile in os.listdir(queue_directory):
+ ... for qfile in os.listdir(directory):
... root, ext = os.path.splitext(qfile)
... files[ext] = files.get(ext, 0) + 1
... return sorted(files.items())
@@ -133,12 +135,39 @@ place. These can be recovered when the switchboard is instantiated.
>>> check_qfiles()
[('.pck', 3)]
-Clean up
+The files can be recovered explicitly.
>>> for filebase in switchboard.files:
... msg, msgdata = switchboard.dequeue(filebase)
- ... switchboard.finish(filebase)
+ ... # Don't call .finish()
+ >>> check_qfiles()
+ [('.bak', 3)]
+ >>> switchboard.recover_backup_files()
>>> check_qfiles()
+ [('.pck', 3)]
+
+But the files will only be recovered at most three times before they are
+considered defective. In order to prevent mail bombs and loops, once this
+maximum is reached, the files will be preserved in the 'bad' queue.
+
+ >>> for filebase in switchboard.files:
+ ... msg, msgdata = switchboard.dequeue(filebase)
+ ... # Don't call .finish()
+ >>> check_qfiles()
+ [('.bak', 3)]
+ >>> switchboard.recover_backup_files()
+ >>> check_qfiles()
+ []
+
+ >>> bad = config.switchboards['bad']
+ >>> check_qfiles(bad.queue_directory)
+ [('.psv', 3)]
+
+Clean up
+
+ >>> for file in os.listdir(bad.queue_directory):
+ ... os.remove(os.path.join(bad.queue_directory, file))
+ >>> check_qfiles(bad.queue_directory)
[]