summaryrefslogtreecommitdiff
path: root/src/mailman/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/mailman/core')
-rw-r--r--src/mailman/core/docs/__init__.py0
-rw-r--r--src/mailman/core/docs/runner.txt73
-rw-r--r--src/mailman/core/docs/switchboard.txt187
-rw-r--r--src/mailman/core/runner.py249
-rw-r--r--src/mailman/core/switchboard.py269
5 files changed, 778 insertions, 0 deletions
diff --git a/src/mailman/core/docs/__init__.py b/src/mailman/core/docs/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/src/mailman/core/docs/__init__.py
diff --git a/src/mailman/core/docs/runner.txt b/src/mailman/core/docs/runner.txt
new file mode 100644
index 000000000..4262dc87a
--- /dev/null
+++ b/src/mailman/core/docs/runner.txt
@@ -0,0 +1,73 @@
+=============
+Queue runners
+=============
+
+The queue runners (*qrunner*) are the processes that move messages around the
+Mailman system. Each qrunner is responsible for a slice of the hash space in
+a queue directory. It processes all the files in its slice, sleeps a little
+while, then wakes up and runs through its queue files again.
+
+
+Basic architecture
+==================
+
+The basic architecture of qrunner is implemented in the base class that all
+runners inherit from. This base class implements a ``.run()`` method that
+runs continuously in a loop until the ``.stop()`` method is called.
+
+ >>> mlist = create_list('_xtest@example.com')
+
+Here is a very simple derived qrunner class. Queue runners use a
+configuration section in the configuration files to determine run
+characteristics, such as the queue directory to use. Here we push a
+configuration section for the test runner.
+::
+
+ >>> config.push('test-runner', """
+ ... [qrunner.test]
+ ... max_restarts: 1
+ ... """)
+
+ >>> from mailman.core.runner import Runner
+ >>> class TestableRunner(Runner):
+ ... def _dispose(self, mlist, msg, msgdata):
+ ... self.msg = msg
+ ... self.msgdata = msgdata
+ ... return False
+ ...
+ ... def _do_periodic(self):
+ ... self.stop()
+ ...
+ ... def _snooze(self, filecnt):
+ ... return
+
+ >>> runner = TestableRunner('test')
+
+This qrunner doesn't do much except run once, storing the message and metadata
+on instance variables.
+
+ >>> msg = message_from_string("""\
+ ... From: aperson@example.com
+ ... To: _xtest@example.com
+ ...
+ ... A test message.
+ ... """)
+ >>> switchboard = config.switchboards['test']
+ >>> filebase = switchboard.enqueue(msg, listname=mlist.fqdn_listname,
+ ... foo='yes', bar='no')
+ >>> runner.run()
+ >>> print runner.msg.as_string()
+ From: aperson@example.com
+ To: _xtest@example.com
+ <BLANKLINE>
+ A test message.
+ <BLANKLINE>
+ >>> dump_msgdata(runner.msgdata)
+ _parsemsg: False
+ bar : no
+ foo : yes
+ lang : en
+ listname : _xtest@example.com
+ version : 3
+
+XXX More of the Runner API should be tested.
diff --git a/src/mailman/core/docs/switchboard.txt b/src/mailman/core/docs/switchboard.txt
new file mode 100644
index 000000000..751b1e640
--- /dev/null
+++ b/src/mailman/core/docs/switchboard.txt
@@ -0,0 +1,187 @@
+The switchboard
+===============
+
+The switchboard is subsystem that moves messages between queues. Each
+instance of a switchboard is responsible for one queue directory.
+
+ >>> msg = message_from_string("""\
+ ... From: aperson@example.com
+ ... To: _xtest@example.com
+ ...
+ ... A test message.
+ ... """)
+
+Create a switchboard by giving its queue name and directory.
+
+ >>> import os
+ >>> queue_directory = os.path.join(config.QUEUE_DIR, 'test')
+ >>> from mailman.core.switchboard import Switchboard
+ >>> switchboard = Switchboard('test', queue_directory)
+ >>> print switchboard.name
+ test
+ >>> switchboard.queue_directory == queue_directory
+ True
+
+Here's a helper function for ensuring things work correctly.
+
+ >>> def check_qfiles(directory=None):
+ ... if directory is None:
+ ... directory = queue_directory
+ ... files = {}
+ ... for qfile in os.listdir(directory):
+ ... root, ext = os.path.splitext(qfile)
+ ... files[ext] = files.get(ext, 0) + 1
+ ... if len(files) == 0:
+ ... print 'empty'
+ ... for ext in sorted(files):
+ ... print '{0}: {1}'.format(ext, files[ext])
+
+
+Enqueing and dequeing
+---------------------
+
+The message can be enqueued with metadata specified in the passed in
+dictionary.
+
+ >>> filebase = switchboard.enqueue(msg)
+ >>> check_qfiles()
+ .pck: 1
+
+To read the contents of a queue file, dequeue it.
+
+ >>> msg, msgdata = switchboard.dequeue(filebase)
+ >>> print msg.as_string()
+ From: aperson@example.com
+ To: _xtest@example.com
+ <BLANKLINE>
+ A test message.
+ <BLANKLINE>
+ >>> dump_msgdata(msgdata)
+ _parsemsg: False
+ version : 3
+ >>> check_qfiles()
+ .bak: 1
+
+To complete the dequeing process, removing all traces of the message file,
+finish it (without preservation).
+
+ >>> switchboard.finish(filebase)
+ >>> check_qfiles()
+ empty
+
+When enqueing a file, you can provide additional metadata keys by using
+keyword arguments.
+
+ >>> filebase = switchboard.enqueue(msg, {'foo': 1}, bar=2)
+ >>> msg, msgdata = switchboard.dequeue(filebase)
+ >>> switchboard.finish(filebase)
+ >>> dump_msgdata(msgdata)
+ _parsemsg: False
+ bar : 2
+ foo : 1
+ version : 3
+
+Keyword arguments override keys from the metadata dictionary.
+
+ >>> filebase = switchboard.enqueue(msg, {'foo': 1}, foo=2)
+ >>> msg, msgdata = switchboard.dequeue(filebase)
+ >>> switchboard.finish(filebase)
+ >>> dump_msgdata(msgdata)
+ _parsemsg: False
+ foo : 2
+ version : 3
+
+
+Iterating over files
+--------------------
+
+There are two ways to iterate over all the files in a switchboard's queue.
+Normally, queue files end in .pck (for 'pickle') and the easiest way to
+iterate over just these files is to use the .files attribute.
+
+ >>> filebase_1 = switchboard.enqueue(msg, foo=1)
+ >>> filebase_2 = switchboard.enqueue(msg, foo=2)
+ >>> filebase_3 = switchboard.enqueue(msg, foo=3)
+ >>> filebases = sorted((filebase_1, filebase_2, filebase_3))
+ >>> sorted(switchboard.files) == filebases
+ True
+ >>> check_qfiles()
+ .pck: 3
+
+You can also use the .get_files() method if you want to iterate over all the
+file bases for some other extension.
+
+ >>> for filebase in switchboard.get_files():
+ ... msg, msgdata = switchboard.dequeue(filebase)
+ >>> bakfiles = sorted(switchboard.get_files('.bak'))
+ >>> bakfiles == filebases
+ True
+ >>> check_qfiles()
+ .bak: 3
+ >>> for filebase in switchboard.get_files('.bak'):
+ ... switchboard.finish(filebase)
+ >>> check_qfiles()
+ empty
+
+
+Recovering files
+----------------
+
+Calling .dequeue() without calling .finish() leaves .bak backup files in
+place. These can be recovered when the switchboard is instantiated.
+
+ >>> filebase_1 = switchboard.enqueue(msg, foo=1)
+ >>> filebase_2 = switchboard.enqueue(msg, foo=2)
+ >>> filebase_3 = switchboard.enqueue(msg, foo=3)
+ >>> for filebase in switchboard.files:
+ ... msg, msgdata = switchboard.dequeue(filebase)
+ ... # Don't call .finish()
+ >>> check_qfiles()
+ .bak: 3
+ >>> switchboard_2 = Switchboard('test', queue_directory, recover=True)
+ >>> check_qfiles()
+ .pck: 3
+
+The files can be recovered explicitly.
+
+ >>> for filebase in switchboard.files:
+ ... msg, msgdata = switchboard.dequeue(filebase)
+ ... # Don't call .finish()
+ >>> check_qfiles()
+ .bak: 3
+ >>> switchboard.recover_backup_files()
+ >>> check_qfiles()
+ .pck: 3
+
+But the files will only be recovered at most three times before they are
+considered defective. In order to prevent mail bombs and loops, once this
+maximum is reached, the files will be preserved in the 'bad' queue.
+::
+
+ >>> for filebase in switchboard.files:
+ ... msg, msgdata = switchboard.dequeue(filebase)
+ ... # Don't call .finish()
+ >>> check_qfiles()
+ .bak: 3
+ >>> switchboard.recover_backup_files()
+ >>> check_qfiles()
+ empty
+
+ >>> bad = config.switchboards['bad']
+ >>> check_qfiles(bad.queue_directory)
+ .psv: 3
+
+
+Clean up
+--------
+
+ >>> for file in os.listdir(bad.queue_directory):
+ ... os.remove(os.path.join(bad.queue_directory, file))
+ >>> check_qfiles(bad.queue_directory)
+ empty
+
+
+Queue slices
+------------
+
+XXX Add tests for queue slices.
diff --git a/src/mailman/core/runner.py b/src/mailman/core/runner.py
new file mode 100644
index 000000000..3d876ac3d
--- /dev/null
+++ b/src/mailman/core/runner.py
@@ -0,0 +1,249 @@
+# Copyright (C) 2001-2011 by the Free Software Foundation, Inc.
+#
+# This file is part of GNU Mailman.
+#
+# GNU Mailman is free software: you can redistribute it and/or modify it under
+# the terms of the GNU General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or (at your option)
+# any later version.
+#
+# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+# more details.
+#
+# You should have received a copy of the GNU General Public License along with
+# GNU Mailman. If not, see <http://www.gnu.org/licenses/>.
+
+"""The process runner base class."""
+
+from __future__ import absolute_import, unicode_literals
+
+__metaclass__ = type
+__all__ = [
+ 'Runner',
+ ]
+
+
+import time
+import logging
+import traceback
+
+from cStringIO import StringIO
+from lazr.config import as_boolean, as_timedelta
+from zope.component import getUtility
+from zope.interface import implements
+
+from mailman.config import config
+from mailman.core.i18n import _
+from mailman.core.switchboard import Switchboard
+from mailman.interfaces.languages import ILanguageManager
+from mailman.interfaces.listmanager import IListManager
+from mailman.interfaces.runner import IRunner
+from mailman.utilities.string import expand
+
+
+dlog = logging.getLogger('mailman.debug')
+elog = logging.getLogger('mailman.error')
+
+
+
+class Runner:
+ implements(IRunner)
+
+ intercept_signals = True
+
+ def __init__(self, name, slice=None):
+ """Create a queue runner.
+
+ :param slice: The slice number for this queue runner. This is passed
+ directly to the underlying `ISwitchboard` object.
+ :type slice: int or None
+ """
+ # Grab the configuration section.
+ self.name = name
+ section = getattr(config, 'qrunner.' + name)
+ substitutions = config.paths
+ substitutions['name'] = name
+ self.queue_directory = expand(section.path, substitutions)
+ numslices = int(section.instances)
+ self.switchboard = Switchboard(
+ name, self.queue_directory, slice, numslices, True)
+ self.sleep_time = as_timedelta(section.sleep_time)
+ # sleep_time is a timedelta; turn it into a float for time.sleep().
+ self.sleep_float = (86400 * self.sleep_time.days +
+ self.sleep_time.seconds +
+ self.sleep_time.microseconds / 1.0e6)
+ self.max_restarts = int(section.max_restarts)
+ self.start = as_boolean(section.start)
+ self._stop = False
+
+ def __repr__(self):
+ return '<{0} at {1:#x}>'.format(self.__class__.__name__, id(self))
+
+ def stop(self):
+ """See `IRunner`."""
+ self._stop = True
+
+ def run(self):
+ """See `IRunner`."""
+ # Start the main loop for this queue runner.
+ try:
+ while True:
+ # Once through the loop that processes all the files in the
+ # queue directory.
+ filecnt = self._one_iteration()
+ # Do the periodic work for the subclass.
+ self._do_periodic()
+ # If the stop flag is set, we're done.
+ if self._stop:
+ break
+ # Give the runner an opportunity to snooze for a while, but
+ # pass it the file count so it can decide whether to do more
+ # work now or not.
+ self._snooze(filecnt)
+ except KeyboardInterrupt:
+ pass
+ finally:
+ self._clean_up()
+
+ def _one_iteration(self):
+ """See `IRunner`."""
+ me = self.__class__.__name__
+ dlog.debug('[%s] starting oneloop', me)
+ # List all the files in our queue directory. The switchboard is
+ # guaranteed to hand us the files in FIFO order.
+ files = self.switchboard.files
+ for filebase in files:
+ dlog.debug('[%s] processing filebase: %s', me, filebase)
+ try:
+ # Ask the switchboard for the message and metadata objects
+ # associated with this queue file.
+ msg, msgdata = self.switchboard.dequeue(filebase)
+ except Exception as error:
+ # This used to just catch email.Errors.MessageParseError, but
+ # other problems can occur in message parsing, e.g.
+ # ValueError, and exceptions can occur in unpickling too. We
+ # don't want the runner to die, so we just log and skip this
+ # entry, but preserve it for analysis.
+ self._log(error)
+ elog.error('Skipping and preserving unparseable message: %s',
+ filebase)
+ self.switchboard.finish(filebase, preserve=True)
+ config.db.abort()
+ continue
+ try:
+ dlog.debug('[%s] processing onefile', me)
+ self._process_one_file(msg, msgdata)
+ dlog.debug('[%s] finishing filebase: %s', me, filebase)
+ self.switchboard.finish(filebase)
+ except Exception as error:
+ # All runners that implement _dispose() must guarantee that
+ # exceptions are caught and dealt with properly. Still, there
+ # may be a bug in the infrastructure, and we do not want those
+ # to cause messages to be lost. Any uncaught exceptions will
+ # cause the message to be stored in the shunt queue for human
+ # intervention.
+ self._log(error)
+ # Put a marker in the metadata for unshunting.
+ msgdata['whichq'] = self.switchboard.name
+ # It is possible that shunting can throw an exception, e.g. a
+ # permissions problem or a MemoryError due to a really large
+ # message. Try to be graceful.
+ try:
+ shunt = config.switchboards['shunt']
+ new_filebase = shunt.enqueue(msg, msgdata)
+ elog.error('SHUNTING: %s', new_filebase)
+ self.switchboard.finish(filebase)
+ except Exception as error:
+ # The message wasn't successfully shunted. Log the
+ # exception and try to preserve the original queue entry
+ # for possible analysis.
+ self._log(error)
+ elog.error(
+ 'SHUNTING FAILED, preserving original entry: %s',
+ filebase)
+ self.switchboard.finish(filebase, preserve=True)
+ config.db.abort()
+ # Other work we want to do each time through the loop.
+ dlog.debug('[%s] doing periodic', me)
+ self._do_periodic()
+ dlog.debug('[%s] checking short circuit', me)
+ if self._short_circuit():
+ dlog.debug('[%s] short circuiting', me)
+ break
+ dlog.debug('[%s] commiting', me)
+ config.db.commit()
+ dlog.debug('[%s] ending oneloop: %s', me, len(files))
+ return len(files)
+
+ def _process_one_file(self, msg, msgdata):
+ """See `IRunner`."""
+ # Do some common sanity checking on the message metadata. It's got to
+ # be destined for a particular mailing list. This switchboard is used
+ # to shunt off badly formatted messages. We don't want to just trash
+ # them because they may be fixable with human intervention. Just get
+ # them out of our sight.
+ #
+ # Find out which mailing list this message is destined for.
+ missing = object()
+ listname = msgdata.get('listname', missing)
+ mlist = (None
+ if listname is missing
+ else getUtility(IListManager).get(unicode(listname)))
+ if mlist is None:
+ elog.error(
+ '%s runner "%s" shunting message for missing list: %s',
+ msg['message-id'], self.name,
+ ('n/a' if listname is missing else listname))
+ config.switchboards['shunt'].enqueue(msg, msgdata)
+ return
+ # Now process this message. We also want to set up the language
+ # context for this message. The context will be the preferred
+ # language for the user if the sender is a member of the list, or it
+ # will be the list's preferred language. However, we must take
+ # special care to reset the defaults, otherwise subsequent messages
+ # may be translated incorrectly.
+ if mlist is None:
+ language_manager = getUtility(ILanguageManager)
+ language = language_manager[config.mailman.default_language]
+ elif msg.sender:
+ member = mlist.members.get_member(msg.sender)
+ language = (member.preferred_language
+ if member is not None
+ else mlist.preferred_language)
+ else:
+ language = mlist.preferred_language
+ with _.using(language.code):
+ msgdata['lang'] = language.code
+ keepqueued = self._dispose(mlist, msg, msgdata)
+ if keepqueued:
+ self.switchboard.enqueue(msg, msgdata)
+
+ def _log(self, exc):
+ elog.error('Uncaught runner exception: %s', exc)
+ s = StringIO()
+ traceback.print_exc(file=s)
+ elog.error('%s', s.getvalue())
+
+ def _clean_up(self):
+ """See `IRunner`."""
+ pass
+
+ def _dispose(self, mlist, msg, msgdata):
+ """See `IRunner`."""
+ raise NotImplementedError
+
+ def _do_periodic(self):
+ """See `IRunner`."""
+ pass
+
+ def _snooze(self, filecnt):
+ """See `IRunner`."""
+ if filecnt or self.sleep_float <= 0:
+ return
+ time.sleep(self.sleep_float)
+
+ def _short_circuit(self):
+ """See `IRunner`."""
+ return self._stop
diff --git a/src/mailman/core/switchboard.py b/src/mailman/core/switchboard.py
new file mode 100644
index 000000000..5d9eb65ce
--- /dev/null
+++ b/src/mailman/core/switchboard.py
@@ -0,0 +1,269 @@
+# Copyright (C) 2001-2011 by the Free Software Foundation, Inc.
+#
+# This file is part of GNU Mailman.
+#
+# GNU Mailman is free software: you can redistribute it and/or modify it under
+# the terms of the GNU General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or (at your option)
+# any later version.
+#
+# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+# more details.
+#
+# You should have received a copy of the GNU General Public License along with
+# GNU Mailman. If not, see <http://www.gnu.org/licenses/>.
+
+"""Queing and dequeuing message/metadata pickle files.
+
+Messages are represented as email.message.Message objects (or an instance ofa
+subclass). Metadata is represented as a Python dictionary. For every
+message/metadata pair in a queue, a single file containing two pickles is
+written. First, the message is written to the pickle, then the metadata
+dictionary is written.
+"""
+
+from __future__ import absolute_import, unicode_literals
+
+__metaclass__ = type
+__all__ = [
+ 'Switchboard',
+ ]
+
+
+import os
+import time
+import email
+import pickle
+import cPickle
+import hashlib
+import logging
+
+from zope.interface import implements
+
+from mailman.config import config
+from mailman.email.message import Message
+from mailman.interfaces.switchboard import ISwitchboard
+from mailman.utilities.filesystem import makedirs
+from mailman.utilities.string import expand
+
+
+# 20 bytes of all bits set, maximum hashlib.sha.digest() value
+shamax = 0xffffffffffffffffffffffffffffffffffffffffL
+# Small increment to add to time in case two entries have the same time. This
+# prevents skipping one of two entries with the same time until the next pass.
+DELTA = .0001
+# We count the number of times a file has been moved to .bak and recovered.
+# In order to prevent loops and a message flood, when the count reaches this
+# value, we move the file to the bad queue as a .psv.
+MAX_BAK_COUNT = 3
+
+elog = logging.getLogger('mailman.error')
+
+
+
+class Switchboard:
+ implements(ISwitchboard)
+
+ @staticmethod
+ def initialize():
+ """Initialize the global switchboards for input/output."""
+ for conf in config.qrunner_configs:
+ name = conf.name.split('.')[-1]
+ assert name not in config.switchboards, (
+ 'Duplicate qrunner name: {0}'.format(name))
+ substitutions = config.paths
+ substitutions['name'] = name
+ path = expand(conf.path, substitutions)
+ config.switchboards[name] = Switchboard(name, path)
+
+ def __init__(self, name, queue_directory,
+ slice=None, numslices=1, recover=False):
+ """Create a switchboard object.
+
+ :param name: The queue name.
+ :type name: str
+ :param queue_directory: The queue directory.
+ :type queue_directory: str
+ :param slice: The slice number for this switchboard, or None. If not
+ None, it must be [0..`numslices`).
+ :type slice: int or None
+ :param numslices: The total number of slices to split this queue
+ directory into. It must be a power of 2.
+ :type numslices: int
+ :param recover: True if backup files should be recovered.
+ :type recover: bool
+ """
+ assert (numslices & (numslices - 1)) == 0, (
+ 'Not a power of 2: {0}'.format(numslices))
+ self.name = name
+ self.queue_directory = queue_directory
+ # If configured to, create the directory if it doesn't yet exist.
+ if config.create_paths:
+ makedirs(self.queue_directory, 0770)
+ # Fast track for no slices
+ self._lower = None
+ self._upper = None
+ # BAW: test performance and end-cases of this algorithm
+ if numslices <> 1:
+ self._lower = ((shamax + 1) * slice) / numslices
+ self._upper = (((shamax + 1) * (slice + 1)) / numslices) - 1
+ if recover:
+ self.recover_backup_files()
+
+ def enqueue(self, _msg, _metadata=None, **_kws):
+ """See `ISwitchboard`."""
+ if _metadata is None:
+ _metadata = {}
+ # Calculate the SHA hexdigest of the message to get a unique base
+ # filename. We're also going to use the digest as a hash into the set
+ # of parallel qrunner processes.
+ data = _metadata.copy()
+ data.update(_kws)
+ listname = data.get('listname', '--nolist--')
+ # Get some data for the input to the sha hash.
+ now = time.time()
+ if data.get('_plaintext'):
+ protocol = 0
+ msgsave = cPickle.dumps(str(_msg), protocol)
+ else:
+ protocol = pickle.HIGHEST_PROTOCOL
+ msgsave = cPickle.dumps(_msg, protocol)
+ # listname is unicode but the input to the hash function must be an
+ # 8-bit string (eventually, a bytes object).
+ hashfood = msgsave + listname.encode('utf-8') + repr(now)
+ # Encode the current time into the file name for FIFO sorting. The
+ # file name consists of two parts separated by a '+': the received
+ # time for this message (i.e. when it first showed up on this system)
+ # and the sha hex digest.
+ rcvtime = data.setdefault('received_time', now)
+ filebase = repr(rcvtime) + '+' + hashlib.sha1(hashfood).hexdigest()
+ filename = os.path.join(self.queue_directory, filebase + '.pck')
+ tmpfile = filename + '.tmp'
+ # Always add the metadata schema version number
+ data['version'] = config.QFILE_SCHEMA_VERSION
+ # Filter out volatile entries. Use .keys() so that we can mutate the
+ # dictionary during the iteration.
+ for k in data.keys():
+ if k.startswith('_'):
+ del data[k]
+ # We have to tell the dequeue() method whether to parse the message
+ # object or not.
+ data['_parsemsg'] = (protocol == 0)
+ # Write to the pickle file the message object and metadata.
+ with open(tmpfile, 'w') as fp:
+ fp.write(msgsave)
+ cPickle.dump(data, fp, protocol)
+ fp.flush()
+ os.fsync(fp.fileno())
+ os.rename(tmpfile, filename)
+ return filebase
+
+ def dequeue(self, filebase):
+ """See `ISwitchboard`."""
+ # Calculate the filename from the given filebase.
+ filename = os.path.join(self.queue_directory, filebase + '.pck')
+ backfile = os.path.join(self.queue_directory, filebase + '.bak')
+ # Read the message object and metadata.
+ with open(filename) as fp:
+ # Move the file to the backup file name for processing. If this
+ # process crashes uncleanly the .bak file will be used to
+ # re-instate the .pck file in order to try again.
+ os.rename(filename, backfile)
+ msg = cPickle.load(fp)
+ data = cPickle.load(fp)
+ if data.get('_parsemsg'):
+ # Calculate the original size of the text now so that we won't
+ # have to generate the message later when we do size restriction
+ # checking.
+ original_size = len(msg)
+ msg = email.message_from_string(msg, Message)
+ msg.original_size = original_size
+ data['original_size'] = original_size
+ return msg, data
+
+ def finish(self, filebase, preserve=False):
+ """See `ISwitchboard`."""
+ bakfile = os.path.join(self.queue_directory, filebase + '.bak')
+ try:
+ if preserve:
+ bad_dir = config.switchboards['bad'].queue_directory
+ psvfile = os.path.join(bad_dir, filebase + '.psv')
+ os.rename(bakfile, psvfile)
+ else:
+ os.unlink(bakfile)
+ except EnvironmentError:
+ elog.exception(
+ 'Failed to unlink/preserve backup file: %s', bakfile)
+
+ @property
+ def files(self):
+ """See `ISwitchboard`."""
+ return self.get_files()
+
+ def get_files(self, extension='.pck'):
+ """See `ISwitchboard`."""
+ times = {}
+ lower = self._lower
+ upper = self._upper
+ for f in os.listdir(self.queue_directory):
+ # By ignoring anything that doesn't end in .pck, we ignore
+ # tempfiles and avoid a race condition.
+ filebase, ext = os.path.splitext(f)
+ if ext <> extension:
+ continue
+ when, digest = filebase.split('+', 1)
+ # Throw out any files which don't match our bitrange. BAW: test
+ # performance and end-cases of this algorithm. MAS: both
+ # comparisons need to be <= to get complete range.
+ if lower is None or (lower <= long(digest, 16) <= upper):
+ key = float(when)
+ while key in times:
+ key += DELTA
+ times[key] = filebase
+ # FIFO sort
+ return [times[key] for key in sorted(times)]
+
+ def recover_backup_files(self):
+ """See `ISwitchboard`."""
+ # Move all .bak files in our slice to .pck. It's impossible for both
+ # to exist at the same time, so the move is enough to ensure that our
+ # normal dequeuing process will handle them. We keep count in
+ # _bak_count in the metadata of the number of times we recover this
+ # file. When the count reaches MAX_BAK_COUNT, we move the .bak file
+ # to a .psv file in the bad queue.
+ for filebase in self.get_files('.bak'):
+ src = os.path.join(self.queue_directory, filebase + '.bak')
+ dst = os.path.join(self.queue_directory, filebase + '.pck')
+ fp = open(src, 'rb+')
+ try:
+ try:
+ msg = cPickle.load(fp)
+ data_pos = fp.tell()
+ data = cPickle.load(fp)
+ except Exception as error:
+ # If unpickling throws any exception, just log and
+ # preserve this entry
+ elog.error('Unpickling .bak exception: %s\n'
+ 'Preserving file: %s', error, filebase)
+ self.finish(filebase, preserve=True)
+ else:
+ data['_bak_count'] = data.get('_bak_count', 0) + 1
+ fp.seek(data_pos)
+ if data.get('_parsemsg'):
+ protocol = 0
+ else:
+ protocol = 1
+ cPickle.dump(data, fp, protocol)
+ fp.truncate()
+ fp.flush()
+ os.fsync(fp.fileno())
+ if data['_bak_count'] >= MAX_BAK_COUNT:
+ elog.error('.bak file max count, preserving file: %s',
+ filebase)
+ self.finish(filebase, preserve=True)
+ else:
+ os.rename(src, dst)
+ finally:
+ fp.close()