diff options
| author | bwarsaw | 2001-05-14 18:16:30 +0000 |
|---|---|---|
| committer | bwarsaw | 2001-05-14 18:16:30 +0000 |
| commit | f04bb42e60fb9800b99c0b7bb36f198636d7b3ea (patch) | |
| tree | 463f7d149701a51063cdabc88e7c4eecb87089d1 | |
| parent | b0203e6fa189fa98523802031ca2e35dc98d73ed (diff) | |
| download | mailman-f04bb42e60fb9800b99c0b7bb36f198636d7b3ea.tar.gz mailman-f04bb42e60fb9800b99c0b7bb36f198636d7b3ea.tar.zst mailman-f04bb42e60fb9800b99c0b7bb36f198636d7b3ea.zip | |
| -rw-r--r-- | Mailman/Queue/Switchboard.py | 93 |
1 files changed, 71 insertions, 22 deletions
diff --git a/Mailman/Queue/Switchboard.py b/Mailman/Queue/Switchboard.py index 6f8023fb4..ea97026a8 100644 --- a/Mailman/Queue/Switchboard.py +++ b/Mailman/Queue/Switchboard.py @@ -37,7 +37,7 @@ import os import time import sha import marshal -from errno import EEXIST +import errno from mimelib.Parser import Parser @@ -61,7 +61,7 @@ class _Switchboard: try: os.mkdir(self.__whichq, 0770) except OSError, e: - if e.errno <> EEXIST: raise + if e.errno <> errno.EEXIST: raise finally: os.umask(omask) # Fast track for no slices @@ -74,14 +74,22 @@ class _Switchboard: def enqueue(self, _msg, _metadata={}, **_kws): # Calculate the SHA hexdigest of the message to get a unique base - # filename. + # filename. We're also going to use the digest as a hash into the set + # of parallel qrunner processes. data = _metadata.copy() data.update(_kws) listname = data.get('listname', '--nolist--') - now = `time.time()` + # Get some data for the input to the sha hash + now = time.time() msgtext = str(_msg) - hashfood = msgtext + listname + now - filebase = sha.new(hashfood).hexdigest() + hashfood = msgtext + listname + `now` + # Encode the current time into the file name for FIFO sorting in + # files(). The file name consists of two parts separated by a `+': + # the received time for this message (i.e. when it first showed up on + # this system) and the sha hex digest. + #rcvtime = data.setdefault('received_time', now) + rcvtime = data.setdefault('received_time', now) + filebase = `rcvtime` + '+' + sha.new(hashfood).hexdigest() # Figure out which queue files the message is to be written to. msgfile = os.path.join(self.__whichq, filebase + '.msg') dbfile = os.path.join(self.__whichq, filebase + '.db') @@ -115,25 +123,45 @@ class _Switchboard: dbfile = os.path.join(self.__whichq, filebase + '.db') # Read the message text and parse it into a message object tree. When # done, unlink the msg file. - msgfp = open(msgfile) - p = Parser(_class=Message.Message) - msg = p.parse(msgfp) - msgfp.close() - os.unlink(msgfile) + msg = data = None + try: + msgfp = open(msgfile) + except IOError, e: + if e.errno <> errno.ENOENT: raise + else: + p = Parser(_class=Message.Message) + msg = p.parse(msgfp) + msgfp.close() + os.unlink(msgfile) # Now, read the metadata using the appropriate external metadata # format. When done, unlink the metadata file. - data = self._ext_read(dbfile) - os.unlink(dbfile) + try: + data = self._ext_read(dbfile) + except (IOError, OSError), e: + if e.errno <> errno.ENOENT: raise + else: + os.unlink(dbfile) return msg, data def files(self): - all = [os.path.splitext(f)[0] for f in os.listdir(self.__whichq) - if f.endswith('.db')] - # Fast track exit - if self.__lower is None: - return all - # BAW: test performance and end-cases of this algorithm - return [f for f in all if self.__lower <= long(f, 16) < self.__upper] + times = {} + lower = self.__lower + upper = self.__upper + for f in os.listdir(self.__whichq): + # We only care about the file's base name (i.e. no extension). + # Thus we'll ignore anything that doesn't end in .db. + if not f.endswith('.db'): + continue + filebase = os.path.splitext(f)[0] + when, digest = filebase.split('+') + # Throw out any files which don't match our bitrange. BAW: test + # performance and end-cases of this algorithm. + if not lower or (lower <= long(digest, 16) < upper): + times[float(when)] = filebase + # FIFO sort + keys = times.keys() + keys.sort() + return [times[k] for k in keys] def _ext_write(self, tmpfile, data): raise UnimplementedError @@ -145,20 +173,41 @@ class _Switchboard: class MarshalSwitchboard(_Switchboard): """Python marshal format.""" + FLOAT_ATTRIBUTES = ['received_time'] + def _ext_write(self, filename, dict): omask = os.umask(007) # -rw-rw---- try: fp = open(filename, 'w') finally: os.umask(omask) + # Python's marshal, up to and including in Python 2.1, has a bug where + # the full precision of floats was not stored. We work around this + # bug by hardcoding a list of float values we know about, repr()-izing + # them ourselves, and doing the reverse conversion on _ext_read(). + for attr in self.FLOAT_ATTRIBUTES: + # We use try/except because we expect a hitrate of nearly 100% + try: + fval = dict[attr] + except KeyError: + pass + else: + dict[attr] = repr(fval) marshal.dump(dict, fp) fp.close() def _ext_read(self, filename): fp = open(filename) - data = marshal.load(fp) + dict = marshal.load(fp) + # Do the reverse conversion (repr -> float) + for attr in self.FLOAT_ATTRIBUTES: + try: + sval = dict[attr] + except KeyError: + pass + dict[attr] = eval(sval, {'__builtins__': {}}) fp.close() - return data + return dict |
