diff options
| -rw-r--r-- | Mailman/OutgoingQueue.py | 119 | ||||
| -rw-r--r-- | Mailman/Utils.py | 28 | ||||
| -rwxr-xr-x | mail/contact_transport | 2 |
3 files changed, 127 insertions, 22 deletions
diff --git a/Mailman/OutgoingQueue.py b/Mailman/OutgoingQueue.py index 3ab43b989..9a5bce70a 100644 --- a/Mailman/OutgoingQueue.py +++ b/Mailman/OutgoingQueue.py @@ -15,34 +15,133 @@ # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. """Queue up posts if the SMTP connection fails.""" +# +# messages are queued before a delivery attempt takes place +# this ensures that should the system fail for some reason, +# the run_queue process will make delivery attempts later. +# +# so q files have 2 possible states - those that are queued because +# they are currently being delivered by the initial delivery attempt +# and those that are queued because the first delivery attempt has +# failed. the former state is indicated by the fact that the filename +# is setuid and has data written to it. all queue entries in the latter +# state are not setuid and have data in them. + +# +# protection from multiple processQueue procedures occuring +# simultaneously is enforced by setting a lock file forcing +# only one such process to happen at a time. +# -import tempfile, marshal, mm_cfg +import os, stat, tempfile, marshal, mm_cfg TEMPLATE = "mm_q." +# +# multiple prcesses with different uids can write and/or +# defer q entries. with the queue directory setgid to mailman +# and writable by group mailman, having the QF_MODE set to 0660 +# should enable any process with gid mailman to read, write, rename, +# or unlink the file +# +QF_MODE = 0660 + +# +# how long can a q entry possibly be in the +# active state? +# +MAX_ACTIVE = 7200 # 2 hours -def dequeueMessage(msg): - import os - os.unlink(msg) +# +# 1) get global lock so only of these +# procedures can run at a time +# 2) find all the files that are deferred queue +# entries and all the files that have been in +# an active state for too long and attempt a delivery +# def processQueue(): - import os, smtplib + import flock, time, smtplib, Utils + lock_file = flock.FileLock(os.path.join(mm_cfg.LOCK_DIR, "mmqueue_run.lock")) + lock_file.lock() files = os.listdir(mm_cfg.DATA_DIR) for file in files: - if TEMPLATE <> file[:len(TEMPLATE)]: + # + # does it look like a q entry? + # + if TEMPLATE != file[:len(TEMPLATE)]: continue full_fname = os.path.join(mm_cfg.DATA_DIR, file) + st = os.stat(full_fname) + # + # if the file is not a deferred q message, we check to + # see if the modification time was too long ago and process + # it anyway. If the modification time was recent, leave it + # alone as it's probably being delivered by another process anyway + # + if not isDeferred(full_fname) and st[stat.ST_CTIME] > (time.time() - MAX_ACTIVE): + continue f = open(full_fname,"r") recip,sender,text = marshal.load(f) f.close() - import Utils Utils.TrySMTPDelivery(recip,sender,text,full_fname) - - + lock_file.unlock() + + +# +# this function is used by any process that +# attempts to deliver a message for the first time +# so the entry is set with the sticky bit. +# def enqueueMessage(the_sender, recip, text): tempfile.tempdir = mm_cfg.DATA_DIR tempfile.template = TEMPLATE - fname = tempfile.mktemp() + fname = tempfile.mktemp() f = open(fname, "a+") + os.chmod(fname, QF_MODE | stat.S_ISUID) # make sure this is set right off the bat marshal.dump((recip,the_sender,text),f) f.close() + os.chmod(fname, QF_MODE | stat.S_ISUID) return fname + + + +# +# is this queue entry a deferred one? +# +def isDeferred(q_entry): + st = os.stat(q_entry) + size = st[stat.ST_SIZE] + mode = st[stat.ST_MODE] + if mode & stat.S_ISUID: + return 0 + elif not size: # the file was just opened, but not chmod'd + return 0 + else: + return 1 + + +# +# given the full path to a q_entry, set the +# status to deferred if it is not +# already in that state. this function must work +# on entries already in a deferred state. +# +def deferMessage(q_entry): + if not isDeferred(q_entry): + os.chmod(q_entry, QF_MODE) + + +# +# given the full path to a q_entry +# remove it from the queue - the queue +# entry may or may not end in .deferred +# +def dequeueMessage(q_entry): + os.unlink(q_entry) + + + + + + + diff --git a/Mailman/Utils.py b/Mailman/Utils.py index 44127df18..cd4e6d961 100644 --- a/Mailman/Utils.py +++ b/Mailman/Utils.py @@ -171,7 +171,6 @@ def DeliverToUser(msg, recipient, add_headers=[]): if os.fork(): return sender = msg.GetSender() - try: try: msg.headers.remove('\n') @@ -188,8 +187,6 @@ def DeliverToUser(msg, recipient, add_headers=[]): import OutgoingQueue queue_id = OutgoingQueue.enqueueMessage(sender, recipient, text) TrySMTPDelivery(recipient,sender,text,queue_id) - # Just in case there's still something waiting to be sent... - OutgoingQueue.processQueue() finally: os._exit(0) @@ -203,26 +200,28 @@ def TrySMTPDelivery(recipient, sender, text, queue_entry): con.helo(mm_cfg.DEFAULT_HOST_NAME) con.send(to=recipient,frm=sender,text=text) con.quit() - dequeue = 1 + defer = 0 failure = None + # # Any exceptions that warrant leaving the message on the queue should - # be identified by their exception, below, with setting 'dequeue' to 1 + # be identified by their exception, below, with setting 'defer' to 1 # and 'failure' to something suitable. Without a particular exception # we fall through to the blanket 'except:', which dequeues the message. - + # except socket.error: # MTA not responding, or other socket prob - leave on queue. - dequeue = 0 + defer = 1 failure = sys.exc_info() - except: # Unanticipated cause of delivery failure - *don't* leave message # queued, or it may stay, with reattempted delivery, forever... - dequeue = 1 + defer = 0 failure = sys.exc_info() - if dequeue: + if defer: + OutgoingQueue.deferMessage(queue_entry) + else: OutgoingQueue.dequeueMessage(queue_entry) if failure: # XXX Here may be the place to get the failure info back to the @@ -232,7 +231,8 @@ def TrySMTPDelivery(recipient, sender, text, queue_entry): l.write("To %s:\n" % recipient) l.write("\t %s / %s\n" % (failure[0], failure[1])) l.flush() - + + def QuotePeriods(text): return string.join(string.split(text, '\n.\n'), '\n .\n') @@ -457,3 +457,9 @@ def maketext(templatefile, dict, raw=0): if raw: return template % dict return wrap(template % dict) + + + + + + diff --git a/mail/contact_transport b/mail/contact_transport index d76c78c61..81213fea0 100755 --- a/mail/contact_transport +++ b/mail/contact_transport @@ -57,5 +57,5 @@ except IOError: sys.stderr.flush() raise exc, exc_msg, exc_tb Utils.TrySMTPDelivery(to_addrs, from_addr, text, queue_id) -OutgoingQueue.processQueue() + |
