From 02c62280359458b86c2680231774fa00c76072fc Mon Sep 17 00:00:00 2001 From: bwarsaw Date: Wed, 20 Dec 2000 21:20:20 +0000 Subject: Enqueue(): Allow for queuing to alternative queue directories. If newdata or kws has an item called "_whichq", this specifies the directory to queue to, defaulting to INQUEUE_DIR. The metadata "filebase" is always set to __filebase, not just when the metadata is created for the first time. When writing the .db file (i.e. containing the metadata), always write it after the .msg text file, and always write it to a temporary file, with an os.rename() shuffle. This should avoid race conditions when queuing a message to another queue because the queue runners only key off of files ending in ".db". Requeue(): New method which calls Enqueue() after zapping self.__filebase so it'll be recalculated. --- Mailman/Message.py | 28 ++++++++++++++++++++-------- 1 file changed, 20 insertions(+), 8 deletions(-) (limited to 'Mailman/Message.py') diff --git a/Mailman/Message.py b/Mailman/Message.py index 496144fa2..44328aee7 100644 --- a/Mailman/Message.py +++ b/Mailman/Message.py @@ -146,8 +146,11 @@ class Message(rfc822.Message): if self.__filebase is None: hashfood = text + mlist.internal_name() + `time.time()` self.__filebase = sha.new(hashfood).hexdigest() - msgfile = os.path.join(mm_cfg.QUEUE_DIR, self.__filebase + '.msg') - dbfile = os.path.join(mm_cfg.QUEUE_DIR, self.__filebase + '.db') + # calculate metadata from parameter list + newdata.update(kws) + whichq = newdata.get('_whichq', mm_cfg.INQUEUE_DIR) + msgfile = os.path.join(whichq, self.__filebase + '.msg') + dbfile = os.path.join(whichq, self.__filebase + '.db') # Initialize the information about this message delivery. It's # possible a delivery attempt has been previously tried on this # message, in which case, we'll just update the data. @@ -159,12 +162,11 @@ class Message(rfc822.Message): except (EOFError, ValueError, TypeError, IOError): msgdata = {'listname' : mlist.internal_name(), 'version' : mm_cfg.QFILE_SCHEMA_VERSION, - 'filebase' : self.__filebase, } existsp = 0 # Merge in the additional msgdata msgdata.update(newdata) - msgdata.update(kws) + msgdata['filebase'] = self.__filebase # Get rid of volatile entries, which have the convention of starting # with an underscore (TBD: should we use v_ as a naming convention?). # Need the _dirty flag for later though. @@ -172,16 +174,26 @@ class Message(rfc822.Message): for k in msgdata.keys(): if k[0] == '_': del msgdata[k] - # Write the data file - dbfp = Utils.open_ex(dbfile, 'w') - marshal.dump(msgdata, dbfp) - dbfp.close() # If it doesn't already exist, or if the text of the message has # changed, write the message file to disk. if not existsp or dirty: msgfp = Utils.open_ex(msgfile, 'w') msgfp.write(text) msgfp.close() + # Write the data file, first to a tmp file and then move it to the .db + # file. Do this to avoid race conditions when re-queuing to another + # message queue. We only need to do the .db files because Runner only + # looks at .db files. + tmpfile = dbfile + '.tmp' + dbfp = Utils.open_ex(tmpfile, 'w') + marshal.dump(msgdata, dbfp) + dbfp.close() + os.rename(tmpfile, dbfile) + + def Requeue(self, mlist, newdata={}, **kws): + """Like Enqueing, but force a new filebase calculation.""" + self.__filebase = None + self.Enqueue(mlist, newdata=newdata, **kws) -- cgit v1.2.3-70-g09d2