summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorbwarsaw2001-05-14 18:16:30 +0000
committerbwarsaw2001-05-14 18:16:30 +0000
commitf04bb42e60fb9800b99c0b7bb36f198636d7b3ea (patch)
tree463f7d149701a51063cdabc88e7c4eecb87089d1
parentb0203e6fa189fa98523802031ca2e35dc98d73ed (diff)
downloadmailman-f04bb42e60fb9800b99c0b7bb36f198636d7b3ea.tar.gz
mailman-f04bb42e60fb9800b99c0b7bb36f198636d7b3ea.tar.zst
mailman-f04bb42e60fb9800b99c0b7bb36f198636d7b3ea.zip
-rw-r--r--Mailman/Queue/Switchboard.py93
1 files changed, 71 insertions, 22 deletions
diff --git a/Mailman/Queue/Switchboard.py b/Mailman/Queue/Switchboard.py
index 6f8023fb4..ea97026a8 100644
--- a/Mailman/Queue/Switchboard.py
+++ b/Mailman/Queue/Switchboard.py
@@ -37,7 +37,7 @@ import os
import time
import sha
import marshal
-from errno import EEXIST
+import errno
from mimelib.Parser import Parser
@@ -61,7 +61,7 @@ class _Switchboard:
try:
os.mkdir(self.__whichq, 0770)
except OSError, e:
- if e.errno <> EEXIST: raise
+ if e.errno <> errno.EEXIST: raise
finally:
os.umask(omask)
# Fast track for no slices
@@ -74,14 +74,22 @@ class _Switchboard:
def enqueue(self, _msg, _metadata={}, **_kws):
# Calculate the SHA hexdigest of the message to get a unique base
- # filename.
+ # filename. We're also going to use the digest as a hash into the set
+ # of parallel qrunner processes.
data = _metadata.copy()
data.update(_kws)
listname = data.get('listname', '--nolist--')
- now = `time.time()`
+ # Get some data for the input to the sha hash
+ now = time.time()
msgtext = str(_msg)
- hashfood = msgtext + listname + now
- filebase = sha.new(hashfood).hexdigest()
+ hashfood = msgtext + listname + `now`
+ # Encode the current time into the file name for FIFO sorting in
+ # files(). The file name consists of two parts separated by a `+':
+ # the received time for this message (i.e. when it first showed up on
+ # this system) and the sha hex digest.
+ #rcvtime = data.setdefault('received_time', now)
+ rcvtime = data.setdefault('received_time', now)
+ filebase = `rcvtime` + '+' + sha.new(hashfood).hexdigest()
# Figure out which queue files the message is to be written to.
msgfile = os.path.join(self.__whichq, filebase + '.msg')
dbfile = os.path.join(self.__whichq, filebase + '.db')
@@ -115,25 +123,45 @@ class _Switchboard:
dbfile = os.path.join(self.__whichq, filebase + '.db')
# Read the message text and parse it into a message object tree. When
# done, unlink the msg file.
- msgfp = open(msgfile)
- p = Parser(_class=Message.Message)
- msg = p.parse(msgfp)
- msgfp.close()
- os.unlink(msgfile)
+ msg = data = None
+ try:
+ msgfp = open(msgfile)
+ except IOError, e:
+ if e.errno <> errno.ENOENT: raise
+ else:
+ p = Parser(_class=Message.Message)
+ msg = p.parse(msgfp)
+ msgfp.close()
+ os.unlink(msgfile)
# Now, read the metadata using the appropriate external metadata
# format. When done, unlink the metadata file.
- data = self._ext_read(dbfile)
- os.unlink(dbfile)
+ try:
+ data = self._ext_read(dbfile)
+ except (IOError, OSError), e:
+ if e.errno <> errno.ENOENT: raise
+ else:
+ os.unlink(dbfile)
return msg, data
def files(self):
- all = [os.path.splitext(f)[0] for f in os.listdir(self.__whichq)
- if f.endswith('.db')]
- # Fast track exit
- if self.__lower is None:
- return all
- # BAW: test performance and end-cases of this algorithm
- return [f for f in all if self.__lower <= long(f, 16) < self.__upper]
+ times = {}
+ lower = self.__lower
+ upper = self.__upper
+ for f in os.listdir(self.__whichq):
+ # We only care about the file's base name (i.e. no extension).
+ # Thus we'll ignore anything that doesn't end in .db.
+ if not f.endswith('.db'):
+ continue
+ filebase = os.path.splitext(f)[0]
+ when, digest = filebase.split('+')
+ # Throw out any files which don't match our bitrange. BAW: test
+ # performance and end-cases of this algorithm.
+ if not lower or (lower <= long(digest, 16) < upper):
+ times[float(when)] = filebase
+ # FIFO sort
+ keys = times.keys()
+ keys.sort()
+ return [times[k] for k in keys]
def _ext_write(self, tmpfile, data):
raise UnimplementedError
@@ -145,20 +173,41 @@ class _Switchboard:
class MarshalSwitchboard(_Switchboard):
"""Python marshal format."""
+ FLOAT_ATTRIBUTES = ['received_time']
+
def _ext_write(self, filename, dict):
omask = os.umask(007) # -rw-rw----
try:
fp = open(filename, 'w')
finally:
os.umask(omask)
+ # Python's marshal, up to and including in Python 2.1, has a bug where
+ # the full precision of floats was not stored. We work around this
+ # bug by hardcoding a list of float values we know about, repr()-izing
+ # them ourselves, and doing the reverse conversion on _ext_read().
+ for attr in self.FLOAT_ATTRIBUTES:
+ # We use try/except because we expect a hitrate of nearly 100%
+ try:
+ fval = dict[attr]
+ except KeyError:
+ pass
+ else:
+ dict[attr] = repr(fval)
marshal.dump(dict, fp)
fp.close()
def _ext_read(self, filename):
fp = open(filename)
- data = marshal.load(fp)
+ dict = marshal.load(fp)
+ # Do the reverse conversion (repr -> float)
+ for attr in self.FLOAT_ATTRIBUTES:
+ try:
+ sval = dict[attr]
+ except KeyError:
+ pass
+ dict[attr] = eval(sval, {'__builtins__': {}})
fp.close()
- return data
+ return dict