diff options
Diffstat (limited to 'src/mailman/core')
| -rw-r--r-- | src/mailman/core/docs/__init__.py | 0 | ||||
| -rw-r--r-- | src/mailman/core/docs/runner.txt | 73 | ||||
| -rw-r--r-- | src/mailman/core/docs/switchboard.txt | 187 | ||||
| -rw-r--r-- | src/mailman/core/runner.py | 249 | ||||
| -rw-r--r-- | src/mailman/core/switchboard.py | 269 |
5 files changed, 778 insertions, 0 deletions
diff --git a/src/mailman/core/docs/__init__.py b/src/mailman/core/docs/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/src/mailman/core/docs/__init__.py diff --git a/src/mailman/core/docs/runner.txt b/src/mailman/core/docs/runner.txt new file mode 100644 index 000000000..4262dc87a --- /dev/null +++ b/src/mailman/core/docs/runner.txt @@ -0,0 +1,73 @@ +============= +Queue runners +============= + +The queue runners (*qrunner*) are the processes that move messages around the +Mailman system. Each qrunner is responsible for a slice of the hash space in +a queue directory. It processes all the files in its slice, sleeps a little +while, then wakes up and runs through its queue files again. + + +Basic architecture +================== + +The basic architecture of qrunner is implemented in the base class that all +runners inherit from. This base class implements a ``.run()`` method that +runs continuously in a loop until the ``.stop()`` method is called. + + >>> mlist = create_list('_xtest@example.com') + +Here is a very simple derived qrunner class. Queue runners use a +configuration section in the configuration files to determine run +characteristics, such as the queue directory to use. Here we push a +configuration section for the test runner. +:: + + >>> config.push('test-runner', """ + ... [qrunner.test] + ... max_restarts: 1 + ... """) + + >>> from mailman.core.runner import Runner + >>> class TestableRunner(Runner): + ... def _dispose(self, mlist, msg, msgdata): + ... self.msg = msg + ... self.msgdata = msgdata + ... return False + ... + ... def _do_periodic(self): + ... self.stop() + ... + ... def _snooze(self, filecnt): + ... return + + >>> runner = TestableRunner('test') + +This qrunner doesn't do much except run once, storing the message and metadata +on instance variables. + + >>> msg = message_from_string("""\ + ... From: aperson@example.com + ... To: _xtest@example.com + ... + ... A test message. + ... """) + >>> switchboard = config.switchboards['test'] + >>> filebase = switchboard.enqueue(msg, listname=mlist.fqdn_listname, + ... foo='yes', bar='no') + >>> runner.run() + >>> print runner.msg.as_string() + From: aperson@example.com + To: _xtest@example.com + <BLANKLINE> + A test message. + <BLANKLINE> + >>> dump_msgdata(runner.msgdata) + _parsemsg: False + bar : no + foo : yes + lang : en + listname : _xtest@example.com + version : 3 + +XXX More of the Runner API should be tested. diff --git a/src/mailman/core/docs/switchboard.txt b/src/mailman/core/docs/switchboard.txt new file mode 100644 index 000000000..751b1e640 --- /dev/null +++ b/src/mailman/core/docs/switchboard.txt @@ -0,0 +1,187 @@ +The switchboard +=============== + +The switchboard is subsystem that moves messages between queues. Each +instance of a switchboard is responsible for one queue directory. + + >>> msg = message_from_string("""\ + ... From: aperson@example.com + ... To: _xtest@example.com + ... + ... A test message. + ... """) + +Create a switchboard by giving its queue name and directory. + + >>> import os + >>> queue_directory = os.path.join(config.QUEUE_DIR, 'test') + >>> from mailman.core.switchboard import Switchboard + >>> switchboard = Switchboard('test', queue_directory) + >>> print switchboard.name + test + >>> switchboard.queue_directory == queue_directory + True + +Here's a helper function for ensuring things work correctly. + + >>> def check_qfiles(directory=None): + ... if directory is None: + ... directory = queue_directory + ... files = {} + ... for qfile in os.listdir(directory): + ... root, ext = os.path.splitext(qfile) + ... files[ext] = files.get(ext, 0) + 1 + ... if len(files) == 0: + ... print 'empty' + ... for ext in sorted(files): + ... print '{0}: {1}'.format(ext, files[ext]) + + +Enqueing and dequeing +--------------------- + +The message can be enqueued with metadata specified in the passed in +dictionary. + + >>> filebase = switchboard.enqueue(msg) + >>> check_qfiles() + .pck: 1 + +To read the contents of a queue file, dequeue it. + + >>> msg, msgdata = switchboard.dequeue(filebase) + >>> print msg.as_string() + From: aperson@example.com + To: _xtest@example.com + <BLANKLINE> + A test message. + <BLANKLINE> + >>> dump_msgdata(msgdata) + _parsemsg: False + version : 3 + >>> check_qfiles() + .bak: 1 + +To complete the dequeing process, removing all traces of the message file, +finish it (without preservation). + + >>> switchboard.finish(filebase) + >>> check_qfiles() + empty + +When enqueing a file, you can provide additional metadata keys by using +keyword arguments. + + >>> filebase = switchboard.enqueue(msg, {'foo': 1}, bar=2) + >>> msg, msgdata = switchboard.dequeue(filebase) + >>> switchboard.finish(filebase) + >>> dump_msgdata(msgdata) + _parsemsg: False + bar : 2 + foo : 1 + version : 3 + +Keyword arguments override keys from the metadata dictionary. + + >>> filebase = switchboard.enqueue(msg, {'foo': 1}, foo=2) + >>> msg, msgdata = switchboard.dequeue(filebase) + >>> switchboard.finish(filebase) + >>> dump_msgdata(msgdata) + _parsemsg: False + foo : 2 + version : 3 + + +Iterating over files +-------------------- + +There are two ways to iterate over all the files in a switchboard's queue. +Normally, queue files end in .pck (for 'pickle') and the easiest way to +iterate over just these files is to use the .files attribute. + + >>> filebase_1 = switchboard.enqueue(msg, foo=1) + >>> filebase_2 = switchboard.enqueue(msg, foo=2) + >>> filebase_3 = switchboard.enqueue(msg, foo=3) + >>> filebases = sorted((filebase_1, filebase_2, filebase_3)) + >>> sorted(switchboard.files) == filebases + True + >>> check_qfiles() + .pck: 3 + +You can also use the .get_files() method if you want to iterate over all the +file bases for some other extension. + + >>> for filebase in switchboard.get_files(): + ... msg, msgdata = switchboard.dequeue(filebase) + >>> bakfiles = sorted(switchboard.get_files('.bak')) + >>> bakfiles == filebases + True + >>> check_qfiles() + .bak: 3 + >>> for filebase in switchboard.get_files('.bak'): + ... switchboard.finish(filebase) + >>> check_qfiles() + empty + + +Recovering files +---------------- + +Calling .dequeue() without calling .finish() leaves .bak backup files in +place. These can be recovered when the switchboard is instantiated. + + >>> filebase_1 = switchboard.enqueue(msg, foo=1) + >>> filebase_2 = switchboard.enqueue(msg, foo=2) + >>> filebase_3 = switchboard.enqueue(msg, foo=3) + >>> for filebase in switchboard.files: + ... msg, msgdata = switchboard.dequeue(filebase) + ... # Don't call .finish() + >>> check_qfiles() + .bak: 3 + >>> switchboard_2 = Switchboard('test', queue_directory, recover=True) + >>> check_qfiles() + .pck: 3 + +The files can be recovered explicitly. + + >>> for filebase in switchboard.files: + ... msg, msgdata = switchboard.dequeue(filebase) + ... # Don't call .finish() + >>> check_qfiles() + .bak: 3 + >>> switchboard.recover_backup_files() + >>> check_qfiles() + .pck: 3 + +But the files will only be recovered at most three times before they are +considered defective. In order to prevent mail bombs and loops, once this +maximum is reached, the files will be preserved in the 'bad' queue. +:: + + >>> for filebase in switchboard.files: + ... msg, msgdata = switchboard.dequeue(filebase) + ... # Don't call .finish() + >>> check_qfiles() + .bak: 3 + >>> switchboard.recover_backup_files() + >>> check_qfiles() + empty + + >>> bad = config.switchboards['bad'] + >>> check_qfiles(bad.queue_directory) + .psv: 3 + + +Clean up +-------- + + >>> for file in os.listdir(bad.queue_directory): + ... os.remove(os.path.join(bad.queue_directory, file)) + >>> check_qfiles(bad.queue_directory) + empty + + +Queue slices +------------ + +XXX Add tests for queue slices. diff --git a/src/mailman/core/runner.py b/src/mailman/core/runner.py new file mode 100644 index 000000000..3d876ac3d --- /dev/null +++ b/src/mailman/core/runner.py @@ -0,0 +1,249 @@ +# Copyright (C) 2001-2011 by the Free Software Foundation, Inc. +# +# This file is part of GNU Mailman. +# +# GNU Mailman is free software: you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) +# any later version. +# +# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. +# +# You should have received a copy of the GNU General Public License along with +# GNU Mailman. If not, see <http://www.gnu.org/licenses/>. + +"""The process runner base class.""" + +from __future__ import absolute_import, unicode_literals + +__metaclass__ = type +__all__ = [ + 'Runner', + ] + + +import time +import logging +import traceback + +from cStringIO import StringIO +from lazr.config import as_boolean, as_timedelta +from zope.component import getUtility +from zope.interface import implements + +from mailman.config import config +from mailman.core.i18n import _ +from mailman.core.switchboard import Switchboard +from mailman.interfaces.languages import ILanguageManager +from mailman.interfaces.listmanager import IListManager +from mailman.interfaces.runner import IRunner +from mailman.utilities.string import expand + + +dlog = logging.getLogger('mailman.debug') +elog = logging.getLogger('mailman.error') + + + +class Runner: + implements(IRunner) + + intercept_signals = True + + def __init__(self, name, slice=None): + """Create a queue runner. + + :param slice: The slice number for this queue runner. This is passed + directly to the underlying `ISwitchboard` object. + :type slice: int or None + """ + # Grab the configuration section. + self.name = name + section = getattr(config, 'qrunner.' + name) + substitutions = config.paths + substitutions['name'] = name + self.queue_directory = expand(section.path, substitutions) + numslices = int(section.instances) + self.switchboard = Switchboard( + name, self.queue_directory, slice, numslices, True) + self.sleep_time = as_timedelta(section.sleep_time) + # sleep_time is a timedelta; turn it into a float for time.sleep(). + self.sleep_float = (86400 * self.sleep_time.days + + self.sleep_time.seconds + + self.sleep_time.microseconds / 1.0e6) + self.max_restarts = int(section.max_restarts) + self.start = as_boolean(section.start) + self._stop = False + + def __repr__(self): + return '<{0} at {1:#x}>'.format(self.__class__.__name__, id(self)) + + def stop(self): + """See `IRunner`.""" + self._stop = True + + def run(self): + """See `IRunner`.""" + # Start the main loop for this queue runner. + try: + while True: + # Once through the loop that processes all the files in the + # queue directory. + filecnt = self._one_iteration() + # Do the periodic work for the subclass. + self._do_periodic() + # If the stop flag is set, we're done. + if self._stop: + break + # Give the runner an opportunity to snooze for a while, but + # pass it the file count so it can decide whether to do more + # work now or not. + self._snooze(filecnt) + except KeyboardInterrupt: + pass + finally: + self._clean_up() + + def _one_iteration(self): + """See `IRunner`.""" + me = self.__class__.__name__ + dlog.debug('[%s] starting oneloop', me) + # List all the files in our queue directory. The switchboard is + # guaranteed to hand us the files in FIFO order. + files = self.switchboard.files + for filebase in files: + dlog.debug('[%s] processing filebase: %s', me, filebase) + try: + # Ask the switchboard for the message and metadata objects + # associated with this queue file. + msg, msgdata = self.switchboard.dequeue(filebase) + except Exception as error: + # This used to just catch email.Errors.MessageParseError, but + # other problems can occur in message parsing, e.g. + # ValueError, and exceptions can occur in unpickling too. We + # don't want the runner to die, so we just log and skip this + # entry, but preserve it for analysis. + self._log(error) + elog.error('Skipping and preserving unparseable message: %s', + filebase) + self.switchboard.finish(filebase, preserve=True) + config.db.abort() + continue + try: + dlog.debug('[%s] processing onefile', me) + self._process_one_file(msg, msgdata) + dlog.debug('[%s] finishing filebase: %s', me, filebase) + self.switchboard.finish(filebase) + except Exception as error: + # All runners that implement _dispose() must guarantee that + # exceptions are caught and dealt with properly. Still, there + # may be a bug in the infrastructure, and we do not want those + # to cause messages to be lost. Any uncaught exceptions will + # cause the message to be stored in the shunt queue for human + # intervention. + self._log(error) + # Put a marker in the metadata for unshunting. + msgdata['whichq'] = self.switchboard.name + # It is possible that shunting can throw an exception, e.g. a + # permissions problem or a MemoryError due to a really large + # message. Try to be graceful. + try: + shunt = config.switchboards['shunt'] + new_filebase = shunt.enqueue(msg, msgdata) + elog.error('SHUNTING: %s', new_filebase) + self.switchboard.finish(filebase) + except Exception as error: + # The message wasn't successfully shunted. Log the + # exception and try to preserve the original queue entry + # for possible analysis. + self._log(error) + elog.error( + 'SHUNTING FAILED, preserving original entry: %s', + filebase) + self.switchboard.finish(filebase, preserve=True) + config.db.abort() + # Other work we want to do each time through the loop. + dlog.debug('[%s] doing periodic', me) + self._do_periodic() + dlog.debug('[%s] checking short circuit', me) + if self._short_circuit(): + dlog.debug('[%s] short circuiting', me) + break + dlog.debug('[%s] commiting', me) + config.db.commit() + dlog.debug('[%s] ending oneloop: %s', me, len(files)) + return len(files) + + def _process_one_file(self, msg, msgdata): + """See `IRunner`.""" + # Do some common sanity checking on the message metadata. It's got to + # be destined for a particular mailing list. This switchboard is used + # to shunt off badly formatted messages. We don't want to just trash + # them because they may be fixable with human intervention. Just get + # them out of our sight. + # + # Find out which mailing list this message is destined for. + missing = object() + listname = msgdata.get('listname', missing) + mlist = (None + if listname is missing + else getUtility(IListManager).get(unicode(listname))) + if mlist is None: + elog.error( + '%s runner "%s" shunting message for missing list: %s', + msg['message-id'], self.name, + ('n/a' if listname is missing else listname)) + config.switchboards['shunt'].enqueue(msg, msgdata) + return + # Now process this message. We also want to set up the language + # context for this message. The context will be the preferred + # language for the user if the sender is a member of the list, or it + # will be the list's preferred language. However, we must take + # special care to reset the defaults, otherwise subsequent messages + # may be translated incorrectly. + if mlist is None: + language_manager = getUtility(ILanguageManager) + language = language_manager[config.mailman.default_language] + elif msg.sender: + member = mlist.members.get_member(msg.sender) + language = (member.preferred_language + if member is not None + else mlist.preferred_language) + else: + language = mlist.preferred_language + with _.using(language.code): + msgdata['lang'] = language.code + keepqueued = self._dispose(mlist, msg, msgdata) + if keepqueued: + self.switchboard.enqueue(msg, msgdata) + + def _log(self, exc): + elog.error('Uncaught runner exception: %s', exc) + s = StringIO() + traceback.print_exc(file=s) + elog.error('%s', s.getvalue()) + + def _clean_up(self): + """See `IRunner`.""" + pass + + def _dispose(self, mlist, msg, msgdata): + """See `IRunner`.""" + raise NotImplementedError + + def _do_periodic(self): + """See `IRunner`.""" + pass + + def _snooze(self, filecnt): + """See `IRunner`.""" + if filecnt or self.sleep_float <= 0: + return + time.sleep(self.sleep_float) + + def _short_circuit(self): + """See `IRunner`.""" + return self._stop diff --git a/src/mailman/core/switchboard.py b/src/mailman/core/switchboard.py new file mode 100644 index 000000000..5d9eb65ce --- /dev/null +++ b/src/mailman/core/switchboard.py @@ -0,0 +1,269 @@ +# Copyright (C) 2001-2011 by the Free Software Foundation, Inc. +# +# This file is part of GNU Mailman. +# +# GNU Mailman is free software: you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) +# any later version. +# +# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. +# +# You should have received a copy of the GNU General Public License along with +# GNU Mailman. If not, see <http://www.gnu.org/licenses/>. + +"""Queing and dequeuing message/metadata pickle files. + +Messages are represented as email.message.Message objects (or an instance ofa +subclass). Metadata is represented as a Python dictionary. For every +message/metadata pair in a queue, a single file containing two pickles is +written. First, the message is written to the pickle, then the metadata +dictionary is written. +""" + +from __future__ import absolute_import, unicode_literals + +__metaclass__ = type +__all__ = [ + 'Switchboard', + ] + + +import os +import time +import email +import pickle +import cPickle +import hashlib +import logging + +from zope.interface import implements + +from mailman.config import config +from mailman.email.message import Message +from mailman.interfaces.switchboard import ISwitchboard +from mailman.utilities.filesystem import makedirs +from mailman.utilities.string import expand + + +# 20 bytes of all bits set, maximum hashlib.sha.digest() value +shamax = 0xffffffffffffffffffffffffffffffffffffffffL +# Small increment to add to time in case two entries have the same time. This +# prevents skipping one of two entries with the same time until the next pass. +DELTA = .0001 +# We count the number of times a file has been moved to .bak and recovered. +# In order to prevent loops and a message flood, when the count reaches this +# value, we move the file to the bad queue as a .psv. +MAX_BAK_COUNT = 3 + +elog = logging.getLogger('mailman.error') + + + +class Switchboard: + implements(ISwitchboard) + + @staticmethod + def initialize(): + """Initialize the global switchboards for input/output.""" + for conf in config.qrunner_configs: + name = conf.name.split('.')[-1] + assert name not in config.switchboards, ( + 'Duplicate qrunner name: {0}'.format(name)) + substitutions = config.paths + substitutions['name'] = name + path = expand(conf.path, substitutions) + config.switchboards[name] = Switchboard(name, path) + + def __init__(self, name, queue_directory, + slice=None, numslices=1, recover=False): + """Create a switchboard object. + + :param name: The queue name. + :type name: str + :param queue_directory: The queue directory. + :type queue_directory: str + :param slice: The slice number for this switchboard, or None. If not + None, it must be [0..`numslices`). + :type slice: int or None + :param numslices: The total number of slices to split this queue + directory into. It must be a power of 2. + :type numslices: int + :param recover: True if backup files should be recovered. + :type recover: bool + """ + assert (numslices & (numslices - 1)) == 0, ( + 'Not a power of 2: {0}'.format(numslices)) + self.name = name + self.queue_directory = queue_directory + # If configured to, create the directory if it doesn't yet exist. + if config.create_paths: + makedirs(self.queue_directory, 0770) + # Fast track for no slices + self._lower = None + self._upper = None + # BAW: test performance and end-cases of this algorithm + if numslices <> 1: + self._lower = ((shamax + 1) * slice) / numslices + self._upper = (((shamax + 1) * (slice + 1)) / numslices) - 1 + if recover: + self.recover_backup_files() + + def enqueue(self, _msg, _metadata=None, **_kws): + """See `ISwitchboard`.""" + if _metadata is None: + _metadata = {} + # Calculate the SHA hexdigest of the message to get a unique base + # filename. We're also going to use the digest as a hash into the set + # of parallel qrunner processes. + data = _metadata.copy() + data.update(_kws) + listname = data.get('listname', '--nolist--') + # Get some data for the input to the sha hash. + now = time.time() + if data.get('_plaintext'): + protocol = 0 + msgsave = cPickle.dumps(str(_msg), protocol) + else: + protocol = pickle.HIGHEST_PROTOCOL + msgsave = cPickle.dumps(_msg, protocol) + # listname is unicode but the input to the hash function must be an + # 8-bit string (eventually, a bytes object). + hashfood = msgsave + listname.encode('utf-8') + repr(now) + # Encode the current time into the file name for FIFO sorting. The + # file name consists of two parts separated by a '+': the received + # time for this message (i.e. when it first showed up on this system) + # and the sha hex digest. + rcvtime = data.setdefault('received_time', now) + filebase = repr(rcvtime) + '+' + hashlib.sha1(hashfood).hexdigest() + filename = os.path.join(self.queue_directory, filebase + '.pck') + tmpfile = filename + '.tmp' + # Always add the metadata schema version number + data['version'] = config.QFILE_SCHEMA_VERSION + # Filter out volatile entries. Use .keys() so that we can mutate the + # dictionary during the iteration. + for k in data.keys(): + if k.startswith('_'): + del data[k] + # We have to tell the dequeue() method whether to parse the message + # object or not. + data['_parsemsg'] = (protocol == 0) + # Write to the pickle file the message object and metadata. + with open(tmpfile, 'w') as fp: + fp.write(msgsave) + cPickle.dump(data, fp, protocol) + fp.flush() + os.fsync(fp.fileno()) + os.rename(tmpfile, filename) + return filebase + + def dequeue(self, filebase): + """See `ISwitchboard`.""" + # Calculate the filename from the given filebase. + filename = os.path.join(self.queue_directory, filebase + '.pck') + backfile = os.path.join(self.queue_directory, filebase + '.bak') + # Read the message object and metadata. + with open(filename) as fp: + # Move the file to the backup file name for processing. If this + # process crashes uncleanly the .bak file will be used to + # re-instate the .pck file in order to try again. + os.rename(filename, backfile) + msg = cPickle.load(fp) + data = cPickle.load(fp) + if data.get('_parsemsg'): + # Calculate the original size of the text now so that we won't + # have to generate the message later when we do size restriction + # checking. + original_size = len(msg) + msg = email.message_from_string(msg, Message) + msg.original_size = original_size + data['original_size'] = original_size + return msg, data + + def finish(self, filebase, preserve=False): + """See `ISwitchboard`.""" + bakfile = os.path.join(self.queue_directory, filebase + '.bak') + try: + if preserve: + bad_dir = config.switchboards['bad'].queue_directory + psvfile = os.path.join(bad_dir, filebase + '.psv') + os.rename(bakfile, psvfile) + else: + os.unlink(bakfile) + except EnvironmentError: + elog.exception( + 'Failed to unlink/preserve backup file: %s', bakfile) + + @property + def files(self): + """See `ISwitchboard`.""" + return self.get_files() + + def get_files(self, extension='.pck'): + """See `ISwitchboard`.""" + times = {} + lower = self._lower + upper = self._upper + for f in os.listdir(self.queue_directory): + # By ignoring anything that doesn't end in .pck, we ignore + # tempfiles and avoid a race condition. + filebase, ext = os.path.splitext(f) + if ext <> extension: + continue + when, digest = filebase.split('+', 1) + # Throw out any files which don't match our bitrange. BAW: test + # performance and end-cases of this algorithm. MAS: both + # comparisons need to be <= to get complete range. + if lower is None or (lower <= long(digest, 16) <= upper): + key = float(when) + while key in times: + key += DELTA + times[key] = filebase + # FIFO sort + return [times[key] for key in sorted(times)] + + def recover_backup_files(self): + """See `ISwitchboard`.""" + # Move all .bak files in our slice to .pck. It's impossible for both + # to exist at the same time, so the move is enough to ensure that our + # normal dequeuing process will handle them. We keep count in + # _bak_count in the metadata of the number of times we recover this + # file. When the count reaches MAX_BAK_COUNT, we move the .bak file + # to a .psv file in the bad queue. + for filebase in self.get_files('.bak'): + src = os.path.join(self.queue_directory, filebase + '.bak') + dst = os.path.join(self.queue_directory, filebase + '.pck') + fp = open(src, 'rb+') + try: + try: + msg = cPickle.load(fp) + data_pos = fp.tell() + data = cPickle.load(fp) + except Exception as error: + # If unpickling throws any exception, just log and + # preserve this entry + elog.error('Unpickling .bak exception: %s\n' + 'Preserving file: %s', error, filebase) + self.finish(filebase, preserve=True) + else: + data['_bak_count'] = data.get('_bak_count', 0) + 1 + fp.seek(data_pos) + if data.get('_parsemsg'): + protocol = 0 + else: + protocol = 1 + cPickle.dump(data, fp, protocol) + fp.truncate() + fp.flush() + os.fsync(fp.fileno()) + if data['_bak_count'] >= MAX_BAK_COUNT: + elog.error('.bak file max count, preserving file: %s', + filebase) + self.finish(filebase, preserve=True) + else: + os.rename(src, dst) + finally: + fp.close() |
