summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Mailman/OutgoingQueue.py119
-rw-r--r--Mailman/Utils.py28
-rwxr-xr-xmail/contact_transport2
3 files changed, 127 insertions, 22 deletions
diff --git a/Mailman/OutgoingQueue.py b/Mailman/OutgoingQueue.py
index 3ab43b989..9a5bce70a 100644
--- a/Mailman/OutgoingQueue.py
+++ b/Mailman/OutgoingQueue.py
@@ -15,34 +15,133 @@
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
"""Queue up posts if the SMTP connection fails."""
+#
+# messages are queued before a delivery attempt takes place
+# this ensures that should the system fail for some reason,
+# the run_queue process will make delivery attempts later.
+#
+# so q files have 2 possible states - those that are queued because
+# they are currently being delivered by the initial delivery attempt
+# and those that are queued because the first delivery attempt has
+# failed. the former state is indicated by the fact that the filename
+# is setuid and has data written to it. all queue entries in the latter
+# state are not setuid and have data in them.
+
+#
+# protection from multiple processQueue procedures occuring
+# simultaneously is enforced by setting a lock file forcing
+# only one such process to happen at a time.
+#
-import tempfile, marshal, mm_cfg
+import os, stat, tempfile, marshal, mm_cfg
TEMPLATE = "mm_q."
+#
+# multiple prcesses with different uids can write and/or
+# defer q entries. with the queue directory setgid to mailman
+# and writable by group mailman, having the QF_MODE set to 0660
+# should enable any process with gid mailman to read, write, rename,
+# or unlink the file
+#
+QF_MODE = 0660
+
+#
+# how long can a q entry possibly be in the
+# active state?
+#
+MAX_ACTIVE = 7200 # 2 hours
-def dequeueMessage(msg):
- import os
- os.unlink(msg)
+#
+# 1) get global lock so only of these
+# procedures can run at a time
+# 2) find all the files that are deferred queue
+# entries and all the files that have been in
+# an active state for too long and attempt a delivery
+#
def processQueue():
- import os, smtplib
+ import flock, time, smtplib, Utils
+ lock_file = flock.FileLock(os.path.join(mm_cfg.LOCK_DIR, "mmqueue_run.lock"))
+ lock_file.lock()
files = os.listdir(mm_cfg.DATA_DIR)
for file in files:
- if TEMPLATE <> file[:len(TEMPLATE)]:
+ #
+ # does it look like a q entry?
+ #
+ if TEMPLATE != file[:len(TEMPLATE)]:
continue
full_fname = os.path.join(mm_cfg.DATA_DIR, file)
+ st = os.stat(full_fname)
+ #
+ # if the file is not a deferred q message, we check to
+ # see if the modification time was too long ago and process
+ # it anyway. If the modification time was recent, leave it
+ # alone as it's probably being delivered by another process anyway
+ #
+ if not isDeferred(full_fname) and st[stat.ST_CTIME] > (time.time() - MAX_ACTIVE):
+ continue
f = open(full_fname,"r")
recip,sender,text = marshal.load(f)
f.close()
- import Utils
Utils.TrySMTPDelivery(recip,sender,text,full_fname)
-
-
+ lock_file.unlock()
+
+
+#
+# this function is used by any process that
+# attempts to deliver a message for the first time
+# so the entry is set with the sticky bit.
+#
def enqueueMessage(the_sender, recip, text):
tempfile.tempdir = mm_cfg.DATA_DIR
tempfile.template = TEMPLATE
- fname = tempfile.mktemp()
+ fname = tempfile.mktemp()
f = open(fname, "a+")
+ os.chmod(fname, QF_MODE | stat.S_ISUID) # make sure this is set right off the bat
marshal.dump((recip,the_sender,text),f)
f.close()
+ os.chmod(fname, QF_MODE | stat.S_ISUID)
return fname
+
+
+
+#
+# is this queue entry a deferred one?
+#
+def isDeferred(q_entry):
+ st = os.stat(q_entry)
+ size = st[stat.ST_SIZE]
+ mode = st[stat.ST_MODE]
+ if mode & stat.S_ISUID:
+ return 0
+ elif not size: # the file was just opened, but not chmod'd
+ return 0
+ else:
+ return 1
+
+
+#
+# given the full path to a q_entry, set the
+# status to deferred if it is not
+# already in that state. this function must work
+# on entries already in a deferred state.
+#
+def deferMessage(q_entry):
+ if not isDeferred(q_entry):
+ os.chmod(q_entry, QF_MODE)
+
+
+#
+# given the full path to a q_entry
+# remove it from the queue - the queue
+# entry may or may not end in .deferred
+#
+def dequeueMessage(q_entry):
+ os.unlink(q_entry)
+
+
+
+
+
+
+
diff --git a/Mailman/Utils.py b/Mailman/Utils.py
index 44127df18..cd4e6d961 100644
--- a/Mailman/Utils.py
+++ b/Mailman/Utils.py
@@ -171,7 +171,6 @@ def DeliverToUser(msg, recipient, add_headers=[]):
if os.fork():
return
sender = msg.GetSender()
-
try:
try:
msg.headers.remove('\n')
@@ -188,8 +187,6 @@ def DeliverToUser(msg, recipient, add_headers=[]):
import OutgoingQueue
queue_id = OutgoingQueue.enqueueMessage(sender, recipient, text)
TrySMTPDelivery(recipient,sender,text,queue_id)
- # Just in case there's still something waiting to be sent...
- OutgoingQueue.processQueue()
finally:
os._exit(0)
@@ -203,26 +200,28 @@ def TrySMTPDelivery(recipient, sender, text, queue_entry):
con.helo(mm_cfg.DEFAULT_HOST_NAME)
con.send(to=recipient,frm=sender,text=text)
con.quit()
- dequeue = 1
+ defer = 0
failure = None
+ #
# Any exceptions that warrant leaving the message on the queue should
- # be identified by their exception, below, with setting 'dequeue' to 1
+ # be identified by their exception, below, with setting 'defer' to 1
# and 'failure' to something suitable. Without a particular exception
# we fall through to the blanket 'except:', which dequeues the message.
-
+ #
except socket.error:
# MTA not responding, or other socket prob - leave on queue.
- dequeue = 0
+ defer = 1
failure = sys.exc_info()
-
except:
# Unanticipated cause of delivery failure - *don't* leave message
# queued, or it may stay, with reattempted delivery, forever...
- dequeue = 1
+ defer = 0
failure = sys.exc_info()
- if dequeue:
+ if defer:
+ OutgoingQueue.deferMessage(queue_entry)
+ else:
OutgoingQueue.dequeueMessage(queue_entry)
if failure:
# XXX Here may be the place to get the failure info back to the
@@ -232,7 +231,8 @@ def TrySMTPDelivery(recipient, sender, text, queue_entry):
l.write("To %s:\n" % recipient)
l.write("\t %s / %s\n" % (failure[0], failure[1]))
l.flush()
-
+
+
def QuotePeriods(text):
return string.join(string.split(text, '\n.\n'), '\n .\n')
@@ -457,3 +457,9 @@ def maketext(templatefile, dict, raw=0):
if raw:
return template % dict
return wrap(template % dict)
+
+
+
+
+
+
diff --git a/mail/contact_transport b/mail/contact_transport
index d76c78c61..81213fea0 100755
--- a/mail/contact_transport
+++ b/mail/contact_transport
@@ -57,5 +57,5 @@ except IOError:
sys.stderr.flush()
raise exc, exc_msg, exc_tb
Utils.TrySMTPDelivery(to_addrs, from_addr, text, queue_id)
-OutgoingQueue.processQueue()
+