diff options
| author | Barry Warsaw | 2007-06-27 17:33:42 -0400 |
|---|---|---|
| committer | Barry Warsaw | 2007-06-27 17:33:42 -0400 |
| commit | 9344d94ef8e0a92a27517b5bd5b34f57c9b7bfaa (patch) | |
| tree | c12bd0bde71928c9dd5b22d4c46b2394c1bf46ba | |
| parent | 55b97d69b0c08e66215a673f3cd92ab7d260c714 (diff) | |
| download | mailman-9344d94ef8e0a92a27517b5bd5b34f57c9b7bfaa.tar.gz mailman-9344d94ef8e0a92a27517b5bd5b34f57c9b7bfaa.tar.zst mailman-9344d94ef8e0a92a27517b5bd5b34f57c9b7bfaa.zip | |
| -rw-r--r-- | Mailman/Queue/Runner.py | 2 | ||||
| -rw-r--r-- | Mailman/Queue/Switchboard.py | 131 | ||||
| -rw-r--r-- | Mailman/Queue/tests/test_runners.py | 73 | ||||
| -rw-r--r-- | Mailman/bin/testall.py | 54 | ||||
| -rw-r--r-- | Mailman/configuration.py | 8 | ||||
| -rw-r--r-- | Mailman/docs/switchboard.txt | 151 | ||||
| -rw-r--r-- | Mailman/initialize.py | 2 | ||||
| -rw-r--r-- | Mailman/interfaces/switchboard.py | 82 |
8 files changed, 341 insertions, 162 deletions
diff --git a/Mailman/Queue/Runner.py b/Mailman/Queue/Runner.py index 2045022fa..953a201de 100644 --- a/Mailman/Queue/Runner.py +++ b/Mailman/Queue/Runner.py @@ -115,7 +115,7 @@ class Runner: # intervention. self._log(e) # Put a marker in the metadata for unshunting - msgdata['whichq'] = self._switchboard.whichq() + msgdata['whichq'] = self._switchboard.queue_directory # It is possible that shunting can throw an exception, e.g. a # permissions problem or a MemoryError due to a really large # message. Try to be graceful. diff --git a/Mailman/Queue/Switchboard.py b/Mailman/Queue/Switchboard.py index 6f5cd6222..91dfad8c0 100644 --- a/Mailman/Queue/Switchboard.py +++ b/Mailman/Queue/Switchboard.py @@ -15,23 +15,16 @@ # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, # USA. -"""Reading and writing message objects and message metadata.""" +"""Queing and dequeuing message/metadata pickle files. -# enqueue() and dequeue() are not symmetric. enqueue() takes a Message -# object. dequeue() returns a email.Message object tree. -# -# Message metadata is represented internally as a Python dictionary. Keys and -# values must be strings. When written to a queue directory, the metadata is -# written into an externally represented format, as defined here. Because -# components of the Mailman system may be written in something other than -# Python, the external interchange format should be chosen based on what those -# other components can read and write. -# -# Most efficient, and recommended if everything is Python, is Python marshal -# format. Also supported by default is Berkeley db format (using the default -# bsddb module compiled into your Python executable -- usually Berkeley db -# 2), and rfc822 style plain text. You can write your own if you have other -# needs. +Messages are represented as email.message.Message objects (or an instance ofa +subclass). Metadata is represented as a Python dictionary. For every +message/metadata pair in a queue, a single file containing two pickles is +written. First, the message is written to the pickle, then the metadata +dictionary is written. +""" + +from __future__ import with_statement import os import sha @@ -42,17 +35,16 @@ import cPickle import logging import marshal +from zope.interface import implements + from Mailman import Message from Mailman import Utils from Mailman.configuration import config +from Mailman.interfaces import ISwitchboard # 20 bytes of all bits set, maximum sha.digest() value shamax = 0xffffffffffffffffffffffffffffffffffffffffL -# This flag causes messages to be written as pickles (when True) or text files -# (when False). Pickles are more efficient because the message doesn't need -# to be re-parsed every time it's unqueued, but pickles are not human readable. -SAVE_MSGS_AS_PICKLES = True # Small increment to add to time in case two entries have the same time. This # prevents skipping one of two entries with the same time until the next pass. DELTA = .0001 @@ -62,33 +54,38 @@ elog = logging.getLogger('mailman.error') class Switchboard: + implements(ISwitchboard) + def __init__(self, whichq, slice=None, numslices=1, recover=False): - self.__whichq = whichq + self._whichq = whichq # Create the directory if it doesn't yet exist. - Utils.makedirs(self.__whichq, 0770) + Utils.makedirs(self._whichq, 0770) # Fast track for no slices - self.__lower = None - self.__upper = None + self._lower = None + self._upper = None # BAW: test performance and end-cases of this algorithm if numslices <> 1: - self.__lower = ((shamax+1) * slice) / numslices - self.__upper = (((shamax+1) * (slice+1)) / numslices) - 1 + self._lower = ((shamax + 1) * slice) / numslices + self._upper = (((shamax + 1) * (slice + 1)) / numslices) - 1 if recover: self.recover_backup_files() - def whichq(self): - return self.__whichq + @property + def queue_directory(self): + return self._whichq - def enqueue(self, _msg, _metadata={}, **_kws): + def enqueue(self, _msg, _metadata=None, **_kws): + if _metadata is None: + _metadata = {} # Calculate the SHA hexdigest of the message to get a unique base # filename. We're also going to use the digest as a hash into the set # of parallel qrunner processes. data = _metadata.copy() data.update(_kws) listname = data.get('listname', '--nolist--') - # Get some data for the input to the sha hash + # Get some data for the input to the sha hash. now = time.time() - if SAVE_MSGS_AS_PICKLES and not data.get('_plaintext'): + if not data.get('_plaintext'): protocol = 1 msgsave = cPickle.dumps(_msg, protocol) else: @@ -96,61 +93,54 @@ class Switchboard: msgsave = cPickle.dumps(str(_msg), protocol) # listname is unicode but the input to the hash function must be an # 8-bit string (eventually, a bytes object). - hashfood = msgsave + listname.encode('utf-8') + `now` - # Encode the current time into the file name for FIFO sorting in - # files(). The file name consists of two parts separated by a `+': - # the received time for this message (i.e. when it first showed up on - # this system) and the sha hex digest. - #rcvtime = data.setdefault('received_time', now) + hashfood = msgsave + listname.encode('utf-8') + repr(now) + # Encode the current time into the file name for FIFO sorting. The + # file name consists of two parts separated by a '+': the received + # time for this message (i.e. when it first showed up on this system) + # and the sha hex digest. rcvtime = data.setdefault('received_time', now) - filebase = `rcvtime` + '+' + sha.new(hashfood).hexdigest() - filename = os.path.join(self.__whichq, filebase + '.pck') + filebase = repr(rcvtime) + '+' + sha.new(hashfood).hexdigest() + filename = os.path.join(self._whichq, filebase + '.pck') tmpfile = filename + '.tmp' # Always add the metadata schema version number data['version'] = config.QFILE_SCHEMA_VERSION # Filter out volatile entries - for k in data.keys(): + for k in data: if k.startswith('_'): del data[k] # We have to tell the dequeue() method whether to parse the message # object or not. data['_parsemsg'] = (protocol == 0) # Write to the pickle file the message object and metadata. - fp = open(tmpfile, 'w') - try: + with open(tmpfile, 'w') as fp: fp.write(msgsave) cPickle.dump(data, fp, protocol) fp.flush() os.fsync(fp.fileno()) - finally: - fp.close() os.rename(tmpfile, filename) return filebase def dequeue(self, filebase): # Calculate the filename from the given filebase. - filename = os.path.join(self.__whichq, filebase + '.pck') - backfile = os.path.join(self.__whichq, filebase + '.bak') + filename = os.path.join(self._whichq, filebase + '.pck') + backfile = os.path.join(self._whichq, filebase + '.bak') # Read the message object and metadata. - fp = open(filename) - # Move the file to the backup file name for processing. If this - # process crashes uncleanly the .bak file will be used to re-instate - # the .pck file in order to try again. XXX what if something caused - # Python to constantly crash? Is it possible that we'd end up mail - # bombing recipients or crushing the archiver? How would we defend - # against that? - os.rename(filename, backfile) - try: + with open(filename) as fp: + # Move the file to the backup file name for processing. If this + # process crashes uncleanly the .bak file will be used to + # re-instate the .pck file in order to try again. XXX what if + # something caused Python to constantly crash? Is it possible + # that we'd end up mail bombing recipients or crushing the + # archiver? How would we defend against that? + os.rename(filename, backfile) msg = cPickle.load(fp) data = cPickle.load(fp) - finally: - fp.close() if data.get('_parsemsg'): msg = email.message_from_string(msg, Message.Message) return msg, data def finish(self, filebase, preserve=False): - bakfile = os.path.join(self.__whichq, filebase + '.bak') + bakfile = os.path.join(self._whichq, filebase + '.bak') try: if preserve: psvfile = os.path.join(config.SHUNTQUEUE_DIR, @@ -164,17 +154,21 @@ class Switchboard: elog.exception('Failed to unlink/preserve backup file: %s', bakfile) - def files(self, extension='.pck'): + @property + def files(self): + return self.get_files() + + def get_files(self, extension='.pck'): times = {} - lower = self.__lower - upper = self.__upper - for f in os.listdir(self.__whichq): + lower = self._lower + upper = self._upper + for f in os.listdir(self._whichq): # By ignoring anything that doesn't end in .pck, we ignore # tempfiles and avoid a race condition. filebase, ext = os.path.splitext(f) if ext <> extension: continue - when, digest = filebase.split('+') + when, digest = filebase.split('+', 1) # Throw out any files which don't match our bitrange. BAW: test # performance and end-cases of this algorithm. MAS: both # comparisons need to be <= to get complete range. @@ -184,15 +178,14 @@ class Switchboard: key += DELTA times[key] = filebase # FIFO sort - keys = times.keys() - keys.sort() - return [times[k] for k in keys] + for key in sorted(times): + yield times[key] def recover_backup_files(self): # Move all .bak files in our slice to .pck. It's impossible for both # to exist at the same time, so the move is enough to ensure that our # normal dequeuing process will handle them. - for filebase in self.files('.bak'): - src = os.path.join(self.__whichq, filebase + '.bak') - dst = os.path.join(self.__whichq, filebase + '.pck') + for filebase in self.get_files('.bak'): + src = os.path.join(self._whichq, filebase + '.bak') + dst = os.path.join(self._whichq, filebase + '.pck') os.rename(src, dst) diff --git a/Mailman/Queue/tests/test_runners.py b/Mailman/Queue/tests/test_runners.py index d3c7f8573..27269909f 100644 --- a/Mailman/Queue/tests/test_runners.py +++ b/Mailman/Queue/tests/test_runners.py @@ -114,78 +114,6 @@ A message -class TestSwitchboard(TestBase): - def setUp(self): - TestBase.setUp(self) - self._tmpdir = tempfile.mkdtemp() - self._msg = email.message_from_string("""\ -From: aperson@dom.ain -To: _xtest@dom.ain - -A test message. -""") - self._sb = Switchboard(self._tmpdir) - - def tearDown(self): - shutil.rmtree(self._tmpdir, True) - TestBase.tearDown(self) - - def _count_qfiles(self): - files = {} - for qfile in os.listdir(self._tmpdir): - root, ext = os.path.splitext(qfile) - files[ext] = files.get(ext, 0) + 1 - return files - - def test_whichq(self): - self.assertEqual(self._sb.whichq(), self._tmpdir) - - def test_enqueue(self): - eq = self.assertEqual - self._sb.enqueue(self._msg, listname='_xtest@dom.ain') - files = self._count_qfiles() - eq(len(files), 1) - eq(files['.pck'], 1) - - def test_dequeue(self): - eq = self.assertEqual - self._sb.enqueue(self._msg, listname='_xtest@dom.ain', foo='yes') - count = 0 - for filebase in self._sb.files(): - msg, data = self._sb.dequeue(filebase) - self._sb.finish(filebase) - count += 1 - eq(msg['from'], self._msg['from']) - eq(msg['to'], self._msg['to']) - eq(data['foo'], 'yes') - eq(count, 1) - files = self._count_qfiles() - eq(len(files), 0) - - def test_bakfile(self): - eq = self.assertEqual - self._sb.enqueue(self._msg, listname='_xtest@dom.ain') - self._sb.enqueue(self._msg, listname='_xtest@dom.ain') - self._sb.enqueue(self._msg, listname='_xtest@dom.ain') - for filebase in self._sb.files(): - self._sb.dequeue(filebase) - files = self._count_qfiles() - eq(len(files), 1) - eq(files['.bak'], 3) - - def test_recover(self): - eq = self.assertEqual - self._sb.enqueue(self._msg, listname='_xtest@dom.ain') - for filebase in self._sb.files(): - self._sb.dequeue(filebase) - # Not calling sb.finish() leaves .bak files - sb2 = Switchboard(self._tmpdir, recover=True) - files = self._count_qfiles() - eq(len(files), 1) - eq(files['.pck'], 1) - - - class TestableRunner(Runner): def _dispose(self, mlist, msg, msgdata): self.msg = msg @@ -231,6 +159,5 @@ A test message. def test_suite(): suite = unittest.TestSuite() suite.addTest(unittest.makeSuite(TestPrepMessage)) - suite.addTest(unittest.makeSuite(TestSwitchboard)) suite.addTest(unittest.makeSuite(TestRunner)) return suite diff --git a/Mailman/bin/testall.py b/Mailman/bin/testall.py index b6dc07ec2..7adb01a92 100644 --- a/Mailman/bin/testall.py +++ b/Mailman/bin/testall.py @@ -152,41 +152,57 @@ def main(): # Set up the testing configuration file both for this process, and for all # sub-processes testing will spawn (e.g. the qrunners). # - # Calculate various temporary files needed by the test suite, but only for - # those files which must also go into shared configuration file. + # Calculate a temporary VAR_PREFIX directory so that run-time artifacts of + # the tests won't tread on the installation's data. This also makes it + # easier to clean up after the tests are done, and insures isolation of + # test suite runs. cfg_in = os.path.join(os.path.dirname(Mailman.testing.__file__), 'testing.cfg.in') fd, cfg_out = tempfile.mkstemp(suffix='.cfg') os.close(fd) shutil.copyfile(cfg_in, cfg_out) - initialize_1(cfg_out, propagate_logs=opts.stderr) - mailman_uid = pwd.getpwnam(config.MAILMAN_USER).pw_uid - mailman_gid = grp.getgrnam(config.MAILMAN_GROUP).gr_gid - os.chmod(cfg_out, 0660) - os.chown(cfg_out, mailman_uid, mailman_gid) + var_prefix = tempfile.mkdtemp() + if opts.verbosity > 2: + print 'VAR_PREFIX :', var_prefix + print 'config file:', cfg_out - fd, config.dbfile = tempfile.mkstemp(dir=config.DATA_DIR, suffix='.db') - os.close(fd) - os.chmod(config.dbfile, 0660) - os.chown(config.dbfile, mailman_uid, mailman_gid) + try: + with open(cfg_out, 'a') as fp: + print >> fp, 'VAR_PREFIX = "%s"' % var_prefix - # Patch ups - test_engine_url = 'sqlite:///' + config.dbfile - config.SQLALCHEMY_ENGINE_URL = test_engine_url + initialize_1(cfg_out, propagate_logs=opts.stderr) + mailman_uid = pwd.getpwnam(config.MAILMAN_USER).pw_uid + mailman_gid = grp.getgrnam(config.MAILMAN_GROUP).gr_gid + os.chmod(cfg_out, 0660) + os.chown(cfg_out, mailman_uid, mailman_gid) - with open(cfg_out, 'a') as fp: - print >> fp, 'SQLALCHEMY_ENGINE_URL = "%s"' % test_engine_url + # Create an empty SQLite database file with the proper permissions and + # calculate the SQLAlchemy engine url to this database file. + fd, config.dbfile = tempfile.mkstemp(dir=config.DATA_DIR, suffix='.db') + os.close(fd) + os.chmod(config.dbfile, 0660) + os.chown(config.dbfile, mailman_uid, mailman_gid) - initialize_2() + # Patch ups + test_engine_url = 'sqlite:///' + config.dbfile + config.SQLALCHEMY_ENGINE_URL = test_engine_url - try: + # Write this to the config file so subprocesses share the same testing + # database file. + with open(cfg_out, 'a') as fp: + print >> fp, 'SQLALCHEMY_ENGINE_URL = "%s"' % test_engine_url + + initialize_2() + + # Run the tests basedir = os.path.dirname(Mailman.__file__) runner = unittest.TextTestRunner(verbosity=opts.verbosity) results = runner.run(suite(args)) + finally: os.remove(cfg_out) - os.remove(config.dbfile) + shutil.rmtree(var_prefix) sys.exit(bool(results.failures or results.errors)) diff --git a/Mailman/configuration.py b/Mailman/configuration.py index 3247204b3..aa3e1f9cf 100644 --- a/Mailman/configuration.py +++ b/Mailman/configuration.py @@ -190,6 +190,14 @@ class Configuration(object): for k in self.__dict__ if k.endswith('_DIR')]) + def ensure_directories_exist(self): + for variable, directory in self.paths.items(): + try: + os.makedirs(directory, 02775) + except OSError, e: + if e.errno <> errno.EEXIST: + raise + config = Configuration() diff --git a/Mailman/docs/switchboard.txt b/Mailman/docs/switchboard.txt new file mode 100644 index 000000000..19a437d0c --- /dev/null +++ b/Mailman/docs/switchboard.txt @@ -0,0 +1,151 @@ +The switchboard +=============== + +The switchboard is subsystem that moves messages between queues. Each +instance of a switchboard is responsible for one queue directory. + + >>> from email import message_from_string + >>> from Mailman.Message import Message + >>> from Mailman.Queue.Switchboard import Switchboard + >>> msg = message_from_string("""\ + ... From: aperson@example.com + ... To: _xtest@example.com + ... + ... A test message. + ... """, Message) + +Create a switchboard by giving its queue directory. + + >>> import os + >>> from Mailman.configuration import config + >>> queue_directory = os.path.join(config.QUEUE_DIR, 'test') + >>> switchboard = Switchboard(queue_directory) + >>> switchboard.queue_directory == queue_directory + True + +Here's a helper function for ensuring things work correctly. + + >>> def check_qfiles(): + ... files = {} + ... for qfile in os.listdir(queue_directory): + ... root, ext = os.path.splitext(qfile) + ... files[ext] = files.get(ext, 0) + 1 + ... return sorted(files.items()) + + +Enqueing and dequeing +--------------------- + +The message can be enqueued with metadata specified in the passed in +dictionary. + + >>> filebase = switchboard.enqueue(msg) + >>> check_qfiles() + [('.pck', 1)] + +To read the contents of a queue file, dequeue it. + + >>> msg, msgdata = switchboard.dequeue(filebase) + >>> print msg.as_string() + From: aperson@example.com + To: _xtest@example.com + <BLANKLINE> + A test message. + <BLANKLINE> + >>> sorted(msgdata.items()) + [('_parsemsg', False), ('received_time', ...), ('version', 3)] + >>> check_qfiles() + [('.bak', 1)] + +To complete the dequeing process, removing all traces of the message file, +finish it (without preservation). + + >>> switchboard.finish(filebase) + >>> check_qfiles() + [] + +When enqueing a file, you can provide additional metadata keys by using +keyword arguments. + + >>> filebase = switchboard.enqueue(msg, {'foo': 1}, bar=2) + >>> msg, msgdata = switchboard.dequeue(filebase) + >>> switchboard.finish(filebase) + >>> sorted(msgdata.items()) + [('_parsemsg', False), + ('bar', 2), ('foo', 1), + ('received_time', ...), ('version', 3)] + +Keyword arguments override keys from the metadata dictionary. + + >>> filebase = switchboard.enqueue(msg, {'foo': 1}, foo=2) + >>> msg, msgdata = switchboard.dequeue(filebase) + >>> switchboard.finish(filebase) + >>> sorted(msgdata.items()) + [('_parsemsg', False), + ('foo', 2), + ('received_time', ...), ('version', 3)] + + +Iterating over files +-------------------- + +There are two ways to iterate over all the files in a switchboard's queue. +Normally, queue files end in .pck (for 'pickle') and the easiest way to +iterate over just these files is to use the .files attribute. + + >>> filebase_1 = switchboard.enqueue(msg, foo=1) + >>> filebase_2 = switchboard.enqueue(msg, foo=2) + >>> filebase_3 = switchboard.enqueue(msg, foo=3) + >>> filebases = sorted((filebase_1, filebase_2, filebase_3)) + >>> sorted(switchboard.files) == filebases + True + >>> check_qfiles() + [('.pck', 3)] + +You can also use the .get_files() method if you want to iterate over all the +file bases for some other extension. + + >>> for filebase in switchboard.get_files(): + ... msg, msgdata = switchboard.dequeue(filebase) + >>> bakfiles = sorted(switchboard.get_files('.bak')) + >>> bakfiles == filebases + True + >>> check_qfiles() + [('.bak', 3)] + >>> for filebase in switchboard.get_files('.bak'): + ... switchboard.finish(filebase) + >>> check_qfiles() + [] + + +Recovering files +---------------- + +Calling .dequeue() without calling .finish() leaves .bak backup files in +place. These can be recovered when the switchboard is instantiated. + + >>> filebase_1 = switchboard.enqueue(msg, foo=1) + >>> filebase_2 = switchboard.enqueue(msg, foo=2) + >>> filebase_3 = switchboard.enqueue(msg, foo=3) + >>> for filebase in switchboard.files: + ... msg, msgdata = switchboard.dequeue(filebase) + ... # Don't call .finish() + >>> check_qfiles() + [('.bak', 3)] + >>> switchboard_2 = Switchboard(queue_directory, recover=True) + >>> check_qfiles() + [('.pck', 3)] + +Clean up + + >>> for filebase in switchboard.files: + ... msg, msgdata = switchboard.dequeue(filebase) + ... switchboard.finish(filebase) + >>> check_qfiles() + [] + + +Queue slices +------------ + +XXX Add tests for queue slices. diff --git a/Mailman/initialize.py b/Mailman/initialize.py index 2e7e65b70..9dee94cbe 100644 --- a/Mailman/initialize.py +++ b/Mailman/initialize.py @@ -50,6 +50,8 @@ def initialize_1(config, propagate_logs): # handles that correctly. os.umask(007) Mailman.configuration.config.load(config) + # Create the queue and log directories if they don't already exist. + Mailman.configuration.config.ensure_directories_exist() Mailman.loginit.initialize(propagate_logs) # Set up site extensions directory Mailman.ext.__path__.append(Mailman.configuration.config.EXT_DIR) diff --git a/Mailman/interfaces/switchboard.py b/Mailman/interfaces/switchboard.py new file mode 100644 index 000000000..3a54359c2 --- /dev/null +++ b/Mailman/interfaces/switchboard.py @@ -0,0 +1,82 @@ +# Copyright (C) 2007 by the Free Software Foundation, Inc. +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, +# USA. + +"""Interface for switchboards.""" + +from zope.interface import Interface, Attribute + + + +class ISwitchboard(Interface): + """The switchboard.""" + + queue_directory = Attribute( + """The name of the queue directory this switchboard is responsible for. + + This should be a subdirectory of the system-wide top-level queue + directory. + """) + + def enqueue(_msg, _metadata=None, **_kws): + """Store the message and metadata in the switchboard's queue. + + When metadata is not given, an empty metadata dictionary is used. The + keyword arguments are added to the metadata dictonary, with precedence + given to the keyword arguments. + + The base name of the message file is returned. + """ + + def dequeue(filebase): + """Return the message and metadata contained in the named file. + + filebase is the base name of the message file as returned by the + .enqueue() method. This file must exist and contain a message and + metadata. The message file is preserved in a backup file, which must + be removed by calling the .finish() method. + + Returned is a 2-tuple of the form (message, metadata). + """ + + def finish(filebase, preserve=False): + """Remove the backup file for filebase. + + If preserve is True, then the backup file is actually just renamed to + a preservation file instead of being unlinked. + """ + + files = Attribute( + """An iterator over all the .pck files in the queue directory. + + The base names of the matching files are returned. + """) + + def get_files(extension='.pck'): + """Like the 'files' attribute, but accepts an alternative extension. + + Only the files in the queue directory that have a matching extension + are returned. Like 'files', the base names of the matching files are + returned. + """ + + def recover_backup_files(): + """Move all backup files to active message files. + + It is impossible for both the .bak and .pck files to exist at the same + time, so moving them is enough to ensure that a normal dequeing + operation will handle them. + """ |
