summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Mailman/Handlers/SMTPDirect.py105
1 files changed, 86 insertions, 19 deletions
diff --git a/Mailman/Handlers/SMTPDirect.py b/Mailman/Handlers/SMTPDirect.py
index 400d22667..19d0d67a8 100644
--- a/Mailman/Handlers/SMTPDirect.py
+++ b/Mailman/Handlers/SMTPDirect.py
@@ -31,8 +31,17 @@ import socket
from Mailman import mm_cfg
from Mailman import Utils
from Mailman.Handlers import HandlerAPI
+from Mailman.Logging.Syslog import syslog
from Mailman.pythonlib import smtplib
+threading = None
+try:
+ if mm_cfg.MAX_DELIVERY_THREADS > 0:
+ import threading
+ import Queue
+except ImportError:
+ pass
+
def process(mlist, msg, msgdata):
@@ -40,6 +49,8 @@ def process(mlist, msg, msgdata):
if not recips:
# Nobody to deliver to!
return
+ admin = mlist.GetAdminEmail()
+ msgtext = str(msg)
#
# Split the recipient list into SMTP_MAX_RCPTS chunks. Most MTAs have a
# limit on the number of recipients they'll swallow in a single
@@ -49,10 +60,16 @@ def process(mlist, msg, msgdata):
else:
chunks = chunkify(recips, mm_cfg.SMTP_MAX_RCPTS)
refused = {}
- for chunk in chunks:
- failures = deliver(mlist, msg, chunk)
- refused.update(failures)
+ t0 = time.time()
+ if threading:
+ threaded_deliver(admin, msgtext, chunks, refused)
+ else:
+ for chunk in chunks:
+ deliver(admin, msgtext, chunk, refused)
#
+ t1 = time.time()
+ syslog('smtp', 'smtp for %d recips, completed in %.3f seconds' %
+ (len(recips), (t1-t0)))
# Process any failed deliveries.
tempfailures = []
for recip, (code, smtpmsg) in refused.items():
@@ -73,7 +90,7 @@ def process(mlist, msg, msgdata):
else:
# Deal with persistent transient failures by queuing them up for
# future delivery. TBD: this could generate lots of log entries!
- mlist.LogMsg('smtp-failure', '%d %s (%s)' % (code, recip, smtpmsg))
+ syslog('smtp-failure', '%d %s (%s)' % (code, recip, smtpmsg))
tempfailures.append(recip)
if tempfailures:
msgdata['recips'] = tempfailures
@@ -84,13 +101,23 @@ def process(mlist, msg, msgdata):
def chunkify(recips, chunksize):
# First do a simple sort on top level domain. It probably doesn't buy us
# much to try to sort on MX record -- that's the MTA's job. We're just
- # trying to avoid getting a max recips error.
+ # trying to avoid getting a max recips error. Split the chunks along
+ # these lines (as suggested originally by Chuq Von Rospach and slightly
+ # elaborated by BAW).
+ chunkmap = {'com': 1,
+ 'net': 2,
+ 'org': 2,
+ 'edu': 3,
+ 'us' : 3,
+ 'ca' : 3,
+ }
buckets = {}
for r in recips:
- bin = None
+ tld = None
i = string.rfind(r, '.')
if i >= 0:
- bin = r[i+1:]
+ tld = r[i+1:]
+ bin = chunkmap.get(tld, 0)
bucket = buckets.get(bin, [])
bucket.append(r)
buckets[bin] = bucket
@@ -106,39 +133,79 @@ def chunkify(recips, chunksize):
chunks.append(currentchunk)
currentchunk = []
chunklen = 0
- if currentchunk:
- chunks.append(currentchunk)
+ if currentchunk:
+ chunks.append(currentchunk)
+ currentchunk = []
+ chunklen = 0
return chunks
-def deliver(mlist, msg, recips):
+def pre_deliver(envsender, msgtext, failures, chunkq):
+ while 1:
+ # Get the next recipient chunk, if there is one
+ try:
+ recips = chunkq.get(0)
+ except Queue.Empty:
+ # We're done
+ break
+ # Otherwise, process the chunk
+ deliver(envsender, msgtext, recips, failures)
+
+
+def threaded_deliver(envsender, msgtext, chunks, failures):
+ syslog('error', 'threaded_deliver...')
+ threads = {}
+ numchunks = len(chunks)
+ chunkq = Queue.Queue(numchunks)
+ # Populate the queue with all the chunks that need processing.
+ for chunk in chunks:
+ chunkq.put(chunk)
+ # Start all the threads
+ for i in range(min(numchunks, mm_cfg.MAX_DELIVERY_THREADS)):
+ syslog('error', 'creating thread %d' % i)
+ threadfailures = {}
+ t = threading.Thread(target=pre_deliver,
+ args=(envsender, msgtext, threadfailures, chunkq))
+ threads[t] = threadfailures
+ t.start()
+ # Now wait for all the threads to complete and collate their failure
+ # dictionaries.
+ for t, threadfailures in threads.items():
+ t.join()
+ syslog('error', 'thread %s complete' % t)
+ failures.update(threadfailures)
+ # All threads have exited
+ threads.clear()
+ syslog('error', 'threaded_deliver... done.')
+
+
+
+def deliver(envsender, msgtext, recips, failures):
refused = {}
# Gather statistics on how long each SMTP dialog takes.
- t0 = time.time()
+## t0 = time.time()
try:
conn = smtplib.SMTP(mm_cfg.SMTPHOST, mm_cfg.SMTPPORT)
try:
# make sure the connect happens, which won't be done by the
# constructor if SMTPHOST is false
- envsender = mlist.GetAdminEmail()
- refused = conn.sendmail(envsender, recips, str(msg))
+ refused = conn.sendmail(envsender, recips, msgtext)
finally:
- t1 = time.time()
- mlist.LogMsg('smtp',
- 'smtp for %d recips, completed in %.3f seconds' %
- (len(recips), (t1-t0)))
+## t1 = time.time()
+## syslog('smtp', 'smtp for %d recips, completed in %.3f seconds' %
+## (len(recips), (t1-t0)))
conn.quit()
except smtplib.SMTPRecipientsRefused, e:
refused = e.recipients
# MTA not responding, or other socket problems, or any other kind of
# SMTPException. In that case, nothing got delivered
except (socket.error, smtplib.SMTPException), e:
- mlist.LogMsg('smtp', 'All recipients refused: %s' % e)
+ syslog('smtp', 'All recipients refused: %s' % e)
# If the exception had an associated error code, use it, otherwise,
# fake it with a non-triggering exception code
errcode = getattr(e, 'smtp_code', -1)
errmsg = getattr(e, 'smtp_error', 'ignore')
for r in recips:
refused[r] = (errcode, errmsg)
- return refused
+ failures.update(refused)