summaryrefslogtreecommitdiff
path: root/Mailman/Queue/OutgoingRunner.py
diff options
context:
space:
mode:
Diffstat (limited to 'Mailman/Queue/OutgoingRunner.py')
-rw-r--r--Mailman/Queue/OutgoingRunner.py47
1 files changed, 40 insertions, 7 deletions
diff --git a/Mailman/Queue/OutgoingRunner.py b/Mailman/Queue/OutgoingRunner.py
index af54f6f24..f4803ab8a 100644
--- a/Mailman/Queue/OutgoingRunner.py
+++ b/Mailman/Queue/OutgoingRunner.py
@@ -19,8 +19,12 @@
import sys
import os
+from mimelib.Parser import Parser
+
from Mailman import mm_cfg
+from Mailman import Message
from Mailman import Errors
+from Mailman import LockFile
from Mailman.Queue.Runner import Runner
@@ -29,6 +33,8 @@ class OutgoingRunner(Runner):
def __init__(self, slice=None, numslices=1, cachelists=1):
Runner.__init__(self, mm_cfg.OUTQUEUE_DIR,
slice, numslices, cachelists)
+ # Maps mailing lists to (recip, msg) tuples
+ self._permfailures = {}
def _dispose(self, mlist, msg, msgdata):
# Fortunately, we do not need the list lock to do deliveries.
@@ -43,17 +49,28 @@ class OutgoingRunner(Runner):
if pid <> os.getpid():
syslog('error', 'child process leaked through: %s' % modname)
os._exit(1)
- except Errors.SomeRecipientsFailed:
+ except Errors.SomeRecipientsFailed, e:
# The delivery module being used (SMTPDirect or Sendmail) failed
- # to deliver the message to one or all of the recipients. Requeue
- # the message so that delivery to those temporary failures are
+ # to deliver the message to one or all of the recipients.
+ # Permanent failures should be registered (but registration
+ # requires the list lock), and temporary failures should be
# retried later.
#
- # Consult and adjust some meager metrics that try to decide
- # whether it's worth continuing to attempt delivery of this
- # message.
+ # For permanent failures, make a copy of the message for
+ # HandleBouncingAddress(). I'm not sure this is necessary, or the
+ # right thing to do.
+ pcnt = len(e.permfailures)
+ p = Parser(Message.Message)
+ copy = p.parsestr(str(msg))
+ self._permfailures.setdefault(mlist, []).extend(
+ zip(e.permfailures, [copy] * pcnt))
+ # Temporary failures
+ if not e.tempfailures:
+ # Don't need to keep the message queued if there were only
+ # permanent failures.
+ return 0
now = time.time()
- recips = msgdata['recips']
+ recips = e.tempfailures
last_recip_count = msgdata.get('last_recip_count', 0)
deliver_until = msgdata.get('deliver_until', now)
if len(recips) == last_recip_count:
@@ -75,3 +92,19 @@ class OutgoingRunner(Runner):
return 1
# We've successfully completed handling of this message
return 0
+
+ def _doperiodic(self):
+ # Periodically try to acquire the list lock and clear out the
+ # permanent failures.
+ for mlist in self._permfailures.keys():
+ try:
+ mlist.Lock(timeout=mm_cfg.LIST_LOCK_TIMEOUT)
+ except LockFile.TimeOutError:
+ return
+ try:
+ for recip, msg in self._permfailures[mlist]:
+ mlist.RegisterBounce(recip, msg)
+ del self._permfailures[mlist]
+ finally:
+ mlist.Save()
+ mlist.Unlock()