diff options
Diffstat (limited to 'mailman/queue/__init__.py')
| -rw-r--r-- | mailman/queue/__init__.py | 53 |
1 files changed, 43 insertions, 10 deletions
diff --git a/mailman/queue/__init__.py b/mailman/queue/__init__.py index f83fd46e9..65e31f6f3 100644 --- a/mailman/queue/__init__.py +++ b/mailman/queue/__init__.py @@ -62,6 +62,10 @@ shamax = 0xffffffffffffffffffffffffffffffffffffffffL # prevents skipping one of two entries with the same time until the next pass. DELTA = .0001 DOT = '.' +# We count the number of times a file has been moved to .bak and recovered. +# In order to prevent loops and a message flood, when the count reaches this +# value, we move the file to the bad queue as a .psv. +MAX_BAK_COUNT = 3 elog = logging.getLogger('mailman.error') dlog = logging.getLogger('mailman.debug') @@ -169,10 +173,7 @@ class Switchboard: with open(filename) as fp: # Move the file to the backup file name for processing. If this # process crashes uncleanly the .bak file will be used to - # re-instate the .pck file in order to try again. XXX what if - # something caused Python to constantly crash? Is it possible - # that we'd end up mail bombing recipients or crushing the - # archiver? How would we defend against that? + # re-instate the .pck file in order to try again. os.rename(filename, backfile) msg = cPickle.load(fp) data = cPickle.load(fp) @@ -187,13 +188,12 @@ class Switchboard: return msg, data def finish(self, filebase, preserve=False): + """See `ISwitchboard`.""" bakfile = os.path.join(self.queue_directory, filebase + '.bak') try: if preserve: - shunt_dir = config.switchboards['shunt'].queue_directory - psvfile = os.path.join(shunt_dir, filebase + '.psv') - # Create the directory if it doesn't yet exist. - Utils.makedirs(shunt_dir, 0770) + bad_dir = config.switchboards['bad'].queue_directory + psvfile = os.path.join(bad_dir, filebase + '.psv') os.rename(bakfile, psvfile) else: os.unlink(bakfile) @@ -233,11 +233,44 @@ class Switchboard: """See `ISwitchboard`.""" # Move all .bak files in our slice to .pck. It's impossible for both # to exist at the same time, so the move is enough to ensure that our - # normal dequeuing process will handle them. + # normal dequeuing process will handle them. We keep count in + # _bak_count in the metadata of the number of times we recover this + # file. When the count reaches MAX_BAK_COUNT, we move the .bak file + # to a .psv file in the bad queue. for filebase in self.get_files('.bak'): src = os.path.join(self.queue_directory, filebase + '.bak') dst = os.path.join(self.queue_directory, filebase + '.pck') - os.rename(src, dst) + fp = open(src, 'rb+') + try: + try: + msg = cPickle.load(fp) + data_pos = fp.tell() + data = cPickle.load(fp) + except Exception, s: + # If unpickling throws any exception, just log and + # preserve this entry + elog.error('Unpickling .bak exception: %s\n' + 'Preserving file: %s', s, filebase) + self.finish(filebase, preserve=True) + else: + data['_bak_count'] = data.get('_bak_count', 0) + 1 + fp.seek(data_pos) + if data.get('_parsemsg'): + protocol = 0 + else: + protocol = 1 + cPickle.dump(data, fp, protocol) + fp.truncate() + fp.flush() + os.fsync(fp.fileno()) + if data['_bak_count'] >= MAX_BAK_COUNT: + elog.error('.bak file max count, preserving file: %s', + filebase) + self.finish(filebase, preserve=True) + else: + os.rename(src, dst) + finally: + fp.close() |
