summaryrefslogtreecommitdiff
path: root/Mailman/Queue
diff options
context:
space:
mode:
Diffstat (limited to 'Mailman/Queue')
-rw-r--r--Mailman/Queue/ArchRunner.py2
-rw-r--r--Mailman/Queue/BounceRunner.py216
-rw-r--r--Mailman/Queue/CommandRunner.py23
-rw-r--r--Mailman/Queue/NewsRunner.py13
-rw-r--r--Mailman/Queue/OutgoingRunner.py108
-rw-r--r--Mailman/Queue/Runner.py59
-rw-r--r--Mailman/Queue/Switchboard.py265
7 files changed, 307 insertions, 379 deletions
diff --git a/Mailman/Queue/ArchRunner.py b/Mailman/Queue/ArchRunner.py
index cbb49736b..0abb1d1b2 100644
--- a/Mailman/Queue/ArchRunner.py
+++ b/Mailman/Queue/ArchRunner.py
@@ -14,7 +14,7 @@
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-"""Outgoing queue runner."""
+"""Archive queue runner."""
import time
from email.Utils import parsedate_tz, mktime_tz, formatdate
diff --git a/Mailman/Queue/BounceRunner.py b/Mailman/Queue/BounceRunner.py
index 02c06e680..76a304708 100644
--- a/Mailman/Queue/BounceRunner.py
+++ b/Mailman/Queue/BounceRunner.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2001,2002 by the Free Software Foundation, Inc.
+# Copyright (C) 2001-2004 by the Free Software Foundation, Inc.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
@@ -16,8 +16,10 @@
"""Bounce queue runner."""
+import os
import re
import time
+import cPickle
from email.MIMEText import MIMEText
from email.MIMEMessage import MIMEMessage
@@ -35,22 +37,131 @@ from Mailman.i18n import _
COMMASPACE = ', '
-REGISTER_BOUNCES_EVERY = mm_cfg.minutes(15)
+try:
+ True, False
+except NameError:
+ True = 1
+ False = 0
-class BounceRunner(Runner):
+class BounceMixin:
+ def __init__(self):
+ # Registering a bounce means acquiring the list lock, and it would be
+ # too expensive to do this for each message. Instead, each bounce
+ # runner maintains an event log which is essentially a file with
+ # multiple pickles. Each bounce we receive gets appended to this file
+ # as a 4-tuple record: (listname, addr, today, msg)
+ #
+ # today is itself a 3-tuple of (year, month, day)
+ #
+ # Every once in a while (see _doperiodic()), the bounce runner cracks
+ # open the file, reads all the records and registers all the bounces.
+ # Then it truncates the file and continues on. We don't need to lock
+ # the bounce event file because bounce qrunners are single threaded
+ # and each creates a uniquely named file to contain the events.
+ #
+        # XXX When Python 2.3 is the minimal requirement, we can use the new
+ # tempfile.TemporaryFile() function.
+ #
+ # XXX We used to classify bounces to the site list as bounce events
+ # for every list, but this caused severe problems. Here's the
+ # scenario: aperson@example.com is a member of 4 lists, and a list
+ # owner of the foo list. example.com has an aggressive spam filter
+ # which rejects any message that is spam or contains spam as an
+ # attachment. Now, a spambot sends a piece of spam to the foo list,
+ # but since that spambot is not a member, the list holds the message
+ # for approval, and sends a notification to aperson@example.com as
+ # list owner. That notification contains a copy of the spam. Now
+ # example.com rejects the message, causing a bounce to be sent to the
+ # site list's bounce address. The bounce runner would then dutifully
+ # register a bounce for all 4 lists that aperson@example.com was a
+ # member of, and eventually that person would get disabled on all
+        # their lists.  So now we ignore site list bounces.  C'est la vie for
+ # password reminder bounces.
+ self._bounce_events_file = os.path.join(
+ mm_cfg.DATA_DIR, 'bounce-events-%05d.pck' % os.getpid())
+ self._bounce_events_fp = None
+ self._bouncecnt = 0
+ self._nextaction = time.time() + mm_cfg.REGISTER_BOUNCES_EVERY
+
+ def _queue_bounces(self, listname, addrs, msg):
+ today = time.localtime()[:3]
+ if self._bounce_events_fp is None:
+ self._bounce_events_fp = open(self._bounce_events_file, 'a+b')
+ for addr in addrs:
+ cPickle.dump((listname, addr, today, msg),
+ self._bounce_events_fp, 1)
+ self._bounce_events_fp.flush()
+ os.fsync(self._bounce_events_fp.fileno())
+ self._bouncecnt += len(addrs)
+
+ def _register_bounces(self):
+ syslog('bounce', '%s processing %s queued bounces',
+ self, self._bouncecnt)
+ # Read all the records from the bounce file, then unlink it. Sort the
+ # records by listname for more efficient processing.
+ events = {}
+ self._bounce_events_fp.seek(0)
+ while True:
+ try:
+ listname, addr, day, msg = cPickle.load(self._bounce_events_fp)
+ except ValueError, e:
+ syslog('bounce', 'Error reading bounce events: %s', e)
+ except EOFError:
+ break
+ events.setdefault(listname, []).append((addr, day, msg))
+ # Now register all events sorted by list
+ for listname in events.keys():
+ mlist = self._open_list(listname)
+ mlist.Lock()
+ try:
+ for addr, day, msg in events[listname]:
+ mlist.registerBounce(addr, msg, day=day)
+ mlist.Save()
+ finally:
+ mlist.Unlock()
+ # Reset and free all the cached memory
+ self._bounce_events_fp.close()
+ self._bounce_events_fp = None
+ os.unlink(self._bounce_events_file)
+ self._bouncecnt = 0
+
+ def _cleanup(self):
+ if self._bouncecnt > 0:
+ self._register_bounces()
+
+ def _doperiodic(self):
+ now = time.time()
+ if self._nextaction > now or self._bouncecnt == 0:
+ return
+ # Let's go ahead and register the bounces we've got stored up
+ self._nextaction = now + mm_cfg.REGISTER_BOUNCES_EVERY
+ self._register_bounces()
+
+ def _probe_bounce(self, mlist, token):
+ locked = mlist.Locked()
+ if not locked:
+ mlist.Lock()
+ try:
+ op, addr, bmsg = mlist.pend_confirm(token)
+ info = mlist.getBounceInfo(addr)
+ mlist.disableBouncingMember(addr, info, bmsg)
+ # Only save the list if we're unlocking it
+ if not locked:
+ mlist.Save()
+ finally:
+ if not locked:
+ mlist.Unlock()
+
+
+
+class BounceRunner(Runner, BounceMixin):
QDIR = mm_cfg.BOUNCEQUEUE_DIR
def __init__(self, slice=None, numslices=1):
Runner.__init__(self, slice, numslices)
- # This is a simple sequence of bounce score events. Each entry in the
- # list is a tuple of (address, day, msg) where day is a tuple of
- # (YYYY, MM, DD). We'll sort and collate all this information in
- # _register_bounces() below.
- self._bounces = {}
- self._bouncecnt = 0
- self._next_registration = time.time() + REGISTER_BOUNCES_EVERY
+ BounceMixin.__init__(self)
def _dispose(self, mlist, msg, msgdata):
# Make sure we have the most up-to-date state
@@ -68,7 +179,7 @@ class BounceRunner(Runner):
# All messages to list-owner@vdom.ain have their envelope sender set
# to site-owner@dom.ain (no virtual domain). Is this a bounce for a
# message to a list owner, coming to the site owner?
- if msg.get('to', '') == Utils.get_site_email(extra='-owner'):
+ if msg.get('to', '') == Utils.get_site_email(extra='owner'):
# Send it on to the site owners, but craft the envelope sender to
# be the -loop detection address, so if /they/ bounce, we won't
# get stuck in a bounce loop.
@@ -82,8 +193,13 @@ class BounceRunner(Runner):
# Try VERP detection first, since it's quick and easy
addrs = verp_bounce(mlist, msg)
if not addrs:
+ # See if this was a probe message.
+ token = verp_probe(mlist, msg)
+ if token:
+ self._probe_bounce(mlist, token)
+ return
# That didn't give us anything useful, so try the old fashion
- # bounce matching modules
+ # bounce matching modules.
addrs = BouncerAPI.ScanMessages(mlist, msg)
# If that still didn't return us any useful addresses, then send it on
# or discard it.
@@ -96,47 +212,12 @@ class BounceRunner(Runner):
# although I'm unsure how that could happen. Possibly ScanMessages()
# can let None's sneak through. In any event, this will kill them.
addrs = filter(None, addrs)
- # Store the bounce score events so we can register them periodically
- today = time.localtime()[:3]
- events = [(addr, today, msg) for addr in addrs]
- self._bounces.setdefault(mlist.internal_name(), []).extend(events)
- self._bouncecnt += len(addrs)
+ self._queue_bounces(mlist.internal_name(), addrs, msg)
- def _doperiodic(self):
- now = time.time()
- if self._next_registration > now or not self._bounces:
- return
- # Let's go ahead and register the bounces we've got stored up
- self._next_registration = now + REGISTER_BOUNCES_EVERY
- self._register_bounces()
-
- def _register_bounces(self):
- syslog('bounce', 'Processing %s queued bounces', self._bouncecnt)
- # First, get the list of bounces register against the site list. For
- # these addresses, we want to register a bounce on every list the
- # address is a member of -- which we don't know yet.
- sitebounces = self._bounces.get(mm_cfg.MAILMAN_SITE_LIST, [])
- if sitebounces:
- listnames = Utils.list_names()
- else:
- listnames = self._bounces.keys()
- for listname in listnames:
- mlist = self._open_list(listname)
- mlist.Lock()
- try:
- events = self._bounces.get(listname, []) + sitebounces
- for addr, day, msg in events:
- mlist.registerBounce(addr, msg, day=day)
- mlist.Save()
- finally:
- mlist.Unlock()
- # Reset and free all the cached memory
- self._bounces = {}
- self._bouncecnt = 0
+ _doperiodic = BounceMixin._doperiodic
def _cleanup(self):
- if self._bounces:
- self._register_bounces()
+ BounceMixin._cleanup(self)
Runner._cleanup(self)
@@ -173,6 +254,41 @@ def verp_bounce(mlist, msg):
+def verp_probe(mlist, msg):
+ bmailbox, bdomain = Utils.ParseEmail(mlist.GetBouncesEmail())
+ # Sadly not every MTA bounces VERP messages correctly, or consistently.
+ # Fall back to Delivered-To: (Postfix), Envelope-To: (Exim) and
+ # Apparently-To:, and then short-circuit if we still don't have anything
+ # to work with. Note that there can be multiple Delivered-To: headers so
+ # we need to search them all (and we don't worry about false positives for
+ # forwarded email, because only one should match VERP_REGEXP).
+ vals = []
+ for header in ('to', 'delivered-to', 'envelope-to', 'apparently-to'):
+ vals.extend(msg.get_all(header, []))
+ for field in vals:
+ to = parseaddr(field)[1]
+ if not to:
+ continue # empty header
+ mo = re.search(mm_cfg.VERP_PROBE_REGEXP, to)
+ if not mo:
+ continue # no match of regexp
+ try:
+ if bmailbox <> mo.group('bounces'):
+ continue # not a bounce to our list
+ # Extract the token and see if there's an entry
+ token = mo.group('token')
+ data = mlist.pend_confirm(token, expunge=False)
+ if data is not None:
+ return token
+ except IndexError:
+ syslog(
+ 'error',
+ "VERP_PROBE_REGEXP doesn't yield the right match groups: %s",
+ mm_cfg.VERP_PROBE_REGEXP)
+ return None
+
+
+
def maybe_forward(mlist, msg):
# Does the list owner want to get non-matching bounce messages?
# If not, simply discard it.
diff --git a/Mailman/Queue/CommandRunner.py b/Mailman/Queue/CommandRunner.py
index 8967e2f65..e08d02eb5 100644
--- a/Mailman/Queue/CommandRunner.py
+++ b/Mailman/Queue/CommandRunner.py
@@ -1,4 +1,4 @@
-# Copyright (C) 1998-2003 by the Free Software Foundation, Inc.
+# Copyright (C) 1998-2004 by the Free Software Foundation, Inc.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
@@ -40,9 +40,10 @@ from Mailman.Logging.Syslog import syslog
from Mailman import LockFile
from email.Header import decode_header, make_header, Header
+from email.Errors import HeaderParseError
+from email.Iterators import typed_subpart_iterator
from email.MIMEText import MIMEText
from email.MIMEMessage import MIMEMessage
-from email.Iterators import typed_subpart_iterator
NL = '\n'
@@ -72,9 +73,15 @@ class Results:
# Extract the subject header and do RFC 2047 decoding. Note that
# Python 2.1's unicode() builtin doesn't call obj.__unicode__().
subj = msg.get('subject', '')
- subj = make_header(decode_header(subj)).__unicode__()
- # Always process the Subject: header first
- self.commands.append(subj)
+ try:
+ subj = make_header(decode_header(subj)).__unicode__()
+ # TK: Currently we don't allow 8bit or multibyte in mail command.
+ subj = subj.encode('us-ascii')
+ # Always process the Subject: header first
+ self.commands.append(subj)
+ except (HeaderParseError, UnicodeError, LookupError):
+ # We couldn't parse it so ignore the Subject header
+ pass
# Find the first text/plain part
part = None
for part in typed_subpart_iterator(msg, 'text', 'plain'):
@@ -163,7 +170,7 @@ To obtain instructions, send a message containing just the word "help".
resp.append(_('\n- Done.\n\n'))
# Encode any unicode strings into the list charset, so we don't try to
# join unicode strings and invalid ASCII.
- charset = Utils.GetCharSet(self.mlist.preferred_language)
+ charset = Utils.GetCharSet(self.msgdata['lang'])
encoded_resp = []
for item in resp:
if isinstance(item, UnicodeType):
@@ -179,13 +186,13 @@ To obtain instructions, send a message containing just the word "help".
# BAW: We wait until now to make this decision since our sender may
# not be self.msg.get_sender(), but I'm not sure this is right.
recip = self.returnaddr or self.msg.get_sender()
- if not self.mlist.autorespondToSender(recip):
+ if not self.mlist.autorespondToSender(recip, self.msgdata['lang']):
return
msg = Message.UserNotification(
recip,
self.mlist.GetBouncesEmail(),
_('The results of your email commands'),
- lang=self.mlist.preferred_language)
+ lang=self.msgdata['lang'])
msg.set_type('multipart/mixed')
msg.attach(results)
orig = MIMEMessage(self.msg)
diff --git a/Mailman/Queue/NewsRunner.py b/Mailman/Queue/NewsRunner.py
index cdfd5fead..448500633 100644
--- a/Mailman/Queue/NewsRunner.py
+++ b/Mailman/Queue/NewsRunner.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2000-2003 by the Free Software Foundation, Inc.
+# Copyright (C) 2000-2005 by the Free Software Foundation, Inc.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
@@ -102,11 +102,14 @@ def prepare_message(mlist, msg, msgdata):
del msg['approved']
msg['Approved'] = mlist.GetListEmail()
# Should we restore the original, non-prefixed subject for gatewayed
- # messages?
- origsubj = msgdata.get('origsubj')
- if not mlist.news_prefix_subject_too and origsubj is not None:
+ # messages? TK: We use stripped_subject (prefix stripped) which was
+    # crafted in CookHeaders.py to ensure the prefix was stripped from the
+    # subject that came from a mailing list user.
+ stripped_subject = msgdata.get('stripped_subject') \
+ or msgdata.get('origsubj')
+ if not mlist.news_prefix_subject_too and stripped_subject is not None:
del msg['subject']
- msg['subject'] = origsubj
+ msg['subject'] = stripped_subject
# Add the appropriate Newsgroups: header
ngheader = msg['newsgroups']
if ngheader is not None:
diff --git a/Mailman/Queue/OutgoingRunner.py b/Mailman/Queue/OutgoingRunner.py
index 13dc2014a..001b68645 100644
--- a/Mailman/Queue/OutgoingRunner.py
+++ b/Mailman/Queue/OutgoingRunner.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2000-2003 by the Free Software Foundation, Inc.
+# Copyright (C) 2000-2004 by the Free Software Foundation, Inc.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
@@ -30,6 +30,7 @@ from Mailman import Errors
from Mailman import LockFile
from Mailman.Queue.Runner import Runner
from Mailman.Queue.Switchboard import Switchboard
+from Mailman.Queue.BounceRunner import BounceMixin
from Mailman.Logging.Syslog import syslog
# This controls how often _doperiodic() will try to deal with deferred
@@ -44,14 +45,12 @@ except NameError:
-class OutgoingRunner(Runner):
+class OutgoingRunner(Runner, BounceMixin):
QDIR = mm_cfg.OUTQUEUE_DIR
def __init__(self, slice=None, numslices=1):
Runner.__init__(self, slice, numslices)
- # Maps mailing lists to (recip, msg) tuples
- self._permfailures = {}
- self._permfail_counter = 0
+ BounceMixin.__init__(self)
# We look this function up only at startup time
modname = 'Mailman.Handlers.' + mm_cfg.DELIVERY_MODULE
mod = __import__(modname)
@@ -63,6 +62,10 @@ class OutgoingRunner(Runner):
self.__retryq = Switchboard(mm_cfg.RETRYQUEUE_DIR)
def _dispose(self, mlist, msg, msgdata):
+ # See if we should retry delivery of this message again.
+ deliver_after = msgdata.get('deliver_after', 0)
+ if time.time() < deliver_after:
+ return True
# Make sure we have the most up-to-date state
mlist.Load()
try:
@@ -87,67 +90,46 @@ class OutgoingRunner(Runner):
self.__logged = True
return True
except Errors.SomeRecipientsFailed, e:
- # The delivery module being used (SMTPDirect or Sendmail) failed
- # to deliver the message to one or all of the recipients.
- # Permanent failures should be registered (but registration
- # requires the list lock), and temporary failures should be
- # retried later.
- #
- # For permanent failures, make a copy of the message for bounce
- # handling. I'm not sure this is necessary, or the right thing to
- # do.
- if e.permfailures:
- pcnt = len(e.permfailures)
- msgcopy = copy.deepcopy(msg)
- self._permfailures.setdefault(mlist, []).extend(
- zip(e.permfailures, [msgcopy] * pcnt))
- # Move temporary failures to the qfiles/retry queue which will
- # occasionally move them back here for another shot at delivery.
- if e.tempfailures:
- now = time.time()
- recips = e.tempfailures
- last_recip_count = msgdata.get('last_recip_count', 0)
- deliver_until = msgdata.get('deliver_until', now)
- if len(recips) == last_recip_count:
- # We didn't make any progress, so don't attempt delivery
- # any longer. BAW: is this the best disposition?
- if now > deliver_until:
- return False
- else:
- # Keep trying to delivery this message for a while
- deliver_until = now + mm_cfg.DELIVERY_RETRY_PERIOD
- msgdata['last_recip_count'] = len(recips)
- msgdata['deliver_until'] = deliver_until
- msgdata['recips'] = recips
- self.__retryq.enqueue(msg, msgdata)
+ # Handle local rejects of probe messages differently.
+ if msgdata.get('probe_token'):
+ self._probe_bounce(mlist, msgdata['probe_token'])
+ else:
+ # Delivery failed at SMTP time for some or all of the
+ # recipients. Permanent failures are registered as bounces,
+ # but temporary failures are retried for later.
+ #
+ # BAW: msg is going to be the original message that failed
+ # delivery, not a bounce message. This may be confusing if
+ # this is what's sent to the user in the probe message. Maybe
+ # we should craft a bounce-like message containing information
+ # about the permanent SMTP failure?
+ self._queue_bounces(mlist.internal_name(), e.permfailures, msg)
+ # Move temporary failures to the qfiles/retry queue which will
+ # occasionally move them back here for another shot at
+ # delivery.
+ if e.tempfailures:
+ now = time.time()
+ recips = e.tempfailures
+ last_recip_count = msgdata.get('last_recip_count', 0)
+ deliver_until = msgdata.get('deliver_until', now)
+ if len(recips) == last_recip_count:
+ # We didn't make any progress, so don't attempt
+ # delivery any longer. BAW: is this the best
+ # disposition?
+ if now > deliver_until:
+ return False
+ else:
+                    # Keep trying to deliver this message for a while
+ deliver_until = now + mm_cfg.DELIVERY_RETRY_PERIOD
+ msgdata['last_recip_count'] = len(recips)
+ msgdata['deliver_until'] = deliver_until
+ msgdata['recips'] = recips
+ self.__retryq.enqueue(msg, msgdata)
# We've successfully completed handling of this message
return False
- def _doperiodic(self):
- # Periodically try to acquire the list lock and clear out the
- # permanent failures.
- self._permfail_counter += 1
- if self._permfail_counter < DEAL_WITH_PERMFAILURES_EVERY:
- return
- self._handle_permfailures()
-
- def _handle_permfailures(self):
- # Reset the counter
- self._permfail_counter = 0
- # And deal with the deferred permanent failures.
- for mlist in self._permfailures.keys():
- try:
- mlist.Lock(timeout=mm_cfg.LIST_LOCK_TIMEOUT)
- except LockFile.TimeOutError:
- return
- try:
- for recip, msg in self._permfailures[mlist]:
- mlist.registerBounce(recip, msg)
- del self._permfailures[mlist]
- mlist.Save()
- finally:
- mlist.Unlock()
+ _doperiodic = BounceMixin._doperiodic
def _cleanup(self):
- self._handle_permfailures()
+ BounceMixin._cleanup(self)
Runner._cleanup(self)
diff --git a/Mailman/Queue/Runner.py b/Mailman/Queue/Runner.py
index d46ec8a1e..1e7854d7d 100644
--- a/Mailman/Queue/Runner.py
+++ b/Mailman/Queue/Runner.py
@@ -1,4 +1,4 @@
-# Copyright (C) 1998-2003 by the Free Software Foundation, Inc.
+# Copyright (C) 1998-2004 by the Free Software Foundation, Inc.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
@@ -31,6 +31,8 @@ from Mailman import i18n
from Mailman.Queue.Switchboard import Switchboard
from Mailman.Logging.Syslog import syslog
+import email.Errors
+
try:
True, False
except NameError:
@@ -52,6 +54,9 @@ class Runner:
self._shunt = Switchboard(mm_cfg.SHUNTQUEUE_DIR)
self._stop = False
+ def __repr__(self):
+ return '<%s at %s>' % (self.__class__.__name__, id(self))
+
def stop(self):
self._stop = True
@@ -88,32 +93,34 @@ class Runner:
# available for this qrunner to process.
files = self._switchboard.files()
for filebase in files:
- # Ask the switchboard for the message and metadata objects
- # associated with this filebase.
- msg, msgdata = self._switchboard.dequeue(filebase)
- # It's possible one or both files got lost. If so, just ignore
- # this filebase entry. dequeue() will automatically unlink the
- # other file, but we should log an error message for diagnostics.
- if msg is None or msgdata is None:
- syslog('error', 'lost data files for filebase: %s', filebase)
- else:
- # Now that we've dequeued the message, we want to be
- # incredibly anal about making sure that no uncaught exception
- # could cause us to lose the message. All runners that
- # implement _dispose() must guarantee that exceptions are
- # caught and dealt with properly. Still, there may be a bug
- # in the infrastructure, and we do not want those to cause
- # messages to be lost. Any uncaught exceptions will cause the
- # message to be stored in the shunt queue for human
+ try:
+ # Ask the switchboard for the message and metadata objects
+ # associated with this filebase.
+ msg, msgdata = self._switchboard.dequeue(filebase)
+ except email.Errors.MessageParseError, e:
+ # It's possible to get here if the message was stored in the
+ # pickle in plain text, and the metadata had a _parsemsg key
+ # that was true, /and/ if the message had some bogosity in
+ # it. It's almost always going to be spam or bounced spam.
+ # There's not much we can do (and we didn't even get the
+                # metadata), so just log the exception and continue.
+ self._log(e)
+ syslog('error', 'Ignoring unparseable message: %s', filebase)
+ continue
+ try:
+ self._onefile(msg, msgdata)
+ except Exception, e:
+ # All runners that implement _dispose() must guarantee that
+ # exceptions are caught and dealt with properly. Still, there
+ # may be a bug in the infrastructure, and we do not want those
+ # to cause messages to be lost. Any uncaught exceptions will
+ # cause the message to be stored in the shunt queue for human
# intervention.
- try:
- self._onefile(msg, msgdata)
- except Exception, e:
- self._log(e)
- # Put a marker in the metadata for unshunting
- msgdata['whichq'] = self._switchboard.whichq()
- filebase = self._shunt.enqueue(msg, msgdata)
- syslog('error', 'SHUNTING: %s', filebase)
+ self._log(e)
+ # Put a marker in the metadata for unshunting
+ msgdata['whichq'] = self._switchboard.whichq()
+ filebase = self._shunt.enqueue(msg, msgdata)
+ syslog('error', 'SHUNTING: %s', filebase)
# Other work we want to do each time through the loop
Utils.reap(self._kids, once=True)
self._doperiodic()
diff --git a/Mailman/Queue/Switchboard.py b/Mailman/Queue/Switchboard.py
index bf0d8d409..52c430b0a 100644
--- a/Mailman/Queue/Switchboard.py
+++ b/Mailman/Queue/Switchboard.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2001-2003 by the Free Software Foundation, Inc.
+# Copyright (C) 2001-2004 by the Free Software Foundation, Inc.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
@@ -34,13 +34,12 @@
# needs.
import os
-import time
import sha
-import marshal
+import time
+import email
import errno
import cPickle
-
-import email
+import marshal
from Mailman import mm_cfg
from Mailman import Utils
@@ -63,7 +62,7 @@ SAVE_MSGS_AS_PICKLES = True
-class _Switchboard:
+class Switchboard:
def __init__(self, whichq, slice=None, numslices=1):
self.__whichq = whichq
# Create the directory if it doesn't yet exist.
@@ -97,11 +96,11 @@ class _Switchboard:
# Get some data for the input to the sha hash
now = time.time()
if SAVE_MSGS_AS_PICKLES and not data.get('_plaintext'):
- msgsave = cPickle.dumps(_msg, 1)
- ext = '.pck'
+ protocol = 1
+ msgsave = cPickle.dumps(_msg, protocol)
else:
- msgsave = str(_msg)
- ext = '.msg'
+ protocol = 0
+ msgsave = cPickle.dumps(str(_msg), protocol)
hashfood = msgsave + listname + `now`
# Encode the current time into the file name for FIFO sorting in
# files(). The file name consists of two parts separated by a `+':
@@ -110,92 +109,46 @@ class _Switchboard:
#rcvtime = data.setdefault('received_time', now)
rcvtime = data.setdefault('received_time', now)
filebase = `rcvtime` + '+' + sha.new(hashfood).hexdigest()
- # Figure out which queue files the message is to be written to.
- msgfile = os.path.join(self.__whichq, filebase + ext)
- dbfile = os.path.join(self.__whichq, filebase + '.db')
+ filename = os.path.join(self.__whichq, filebase + '.pck')
+ tmpfile = filename + '.tmp'
# Always add the metadata schema version number
data['version'] = mm_cfg.QFILE_SCHEMA_VERSION
# Filter out volatile entries
for k in data.keys():
- if k[0] == '_':
+ if k.startswith('_'):
del data[k]
- # Now write the message text to one file and the metadata to another
- # file. The metadata is always written second to avoid race
- # conditions with the various queue runners (which key off of the .db
- # filename).
+ # We have to tell the dequeue() method whether to parse the message
+ # object or not.
+ data['_parsemsg'] = (protocol == 0)
+ # Write to the pickle file the message object and metadata.
omask = os.umask(007) # -rw-rw----
try:
- msgfp = open(msgfile, 'w')
+ fp = open(tmpfile, 'w')
+ try:
+ fp.write(msgsave)
+ cPickle.dump(data, fp, protocol)
+ fp.flush()
+ os.fsync(fp.fileno())
+ finally:
+ fp.close()
finally:
os.umask(omask)
- msgfp.write(msgsave)
- msgfp.close()
- # Now write the metadata using the appropriate external metadata
- # format. We play rename-switcheroo here to further plug the race
- # condition holes.
- tmpfile = dbfile + '.tmp'
- self._ext_write(tmpfile, data)
- os.rename(tmpfile, dbfile)
+ os.rename(tmpfile, filename)
return filebase
def dequeue(self, filebase):
- # Calculate the .db and .msg filenames from the given filebase.
- msgfile = os.path.join(self.__whichq, filebase + '.msg')
- pckfile = os.path.join(self.__whichq, filebase + '.pck')
- dbfile = os.path.join(self.__whichq, filebase + '.db')
- # Now we are going to read the message and metadata for the given
- # filebase. We want to read things in this order: first, the metadata
- # file to find out whether the message is stored as a pickle or as
- # plain text. Second, the actual message file. However, we want to
- # first unlink the message file and then the .db file, because the
- # qrunner only cues off of the .db file
- msg = None
- try:
- data = self._ext_read(dbfile)
- os.unlink(dbfile)
- except EnvironmentError, e:
- if e.errno <> errno.ENOENT: raise
- data = {}
- # Between 2.1b4 and 2.1b5, the `rejection-notice' key in the metadata
- # was renamed to `rejection_notice', since dashes in the keys are not
- # supported in METAFMT_ASCII.
- if data.has_key('rejection-notice'):
- data['rejection_notice'] = data['rejection-notice']
- del data['rejection-notice']
- msgfp = None
+ # Calculate the filename from the given filebase.
+ filename = os.path.join(self.__whichq, filebase + '.pck')
+ # Read the message object and metadata.
+ fp = open(filename)
+ os.unlink(filename)
try:
- try:
- msgfp = open(pckfile)
- msg = cPickle.load(msgfp)
- os.unlink(pckfile)
- except EnvironmentError, e:
- if e.errno <> errno.ENOENT: raise
- msgfp = None
- try:
- msgfp = open(msgfile)
- msg = email.message_from_file(msgfp, Message.Message)
- os.unlink(msgfile)
- except EnvironmentError, e:
- if e.errno <> errno.ENOENT: raise
- except email.Errors.MessageParseError, e:
- # This message was unparsable, most likely because its
- # MIME encapsulation was broken. For now, there's not
- # much we can do about it.
- syslog('error', 'message is unparsable: %s', filebase)
- msgfp.close()
- msgfp = None
- if mm_cfg.QRUNNER_SAVE_BAD_MESSAGES:
- # Cheapo way to ensure the directory exists w/ the
- # proper permissions.
- sb = Switchboard(mm_cfg.BADQUEUE_DIR)
- os.rename(msgfile, os.path.join(
- mm_cfg.BADQUEUE_DIR, filebase + '.txt'))
- else:
- os.unlink(msgfile)
- msg = data = None
+ msg = cPickle.load(fp)
+ data = cPickle.load(fp)
finally:
- if msgfp:
- msgfp.close()
+ fp.close()
+ if data.get('_parsemsg'):
+ msg = email.message_from_string(msg, Message.Message)
return msg, data
def files(self):
@@ -203,157 +156,17 @@ class _Switchboard:
lower = self.__lower
upper = self.__upper
for f in os.listdir(self.__whichq):
- # We only care about the file's base name (i.e. no extension).
- # Thus we'll ignore anything that doesn't end in .db.
- if not f.endswith('.db'):
+ # By ignoring anything that doesn't end in .pck, we ignore
+ # tempfiles and avoid a race condition.
+ if not f.endswith('.pck'):
continue
filebase = os.path.splitext(f)[0]
when, digest = filebase.split('+')
# Throw out any files which don't match our bitrange. BAW: test
# performance and end-cases of this algorithm.
- if not lower or (lower <= long(digest, 16) < upper):
+ if lower is None or (lower <= long(digest, 16) < upper):
times[float(when)] = filebase
# FIFO sort
keys = times.keys()
keys.sort()
return [times[k] for k in keys]
-
- def _ext_write(self, tmpfile, data):
- raise NotImplementedError
-
- def _ext_read(self, dbfile):
- raise NotImplementedError
-
-
-
-class MarshalSwitchboard(_Switchboard):
- """Python marshal format."""
- FLOAT_ATTRIBUTES = ['received_time']
-
- def _ext_write(self, filename, dict):
- omask = os.umask(007) # -rw-rw----
- try:
- fp = open(filename, 'w')
- finally:
- os.umask(omask)
- # Python's marshal, up to and including in Python 2.1, has a bug where
- # the full precision of floats was not stored. We work around this
- # bug by hardcoding a list of float values we know about, repr()-izing
- # them ourselves, and doing the reverse conversion on _ext_read().
- for attr in self.FLOAT_ATTRIBUTES:
- # We use try/except because we expect a hitrate of nearly 100%
- try:
- fval = dict[attr]
- except KeyError:
- pass
- else:
- dict[attr] = repr(fval)
- marshal.dump(dict, fp)
- # Make damn sure that the data we just wrote gets flushed to disk
- fp.flush()
- if mm_cfg.SYNC_AFTER_WRITE:
- os.fsync(fp.fileno())
- fp.close()
-
- def _ext_read(self, filename):
- fp = open(filename)
- dict = marshal.load(fp)
- # Update from version 2 files
- if dict.get('version', 0) == 2:
- del dict['filebase']
- # Do the reverse conversion (repr -> float)
- for attr in self.FLOAT_ATTRIBUTES:
- try:
- sval = dict[attr]
- except KeyError:
- pass
- else:
- # Do a safe eval by setting up a restricted execution
- # environment. This may not be strictly necessary since we
- # know they are floats, but it can't hurt.
- dict[attr] = eval(sval, {'__builtins__': {}})
- fp.close()
- return dict
-
-
-
-class BSDDBSwitchboard(_Switchboard):
- """Native (i.e. compiled-in) Berkeley db format."""
- def _ext_write(self, filename, dict):
- import bsddb
- omask = os.umask(0)
- try:
- hashfile = bsddb.hashopen(filename, 'n', 0660)
- finally:
- os.umask(omask)
- # values must be strings
- for k, v in dict.items():
- hashfile[k] = marshal.dumps(v)
- hashfile.sync()
- hashfile.close()
-
- def _ext_read(self, filename):
- import bsddb
- dict = {}
- hashfile = bsddb.hashopen(filename, 'r')
- for k in hashfile.keys():
- dict[k] = marshal.loads(hashfile[k])
- hashfile.close()
- return dict
-
-
-
-class ASCIISwitchboard(_Switchboard):
- """Human readable .db file format.
-
- key/value pairs are written as
-
- key = value
-
- as real Python code which can be execfile'd.
- """
-
- def _ext_write(self, filename, dict):
- omask = os.umask(007) # -rw-rw----
- try:
- fp = open(filename, 'w')
- finally:
- os.umask(omask)
- for k, v in dict.items():
- print >> fp, '%s = %s' % (k, repr(v))
- # Make damn sure that the data we just wrote gets flushed to disk
- fp.flush()
- if mm_cfg.SYNC_AFTER_WRITE:
- os.fsync(fp.fileno())
- fp.close()
-
- def _ext_read(self, filename):
- dict = {'__builtins__': {}}
- execfile(filename, dict)
- del dict['__builtins__']
- return dict
-
-
-
-# Here are the various types of external file formats available. The format
-# chosen is given defined in the mm_cfg.py configuration file.
-if mm_cfg.METADATA_FORMAT == mm_cfg.METAFMT_MARSHAL:
- Switchboard = MarshalSwitchboard
-elif mm_cfg.METADATA_FORMAT == mm_cfg.METAFMT_BSDDB_NATIVE:
- Switchboard = BSDDBSwitchboard
-elif mm_cfg.METADATA_FORMAT == mm_cfg.METAFMT_ASCII:
- Switchboard = ASCIISwitchboard
-else:
- syslog('error', 'Undefined metadata format: %d (using marshals)',
- mm_cfg.METADATA_FORMAT)
- Switchboard = MarshalSwitchboard
-
-
-
-# For bin/dumpdb
-class DumperSwitchboard(Switchboard):
- def __init__(self):
- pass
-
- def read(self, filename):
- return self._ext_read(filename)