diff options
Diffstat (limited to 'Mailman/Handlers/SMTPDirect.py')
| -rw-r--r-- | Mailman/Handlers/SMTPDirect.py | 105 |
1 files changed, 86 insertions, 19 deletions
diff --git a/Mailman/Handlers/SMTPDirect.py b/Mailman/Handlers/SMTPDirect.py index 400d22667..19d0d67a8 100644 --- a/Mailman/Handlers/SMTPDirect.py +++ b/Mailman/Handlers/SMTPDirect.py @@ -31,8 +31,17 @@ import socket from Mailman import mm_cfg from Mailman import Utils from Mailman.Handlers import HandlerAPI +from Mailman.Logging.Syslog import syslog from Mailman.pythonlib import smtplib +threading = None +try: + if mm_cfg.MAX_DELIVERY_THREADS > 0: + import threading + import Queue +except ImportError: + pass + def process(mlist, msg, msgdata): @@ -40,6 +49,8 @@ def process(mlist, msg, msgdata): if not recips: # Nobody to deliver to! return + admin = mlist.GetAdminEmail() + msgtext = str(msg) # # Split the recipient list into SMTP_MAX_RCPTS chunks. Most MTAs have a # limit on the number of recipients they'll swallow in a single @@ -49,10 +60,16 @@ def process(mlist, msg, msgdata): else: chunks = chunkify(recips, mm_cfg.SMTP_MAX_RCPTS) refused = {} - for chunk in chunks: - failures = deliver(mlist, msg, chunk) - refused.update(failures) + t0 = time.time() + if threading: + threaded_deliver(admin, msgtext, chunks, refused) + else: + for chunk in chunks: + deliver(admin, msgtext, chunk, refused) # + t1 = time.time() + syslog('smtp', 'smtp for %d recips, completed in %.3f seconds' % + (len(recips), (t1-t0))) # Process any failed deliveries. tempfailures = [] for recip, (code, smtpmsg) in refused.items(): @@ -73,7 +90,7 @@ def process(mlist, msg, msgdata): else: # Deal with persistent transient failures by queuing them up for # future delivery. TBD: this could generate lots of log entries! - mlist.LogMsg('smtp-failure', '%d %s (%s)' % (code, recip, smtpmsg)) + syslog('smtp-failure', '%d %s (%s)' % (code, recip, smtpmsg)) tempfailures.append(recip) if tempfailures: msgdata['recips'] = tempfailures @@ -84,13 +101,23 @@ def process(mlist, msg, msgdata): def chunkify(recips, chunksize): # First do a simple sort on top level domain. It probably doesn't buy us # much to try to sort on MX record -- that's the MTA's job. We're just - # trying to avoid getting a max recips error. + # trying to avoid getting a max recips error. Split the chunks along + # these lines (as suggested originally by Chuq Von Rospach and slightly + # elaborated by BAW). + chunkmap = {'com': 1, + 'net': 2, + 'org': 2, + 'edu': 3, + 'us' : 3, + 'ca' : 3, + } buckets = {} for r in recips: - bin = None + tld = None i = string.rfind(r, '.') if i >= 0: - bin = r[i+1:] + tld = r[i+1:] + bin = chunkmap.get(tld, 0) bucket = buckets.get(bin, []) bucket.append(r) buckets[bin] = bucket @@ -106,39 +133,79 @@ def chunkify(recips, chunksize): chunks.append(currentchunk) currentchunk = [] chunklen = 0 - if currentchunk: - chunks.append(currentchunk) + if currentchunk: + chunks.append(currentchunk) + currentchunk = [] + chunklen = 0 return chunks -def deliver(mlist, msg, recips): +def pre_deliver(envsender, msgtext, failures, chunkq): + while 1: + # Get the next recipient chunk, if there is one + try: + recips = chunkq.get(0) + except Queue.Empty: + # We're done + break + # Otherwise, process the chunk + deliver(envsender, msgtext, recips, failures) + + +def threaded_deliver(envsender, msgtext, chunks, failures): + syslog('error', 'threaded_deliver...') + threads = {} + numchunks = len(chunks) + chunkq = Queue.Queue(numchunks) + # Populate the queue with all the chunks that need processing. + for chunk in chunks: + chunkq.put(chunk) + # Start all the threads + for i in range(min(numchunks, mm_cfg.MAX_DELIVERY_THREADS)): + syslog('error', 'creating thread %d' % i) + threadfailures = {} + t = threading.Thread(target=pre_deliver, + args=(envsender, msgtext, threadfailures, chunkq)) + threads[t] = threadfailures + t.start() + # Now wait for all the threads to complete and collate their failure + # dictionaries. + for t, threadfailures in threads.items(): + t.join() + syslog('error', 'thread %s complete' % t) + failures.update(threadfailures) + # All threads have exited + threads.clear() + syslog('error', 'threaded_deliver... done.') + + + +def deliver(envsender, msgtext, recips, failures): refused = {} # Gather statistics on how long each SMTP dialog takes. - t0 = time.time() +## t0 = time.time() try: conn = smtplib.SMTP(mm_cfg.SMTPHOST, mm_cfg.SMTPPORT) try: # make sure the connect happens, which won't be done by the # constructor if SMTPHOST is false - envsender = mlist.GetAdminEmail() - refused = conn.sendmail(envsender, recips, str(msg)) + refused = conn.sendmail(envsender, recips, msgtext) finally: - t1 = time.time() - mlist.LogMsg('smtp', - 'smtp for %d recips, completed in %.3f seconds' % - (len(recips), (t1-t0))) +## t1 = time.time() +## syslog('smtp', 'smtp for %d recips, completed in %.3f seconds' % +## (len(recips), (t1-t0))) conn.quit() except smtplib.SMTPRecipientsRefused, e: refused = e.recipients # MTA not responding, or other socket problems, or any other kind of # SMTPException. In that case, nothing got delivered except (socket.error, smtplib.SMTPException), e: - mlist.LogMsg('smtp', 'All recipients refused: %s' % e) + syslog('smtp', 'All recipients refused: %s' % e) # If the exception had an associated error code, use it, otherwise, # fake it with a non-triggering exception code errcode = getattr(e, 'smtp_code', -1) errmsg = getattr(e, 'smtp_error', 'ignore') for r in recips: refused[r] = (errcode, errmsg) - return refused + failures.update(refused) |
