path: root/Mailman/Queue/Switchboard.py
author    Barry Warsaw    2007-06-27 17:33:42 -0400
committer Barry Warsaw    2007-06-27 17:33:42 -0400
commit  9344d94ef8e0a92a27517b5bd5b34f57c9b7bfaa (patch)
tree    c12bd0bde71928c9dd5b22d4c46b2394c1bf46ba /Mailman/Queue/Switchboard.py
parent  55b97d69b0c08e66215a673f3cd92ab7d260c714 (diff)
Diffstat (limited to 'Mailman/Queue/Switchboard.py')
-rw-r--r--  Mailman/Queue/Switchboard.py  131
1 file changed, 62 insertions(+), 69 deletions(-)
diff --git a/Mailman/Queue/Switchboard.py b/Mailman/Queue/Switchboard.py
index 6f5cd6222..91dfad8c0 100644
--- a/Mailman/Queue/Switchboard.py
+++ b/Mailman/Queue/Switchboard.py
@@ -15,23 +15,16 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
# USA.
-"""Reading and writing message objects and message metadata."""
+"""Queing and dequeuing message/metadata pickle files.
-# enqueue() and dequeue() are not symmetric. enqueue() takes a Message
-# object. dequeue() returns a email.Message object tree.
-#
-# Message metadata is represented internally as a Python dictionary. Keys and
-# values must be strings. When written to a queue directory, the metadata is
-# written into an externally represented format, as defined here. Because
-# components of the Mailman system may be written in something other than
-# Python, the external interchange format should be chosen based on what those
-# other components can read and write.
-#
-# Most efficient, and recommended if everything is Python, is Python marshal
-# format. Also supported by default is Berkeley db format (using the default
-# bsddb module compiled into your Python executable -- usually Berkeley db
-# 2), and rfc822 style plain text. You can write your own if you have other
-# needs.
+Messages are represented as email.message.Message objects (or an instance of a
+subclass). Metadata is represented as a Python dictionary. For every
+message/metadata pair in a queue, a single file containing two pickles is
+written. First, the message is written to the pickle, then the metadata
+dictionary is written.
+"""
+
+from __future__ import with_statement
import os
import sha
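
The new module docstring pins down the on-disk format: one file per message/metadata pair, holding two consecutive pickles, message first, metadata dictionary second. A minimal sketch of reading such a file back, mirroring the cPickle calls dequeue() uses below (the function name and path argument are illustrative, not part of the patch; Python 2, as in this commit):

    import cPickle

    def read_queue_entry(path):
        # A queue entry is two back-to-back pickles: the message object
        # (or its string form), then the metadata dict.  Each load()
        # consumes exactly one pickle and leaves the file positioned at
        # the start of the next one.
        with open(path) as fp:
            msg = cPickle.load(fp)
            data = cPickle.load(fp)
        return msg, data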
@@ -42,17 +35,16 @@ import cPickle
import logging
import marshal
+from zope.interface import implements
+
from Mailman import Message
from Mailman import Utils
from Mailman.configuration import config
+from Mailman.interfaces import ISwitchboard
# 20 bytes of all bits set, maximum sha.digest() value
shamax = 0xffffffffffffffffffffffffffffffffffffffffL
-# This flag causes messages to be written as pickles (when True) or text files
-# (when False). Pickles are more efficient because the message doesn't need
-# to be re-parsed every time it's unqueued, but pickles are not human readable.
-SAVE_MSGS_AS_PICKLES = True
# Small increment to add to time in case two entries have the same time. This
# prevents skipping one of two entries with the same time until the next pass.
DELTA = .0001
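
DELTA matters in get_files() further down: receive times become dictionary keys for FIFO sorting, so when two entries carry the same time, the key is nudged by DELTA rather than overwriting (and thus skipping) one of them. A toy illustration of that keying scheme (the while-loop guard is an assumption inferred from the `key += DELTA` fragment visible in the final hunk):

    DELTA = .0001

    def fifo_keys(receive_times):
        # Use each receive time as a dict key; bump colliding keys by
        # DELTA so two same-timestamp entries both survive the pass.
        times = {}
        for when in receive_times:
            key = when
            while key in times:
                key += DELTA
            times[key] = when
        return sorted(times)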
@@ -62,33 +54,38 @@ elog = logging.getLogger('mailman.error')
class Switchboard:
+ implements(ISwitchboard)
+
def __init__(self, whichq, slice=None, numslices=1, recover=False):
- self.__whichq = whichq
+ self._whichq = whichq
# Create the directory if it doesn't yet exist.
- Utils.makedirs(self.__whichq, 0770)
+ Utils.makedirs(self._whichq, 0770)
# Fast track for no slices
- self.__lower = None
- self.__upper = None
+ self._lower = None
+ self._upper = None
# BAW: test performance and end-cases of this algorithm
if numslices <> 1:
- self.__lower = ((shamax+1) * slice) / numslices
- self.__upper = (((shamax+1) * (slice+1)) / numslices) - 1
+ self._lower = ((shamax + 1) * slice) / numslices
+ self._upper = (((shamax + 1) * (slice + 1)) / numslices) - 1
if recover:
self.recover_backup_files()
- def whichq(self):
- return self.__whichq
+ @property
+ def queue_directory(self):
+ return self._whichq
- def enqueue(self, _msg, _metadata={}, **_kws):
+ def enqueue(self, _msg, _metadata=None, **_kws):
+ if _metadata is None:
+ _metadata = {}
# Calculate the SHA hexdigest of the message to get a unique base
# filename. We're also going to use the digest as a hash into the set
# of parallel qrunner processes.
data = _metadata.copy()
data.update(_kws)
listname = data.get('listname', '--nolist--')
- # Get some data for the input to the sha hash
+ # Get some data for the input to the sha hash.
now = time.time()
- if SAVE_MSGS_AS_PICKLES and not data.get('_plaintext'):
+ if not data.get('_plaintext'):
protocol = 1
msgsave = cPickle.dumps(_msg, protocol)
else:
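
Note the signature change at the top of this hunk: the old default `_metadata={}` is the classic mutable-default pitfall, because the dict is created once at function definition time and shared by every call that omits the argument. The patched version defaults to None and allocates a fresh dict per call. A minimal illustration (function names are hypothetical):

    def enqueue_shared(data={}):
        # One dict shared across all calls: metadata leaks between messages.
        data['calls'] = data.get('calls', 0) + 1
        return data

    def enqueue_fresh(data=None):
        # A new dict on every call, as the patched enqueue() now does.
        if data is None:
            data = {}
        data['calls'] = data.get('calls', 0) + 1
        return data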
@@ -96,61 +93,54 @@ class Switchboard:
msgsave = cPickle.dumps(str(_msg), protocol)
# listname is unicode but the input to the hash function must be an
# 8-bit string (eventually, a bytes object).
- hashfood = msgsave + listname.encode('utf-8') + `now`
- # Encode the current time into the file name for FIFO sorting in
- # files(). The file name consists of two parts separated by a `+':
- # the received time for this message (i.e. when it first showed up on
- # this system) and the sha hex digest.
- #rcvtime = data.setdefault('received_time', now)
+ hashfood = msgsave + listname.encode('utf-8') + repr(now)
+ # Encode the current time into the file name for FIFO sorting. The
+ # file name consists of two parts separated by a '+': the received
+ # time for this message (i.e. when it first showed up on this system)
+ # and the sha hex digest.
rcvtime = data.setdefault('received_time', now)
- filebase = `rcvtime` + '+' + sha.new(hashfood).hexdigest()
- filename = os.path.join(self.__whichq, filebase + '.pck')
+ filebase = repr(rcvtime) + '+' + sha.new(hashfood).hexdigest()
+ filename = os.path.join(self._whichq, filebase + '.pck')
tmpfile = filename + '.tmp'
# Always add the metadata schema version number
data['version'] = config.QFILE_SCHEMA_VERSION
# Filter out volatile entries
- for k in data.keys():
+ for k in data:
if k.startswith('_'):
del data[k]
# We have to tell the dequeue() method whether to parse the message
# object or not.
data['_parsemsg'] = (protocol == 0)
# Write to the pickle file the message object and metadata.
- fp = open(tmpfile, 'w')
- try:
+ with open(tmpfile, 'w') as fp:
fp.write(msgsave)
cPickle.dump(data, fp, protocol)
fp.flush()
os.fsync(fp.fileno())
- finally:
- fp.close()
os.rename(tmpfile, filename)
return filebase
def dequeue(self, filebase):
# Calculate the filename from the given filebase.
- filename = os.path.join(self.__whichq, filebase + '.pck')
- backfile = os.path.join(self.__whichq, filebase + '.bak')
+ filename = os.path.join(self._whichq, filebase + '.pck')
+ backfile = os.path.join(self._whichq, filebase + '.bak')
# Read the message object and metadata.
- fp = open(filename)
- # Move the file to the backup file name for processing. If this
- # process crashes uncleanly the .bak file will be used to re-instate
- # the .pck file in order to try again. XXX what if something caused
- # Python to constantly crash? Is it possible that we'd end up mail
- # bombing recipients or crushing the archiver? How would we defend
- # against that?
- os.rename(filename, backfile)
- try:
+ with open(filename) as fp:
+ # Move the file to the backup file name for processing. If this
+ # process crashes uncleanly the .bak file will be used to
+ # re-instate the .pck file in order to try again. XXX what if
+ # something caused Python to constantly crash? Is it possible
+ # that we'd end up mail bombing recipients or crushing the
+ # archiver? How would we defend against that?
+ os.rename(filename, backfile)
msg = cPickle.load(fp)
data = cPickle.load(fp)
- finally:
- fp.close()
if data.get('_parsemsg'):
msg = email.message_from_string(msg, Message.Message)
return msg, data
def finish(self, filebase, preserve=False):
- bakfile = os.path.join(self.__whichq, filebase + '.bak')
+ bakfile = os.path.join(self._whichq, filebase + '.bak')
try:
if preserve:
psvfile = os.path.join(config.SHUNTQUEUE_DIR,
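
The enqueue/dequeue halves of the hunk above share a small crash-safety protocol: enqueue() writes to a .tmp name, flushes and fsyncs, then renames into place, while dequeue() renames the .pck to .bak before unpickling, so an unclean crash leaves a file that recover_backup_files() can reinstate. A condensed sketch of the write side (the helper name and payload argument are illustrative):

    import os

    def atomic_write(filename, payload):
        # Stage the data under a temporary name, force it to disk, then
        # rename.  rename() is atomic on POSIX, so a reader never sees a
        # half-written queue file, and a crash leaves only a stray .tmp.
        tmpfile = filename + '.tmp'
        with open(tmpfile, 'w') as fp:
            fp.write(payload)
            fp.flush()
            os.fsync(fp.fileno())
        os.rename(tmpfile, filename)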
@@ -164,17 +154,21 @@ class Switchboard:
elog.exception('Failed to unlink/preserve backup file: %s',
bakfile)
- def files(self, extension='.pck'):
+ @property
+ def files(self):
+ return self.get_files()
+
+ def get_files(self, extension='.pck'):
times = {}
- lower = self.__lower
- upper = self.__upper
- for f in os.listdir(self.__whichq):
+ lower = self._lower
+ upper = self._upper
+ for f in os.listdir(self._whichq):
# By ignoring anything that doesn't end in .pck, we ignore
# tempfiles and avoid a race condition.
filebase, ext = os.path.splitext(f)
if ext <> extension:
continue
- when, digest = filebase.split('+')
+ when, digest = filebase.split('+', 1)
# Throw out any files which don't match our bitrange. BAW: test
# performance and end-cases of this algorithm. MAS: both
# comparisons need to be <= to get complete range.
@@ -184,15 +178,14 @@ class Switchboard:
key += DELTA
times[key] = filebase
# FIFO sort
- keys = times.keys()
- keys.sort()
- return [times[k] for k in keys]
+ for key in sorted(times):
+ yield times[key]
def recover_backup_files(self):
# Move all .bak files in our slice to .pck. It's impossible for both
# to exist at the same time, so the move is enough to ensure that our
# normal dequeuing process will handle them.
- for filebase in self.files('.bak'):
- src = os.path.join(self.__whichq, filebase + '.bak')
- dst = os.path.join(self.__whichq, filebase + '.pck')
+ for filebase in self.get_files('.bak'):
+ src = os.path.join(self._whichq, filebase + '.bak')
+ dst = os.path.join(self._whichq, filebase + '.pck')
os.rename(src, dst)
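
Finally, the slice bounds computed in __init__ partition the 160-bit sha digest space so parallel qrunner processes each claim a disjoint range of queue files; get_files() then keeps only filebases whose digest falls inside this runner's range. A sketch of the arithmetic (the membership test assumes the digest is the long parsed from the hex half of the filebase, as the MAS comment implies; Python 2 long literal, as in the module):

    # 20 bytes of all bits set, the maximum sha.digest() value.
    shamax = 0xffffffffffffffffffffffffffffffffffffffffL

    def slice_bounds(slice, numslices):
        # Split [0, shamax] into numslices equal contiguous ranges.
        lower = ((shamax + 1) * slice) / numslices
        upper = (((shamax + 1) * (slice + 1)) / numslices) - 1
        return lower, upper

    def in_slice(digest, lower, upper):
        # Both comparisons inclusive, per the MAS comment in the diff.
        return lower <= digest <= upper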