summaryrefslogtreecommitdiff
path: root/Mailman/OutgoingQueue.py
diff options
context:
space:
mode:
Diffstat (limited to 'Mailman/OutgoingQueue.py')
-rw-r--r--Mailman/OutgoingQueue.py119
1 files changed, 109 insertions, 10 deletions
diff --git a/Mailman/OutgoingQueue.py b/Mailman/OutgoingQueue.py
index 3ab43b989..9a5bce70a 100644
--- a/Mailman/OutgoingQueue.py
+++ b/Mailman/OutgoingQueue.py
@@ -15,34 +15,133 @@
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
"""Queue up posts if the SMTP connection fails."""
+#
+# messages are queued before a delivery attempt takes place
+# this ensures that should the system fail for some reason,
+# the run_queue process will make delivery attempts later.
+#
+# so q files have 2 possible states - those that are queued because
+# they are currently being delivered by the initial delivery attempt
+# and those that are queued because the first delivery attempt has
+# failed. the former state is indicated by the fact that the filename
+# is setuid and has data written to it. all queue entries in the latter
+# state are not setuid and have data in them.
+
+#
+# protection from multiple processQueue procedures occuring
+# simultaneously is enforced by setting a lock file forcing
+# only one such process to happen at a time.
+#
-import tempfile, marshal, mm_cfg
+import os, stat, tempfile, marshal, mm_cfg
TEMPLATE = "mm_q."
+#
+# multiple prcesses with different uids can write and/or
+# defer q entries. with the queue directory setgid to mailman
+# and writable by group mailman, having the QF_MODE set to 0660
+# should enable any process with gid mailman to read, write, rename,
+# or unlink the file
+#
+QF_MODE = 0660
+
+#
+# how long can a q entry possibly be in the
+# active state?
+#
+MAX_ACTIVE = 7200 # 2 hours
-def dequeueMessage(msg):
- import os
- os.unlink(msg)
+#
+# 1) get global lock so only of these
+# procedures can run at a time
+# 2) find all the files that are deferred queue
+# entries and all the files that have been in
+# an active state for too long and attempt a delivery
+#
def processQueue():
- import os, smtplib
+ import flock, time, smtplib, Utils
+ lock_file = flock.FileLock(os.path.join(mm_cfg.LOCK_DIR, "mmqueue_run.lock"))
+ lock_file.lock()
files = os.listdir(mm_cfg.DATA_DIR)
for file in files:
- if TEMPLATE <> file[:len(TEMPLATE)]:
+ #
+ # does it look like a q entry?
+ #
+ if TEMPLATE != file[:len(TEMPLATE)]:
continue
full_fname = os.path.join(mm_cfg.DATA_DIR, file)
+ st = os.stat(full_fname)
+ #
+ # if the file is not a deferred q message, we check to
+ # see if the modification time was too long ago and process
+ # it anyway. If the modification time was recent, leave it
+ # alone as it's probably being delivered by another process anyway
+ #
+ if not isDeferred(full_fname) and st[stat.ST_CTIME] > (time.time() - MAX_ACTIVE):
+ continue
f = open(full_fname,"r")
recip,sender,text = marshal.load(f)
f.close()
- import Utils
Utils.TrySMTPDelivery(recip,sender,text,full_fname)
-
-
+ lock_file.unlock()
+
+
+#
+# this function is used by any process that
+# attempts to deliver a message for the first time
+# so the entry is set with the sticky bit.
+#
def enqueueMessage(the_sender, recip, text):
tempfile.tempdir = mm_cfg.DATA_DIR
tempfile.template = TEMPLATE
- fname = tempfile.mktemp()
+ fname = tempfile.mktemp()
f = open(fname, "a+")
+ os.chmod(fname, QF_MODE | stat.S_ISUID) # make sure this is set right off the bat
marshal.dump((recip,the_sender,text),f)
f.close()
+ os.chmod(fname, QF_MODE | stat.S_ISUID)
return fname
+
+
+
+#
+# is this queue entry a deferred one?
+#
+def isDeferred(q_entry):
+ st = os.stat(q_entry)
+ size = st[stat.ST_SIZE]
+ mode = st[stat.ST_MODE]
+ if mode & stat.S_ISUID:
+ return 0
+ elif not size: # the file was just opened, but not chmod'd
+ return 0
+ else:
+ return 1
+
+
+#
+# given the full path to a q_entry, set the
+# status to deferred if it is not
+# already in that state. this function must work
+# on entries already in a deferred state.
+#
+def deferMessage(q_entry):
+ if not isDeferred(q_entry):
+ os.chmod(q_entry, QF_MODE)
+
+
+#
+# given the full path to a q_entry
+# remove it from the queue - the queue
+# entry may or may not end in .deferred
+#
+def dequeueMessage(q_entry):
+ os.unlink(q_entry)
+
+
+
+
+
+
+