diff options
 buildout.cfg                        |  2
 mailman/queue/__init__.py           | 53
 mailman/queue/docs/switchboard.txt  | 37
 3 files changed, 77 insertions, 15 deletions
diff --git a/buildout.cfg b/buildout.cfg
index f502d24e9..d345bf142 100644
--- a/buildout.cfg
+++ b/buildout.cfg
@@ -5,7 +5,7 @@
 parts = test
 unzip = true
 # bzr branch lp:~barry/lazr.config/megamerge
-develop = . /Users/barry/projects/lazr/megamerge
+develop = . /home/barry/projects/lazr/megamerge
 
 [interpreter]
 recipe = zc.recipe.egg
diff --git a/mailman/queue/__init__.py b/mailman/queue/__init__.py
index f83fd46e9..65e31f6f3 100644
--- a/mailman/queue/__init__.py
+++ b/mailman/queue/__init__.py
@@ -62,6 +62,10 @@ shamax = 0xffffffffffffffffffffffffffffffffffffffffL
 # prevents skipping one of two entries with the same time until the next pass.
 DELTA = .0001
 DOT = '.'
+# We count the number of times a file has been moved to .bak and recovered.
+# In order to prevent loops and a message flood, when the count reaches this
+# value, we move the file to the bad queue as a .psv.
+MAX_BAK_COUNT = 3
 
 elog = logging.getLogger('mailman.error')
 dlog = logging.getLogger('mailman.debug')
@@ -169,10 +173,7 @@ class Switchboard:
         with open(filename) as fp:
             # Move the file to the backup file name for processing.  If this
             # process crashes uncleanly the .bak file will be used to
-            # re-instate the .pck file in order to try again.  XXX what if
-            # something caused Python to constantly crash?  Is it possible
-            # that we'd end up mail bombing recipients or crushing the
-            # archiver?  How would we defend against that?
+            # re-instate the .pck file in order to try again.
             os.rename(filename, backfile)
             msg = cPickle.load(fp)
             data = cPickle.load(fp)
@@ -187,13 +188,12 @@ class Switchboard:
         return msg, data
 
     def finish(self, filebase, preserve=False):
+        """See `ISwitchboard`."""
         bakfile = os.path.join(self.queue_directory, filebase + '.bak')
         try:
             if preserve:
-                shunt_dir = config.switchboards['shunt'].queue_directory
-                psvfile = os.path.join(shunt_dir, filebase + '.psv')
-                # Create the directory if it doesn't yet exist.
-                Utils.makedirs(shunt_dir, 0770)
+                bad_dir = config.switchboards['bad'].queue_directory
+                psvfile = os.path.join(bad_dir, filebase + '.psv')
                 os.rename(bakfile, psvfile)
             else:
                 os.unlink(bakfile)
@@ -233,11 +233,44 @@ class Switchboard:
         """See `ISwitchboard`."""
         # Move all .bak files in our slice to .pck.  It's impossible for both
         # to exist at the same time, so the move is enough to ensure that our
-        # normal dequeuing process will handle them.
+        # normal dequeuing process will handle them.  We keep count in
+        # _bak_count in the metadata of the number of times we recover this
+        # file.  When the count reaches MAX_BAK_COUNT, we move the .bak file
+        # to a .psv file in the bad queue.
         for filebase in self.get_files('.bak'):
             src = os.path.join(self.queue_directory, filebase + '.bak')
             dst = os.path.join(self.queue_directory, filebase + '.pck')
-            os.rename(src, dst)
+            fp = open(src, 'rb+')
+            try:
+                try:
+                    msg = cPickle.load(fp)
+                    data_pos = fp.tell()
+                    data = cPickle.load(fp)
+                except Exception, s:
+                    # If unpickling throws any exception, just log and
+                    # preserve this entry
+                    elog.error('Unpickling .bak exception: %s\n'
+                               'Preserving file: %s', s, filebase)
+                    self.finish(filebase, preserve=True)
+                else:
+                    data['_bak_count'] = data.get('_bak_count', 0) + 1
+                    fp.seek(data_pos)
+                    if data.get('_parsemsg'):
+                        protocol = 0
+                    else:
+                        protocol = 1
+                    cPickle.dump(data, fp, protocol)
+                    fp.truncate()
+                    fp.flush()
+                    os.fsync(fp.fileno())
+                    if data['_bak_count'] >= MAX_BAK_COUNT:
+                        elog.error('.bak file max count, preserving file: %s',
+                                   filebase)
+                        self.finish(filebase, preserve=True)
+                    else:
+                        os.rename(src, dst)
+            finally:
+                fp.close()
diff --git a/mailman/queue/docs/switchboard.txt b/mailman/queue/docs/switchboard.txt
index 7baee7b54..741d435e1 100644
--- a/mailman/queue/docs/switchboard.txt
+++ b/mailman/queue/docs/switchboard.txt
@@ -22,9 +22,11 @@ Create a switchboard by giving its queue directory.
 
 Here's a helper function for ensuring things work correctly.
 
-    >>> def check_qfiles():
+    >>> def check_qfiles(directory=None):
+    ...     if directory is None:
+    ...         directory = queue_directory
     ...     files = {}
-    ...     for qfile in os.listdir(queue_directory):
+    ...     for qfile in os.listdir(directory):
     ...         root, ext = os.path.splitext(qfile)
     ...         files[ext] = files.get(ext, 0) + 1
     ...     return sorted(files.items())
@@ -133,12 +135,39 @@ place.  These can be recovered when the switchboard is instantiated.
 
     >>> check_qfiles()
     [('.pck', 3)]
 
-Clean up
+The files can be recovered explicitly.
 
     >>> for filebase in switchboard.files:
     ...     msg, msgdata = switchboard.dequeue(filebase)
-    ...     switchboard.finish(filebase)
+    ...     # Don't call .finish()
 
+    >>> check_qfiles()
+    [('.bak', 3)]
+
+    >>> switchboard.recover_backup_files()
     >>> check_qfiles()
+    [('.pck', 3)]
+
+But the files will only be recovered at most three times before they are
+considered defective.  In order to prevent mail bombs and loops, once this
+maximum is reached, the files will be preserved in the 'bad' queue.
+
+    >>> for filebase in switchboard.files:
+    ...     msg, msgdata = switchboard.dequeue(filebase)
+    ...     # Don't call .finish()
+    >>> check_qfiles()
+    [('.bak', 3)]
+    >>> switchboard.recover_backup_files()
+    >>> check_qfiles()
+    []
+
+    >>> bad = config.switchboards['bad']
+    >>> check_qfiles(bad.queue_directory)
+    [('.psv', 3)]
+
+Clean up
+
+    >>> for file in os.listdir(bad.queue_directory):
+    ...     os.remove(os.path.join(bad.queue_directory, file))
+    >>> check_qfiles(bad.queue_directory)
     []
