diff options
| author | Barry Warsaw | 2007-06-27 17:33:42 -0400 |
|---|---|---|
| committer | Barry Warsaw | 2007-06-27 17:33:42 -0400 |
| commit | 9344d94ef8e0a92a27517b5bd5b34f57c9b7bfaa (patch) | |
| tree | c12bd0bde71928c9dd5b22d4c46b2394c1bf46ba /Mailman/Queue/Switchboard.py | |
| parent | 55b97d69b0c08e66215a673f3cd92ab7d260c714 (diff) | |
| download | mailman-9344d94ef8e0a92a27517b5bd5b34f57c9b7bfaa.tar.gz mailman-9344d94ef8e0a92a27517b5bd5b34f57c9b7bfaa.tar.zst mailman-9344d94ef8e0a92a27517b5bd5b34f57c9b7bfaa.zip | |
Diffstat (limited to 'Mailman/Queue/Switchboard.py')
| -rw-r--r-- | Mailman/Queue/Switchboard.py | 131 |
1 file changed, 62 insertions, 69 deletions
diff --git a/Mailman/Queue/Switchboard.py b/Mailman/Queue/Switchboard.py index 6f5cd6222..91dfad8c0 100644 --- a/Mailman/Queue/Switchboard.py +++ b/Mailman/Queue/Switchboard.py @@ -15,23 +15,16 @@ # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, # USA. -"""Reading and writing message objects and message metadata.""" +"""Queuing and dequeuing message/metadata pickle files. -# enqueue() and dequeue() are not symmetric. enqueue() takes a Message -# object. dequeue() returns a email.Message object tree. -# -# Message metadata is represented internally as a Python dictionary. Keys and -# values must be strings. When written to a queue directory, the metadata is -# written into an externally represented format, as defined here. Because -# components of the Mailman system may be written in something other than -# Python, the external interchange format should be chosen based on what those -# other components can read and write. -# -# Most efficient, and recommended if everything is Python, is Python marshal -# format. Also supported by default is Berkeley db format (using the default -# bsddb module compiled into your Python executable -- usually Berkeley db -# 2), and rfc822 style plain text. You can write your own if you have other -# needs. +Messages are represented as email.message.Message objects (or an instance of a +subclass). Metadata is represented as a Python dictionary. For every +message/metadata pair in a queue, a single file containing two pickles is +written. First, the message is written to the pickle, then the metadata +dictionary is written. 
+""" + +from __future__ import with_statement import os import sha @@ -42,17 +35,16 @@ import cPickle import logging import marshal +from zope.interface import implements + from Mailman import Message from Mailman import Utils from Mailman.configuration import config +from Mailman.interfaces import ISwitchboard # 20 bytes of all bits set, maximum sha.digest() value shamax = 0xffffffffffffffffffffffffffffffffffffffffL -# This flag causes messages to be written as pickles (when True) or text files -# (when False). Pickles are more efficient because the message doesn't need -# to be re-parsed every time it's unqueued, but pickles are not human readable. -SAVE_MSGS_AS_PICKLES = True # Small increment to add to time in case two entries have the same time. This # prevents skipping one of two entries with the same time until the next pass. DELTA = .0001 @@ -62,33 +54,38 @@ elog = logging.getLogger('mailman.error') class Switchboard: + implements(ISwitchboard) + def __init__(self, whichq, slice=None, numslices=1, recover=False): - self.__whichq = whichq + self._whichq = whichq # Create the directory if it doesn't yet exist. - Utils.makedirs(self.__whichq, 0770) + Utils.makedirs(self._whichq, 0770) # Fast track for no slices - self.__lower = None - self.__upper = None + self._lower = None + self._upper = None # BAW: test performance and end-cases of this algorithm if numslices <> 1: - self.__lower = ((shamax+1) * slice) / numslices - self.__upper = (((shamax+1) * (slice+1)) / numslices) - 1 + self._lower = ((shamax + 1) * slice) / numslices + self._upper = (((shamax + 1) * (slice + 1)) / numslices) - 1 if recover: self.recover_backup_files() - def whichq(self): - return self.__whichq + @property + def queue_directory(self): + return self._whichq - def enqueue(self, _msg, _metadata={}, **_kws): + def enqueue(self, _msg, _metadata=None, **_kws): + if _metadata is None: + _metadata = {} # Calculate the SHA hexdigest of the message to get a unique base # filename. 
We're also going to use the digest as a hash into the set # of parallel qrunner processes. data = _metadata.copy() data.update(_kws) listname = data.get('listname', '--nolist--') - # Get some data for the input to the sha hash + # Get some data for the input to the sha hash. now = time.time() - if SAVE_MSGS_AS_PICKLES and not data.get('_plaintext'): + if not data.get('_plaintext'): protocol = 1 msgsave = cPickle.dumps(_msg, protocol) else: @@ -96,61 +93,54 @@ class Switchboard: msgsave = cPickle.dumps(str(_msg), protocol) # listname is unicode but the input to the hash function must be an # 8-bit string (eventually, a bytes object). - hashfood = msgsave + listname.encode('utf-8') + `now` - # Encode the current time into the file name for FIFO sorting in - # files(). The file name consists of two parts separated by a `+': - # the received time for this message (i.e. when it first showed up on - # this system) and the sha hex digest. - #rcvtime = data.setdefault('received_time', now) + hashfood = msgsave + listname.encode('utf-8') + repr(now) + # Encode the current time into the file name for FIFO sorting. The + # file name consists of two parts separated by a '+': the received + # time for this message (i.e. when it first showed up on this system) + # and the sha hex digest. rcvtime = data.setdefault('received_time', now) - filebase = `rcvtime` + '+' + sha.new(hashfood).hexdigest() - filename = os.path.join(self.__whichq, filebase + '.pck') + filebase = repr(rcvtime) + '+' + sha.new(hashfood).hexdigest() + filename = os.path.join(self._whichq, filebase + '.pck') tmpfile = filename + '.tmp' # Always add the metadata schema version number data['version'] = config.QFILE_SCHEMA_VERSION # Filter out volatile entries - for k in data.keys(): + for k in data: if k.startswith('_'): del data[k] # We have to tell the dequeue() method whether to parse the message # object or not. data['_parsemsg'] = (protocol == 0) # Write to the pickle file the message object and metadata. 
- fp = open(tmpfile, 'w') - try: + with open(tmpfile, 'w') as fp: fp.write(msgsave) cPickle.dump(data, fp, protocol) fp.flush() os.fsync(fp.fileno()) - finally: - fp.close() os.rename(tmpfile, filename) return filebase def dequeue(self, filebase): # Calculate the filename from the given filebase. - filename = os.path.join(self.__whichq, filebase + '.pck') - backfile = os.path.join(self.__whichq, filebase + '.bak') + filename = os.path.join(self._whichq, filebase + '.pck') + backfile = os.path.join(self._whichq, filebase + '.bak') # Read the message object and metadata. - fp = open(filename) - # Move the file to the backup file name for processing. If this - # process crashes uncleanly the .bak file will be used to re-instate - # the .pck file in order to try again. XXX what if something caused - # Python to constantly crash? Is it possible that we'd end up mail - # bombing recipients or crushing the archiver? How would we defend - # against that? - os.rename(filename, backfile) - try: + with open(filename) as fp: + # Move the file to the backup file name for processing. If this + # process crashes uncleanly the .bak file will be used to + # re-instate the .pck file in order to try again. XXX what if + # something caused Python to constantly crash? Is it possible + # that we'd end up mail bombing recipients or crushing the + # archiver? How would we defend against that? 
+ os.rename(filename, backfile) msg = cPickle.load(fp) data = cPickle.load(fp) - finally: - fp.close() if data.get('_parsemsg'): msg = email.message_from_string(msg, Message.Message) return msg, data def finish(self, filebase, preserve=False): - bakfile = os.path.join(self.__whichq, filebase + '.bak') + bakfile = os.path.join(self._whichq, filebase + '.bak') try: if preserve: psvfile = os.path.join(config.SHUNTQUEUE_DIR, @@ -164,17 +154,21 @@ class Switchboard: elog.exception('Failed to unlink/preserve backup file: %s', bakfile) - def files(self, extension='.pck'): + @property + def files(self): + return self.get_files() + + def get_files(self, extension='.pck'): times = {} - lower = self.__lower - upper = self.__upper - for f in os.listdir(self.__whichq): + lower = self._lower + upper = self._upper + for f in os.listdir(self._whichq): # By ignoring anything that doesn't end in .pck, we ignore # tempfiles and avoid a race condition. filebase, ext = os.path.splitext(f) if ext <> extension: continue - when, digest = filebase.split('+') + when, digest = filebase.split('+', 1) # Throw out any files which don't match our bitrange. BAW: test # performance and end-cases of this algorithm. MAS: both # comparisons need to be <= to get complete range. @@ -184,15 +178,14 @@ class Switchboard: key += DELTA times[key] = filebase # FIFO sort - keys = times.keys() - keys.sort() - return [times[k] for k in keys] + for key in sorted(times): + yield times[key] def recover_backup_files(self): # Move all .bak files in our slice to .pck. It's impossible for both # to exist at the same time, so the move is enough to ensure that our # normal dequeuing process will handle them. 
- for filebase in self.files('.bak'): - src = os.path.join(self.__whichq, filebase + '.bak') - dst = os.path.join(self.__whichq, filebase + '.pck') + for filebase in self.get_files('.bak'): + src = os.path.join(self._whichq, filebase + '.bak') + dst = os.path.join(self._whichq, filebase + '.pck') os.rename(src, dst) |
