Diffstat (limited to 'Mailman/Queue')
 -rw-r--r--   Mailman/Queue/ArchRunner.py     |   2
 -rw-r--r--   Mailman/Queue/BounceRunner.py   | 216
 -rw-r--r--   Mailman/Queue/CommandRunner.py  |  23
 -rw-r--r--   Mailman/Queue/NewsRunner.py     |  13
 -rw-r--r--   Mailman/Queue/OutgoingRunner.py | 108
 -rw-r--r--   Mailman/Queue/Runner.py         |  59
 -rw-r--r--   Mailman/Queue/Switchboard.py    | 265
7 files changed, 307 insertions, 379 deletions
diff --git a/Mailman/Queue/ArchRunner.py b/Mailman/Queue/ArchRunner.py
index cbb49736b..0abb1d1b2 100644
--- a/Mailman/Queue/ArchRunner.py
+++ b/Mailman/Queue/ArchRunner.py
@@ -14,7 +14,7 @@
 # along with this program; if not, write to the Free Software
 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 
-"""Outgoing queue runner."""
+"""Archive queue runner."""
 
 import time
 from email.Utils import parsedate_tz, mktime_tz, formatdate
diff --git a/Mailman/Queue/BounceRunner.py b/Mailman/Queue/BounceRunner.py
index 02c06e680..76a304708 100644
--- a/Mailman/Queue/BounceRunner.py
+++ b/Mailman/Queue/BounceRunner.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2001,2002 by the Free Software Foundation, Inc.
+# Copyright (C) 2001-2004 by the Free Software Foundation, Inc.
 #
 # This program is free software; you can redistribute it and/or
 # modify it under the terms of the GNU General Public License
@@ -16,8 +16,10 @@
 
 """Bounce queue runner."""
 
+import os
 import re
 import time
+import cPickle
 
 from email.MIMEText import MIMEText
 from email.MIMEMessage import MIMEMessage
@@ -35,22 +37,131 @@ from Mailman.i18n import _
 
 COMMASPACE = ', '
 
-REGISTER_BOUNCES_EVERY = mm_cfg.minutes(15)
+try:
+    True, False
+except NameError:
+    True = 1
+    False = 0
 
 
-class BounceRunner(Runner):
+class BounceMixin:
+    def __init__(self):
+        # Registering a bounce means acquiring the list lock, and it would be
+        # too expensive to do this for each message.  Instead, each bounce
+        # runner maintains an event log which is essentially a file with
+        # multiple pickles.  Each bounce we receive gets appended to this file
+        # as a 4-tuple record: (listname, addr, today, msg)
+        #
+        # today is itself a 3-tuple of (year, month, day)
+        #
+        # Every once in a while (see _doperiodic()), the bounce runner cracks
+        # open the file, reads all the records and registers all the bounces.
+        # Then it truncates the file and continues on.  We don't need to lock
+        # the bounce event file because bounce qrunners are single threaded
+        # and each creates a uniquely named file to contain the events.
+        #
+        # XXX When Python 2.3 is minimal require, we can use the new
+        # tempfile.TemporaryFile() function.
+        #
+        # XXX We used to classify bounces to the site list as bounce events
+        # for every list, but this caused severe problems.  Here's the
+        # scenario: aperson@example.com is a member of 4 lists, and a list
+        # owner of the foo list.  example.com has an aggressive spam filter
+        # which rejects any message that is spam or contains spam as an
+        # attachment.  Now, a spambot sends a piece of spam to the foo list,
+        # but since that spambot is not a member, the list holds the message
+        # for approval, and sends a notification to aperson@example.com as
+        # list owner.  That notification contains a copy of the spam.  Now
+        # example.com rejects the message, causing a bounce to be sent to the
+        # site list's bounce address.  The bounce runner would then dutifully
+        # register a bounce for all 4 lists that aperson@example.com was a
+        # member of, and eventually that person would get disabled on all
+        # their lists.  So now we ignore site list bounces.  Ce La Vie for
+        # password reminder bounces.
+        self._bounce_events_file = os.path.join(
+            mm_cfg.DATA_DIR, 'bounce-events-%05d.pck' % os.getpid())
+        self._bounce_events_fp = None
+        self._bouncecnt = 0
+        self._nextaction = time.time() + mm_cfg.REGISTER_BOUNCES_EVERY
+
+    def _queue_bounces(self, listname, addrs, msg):
+        today = time.localtime()[:3]
+        if self._bounce_events_fp is None:
+            self._bounce_events_fp = open(self._bounce_events_file, 'a+b')
+        for addr in addrs:
+            cPickle.dump((listname, addr, today, msg),
+                         self._bounce_events_fp, 1)
+        self._bounce_events_fp.flush()
+        os.fsync(self._bounce_events_fp.fileno())
+        self._bouncecnt += len(addrs)
+
+    def _register_bounces(self):
+        syslog('bounce', '%s processing %s queued bounces',
+               self, self._bouncecnt)
+        # Read all the records from the bounce file, then unlink it.  Sort the
+        # records by listname for more efficient processing.
+        events = {}
+        self._bounce_events_fp.seek(0)
+        while True:
+            try:
+                listname, addr, day, msg = cPickle.load(self._bounce_events_fp)
+            except ValueError, e:
+                syslog('bounce', 'Error reading bounce events: %s', e)
+            except EOFError:
+                break
+            events.setdefault(listname, []).append((addr, day, msg))
+        # Now register all events sorted by list
+        for listname in events.keys():
+            mlist = self._open_list(listname)
+            mlist.Lock()
+            try:
+                for addr, day, msg in events[listname]:
+                    mlist.registerBounce(addr, msg, day=day)
+                mlist.Save()
+            finally:
+                mlist.Unlock()
+        # Reset and free all the cached memory
+        self._bounce_events_fp.close()
+        self._bounce_events_fp = None
+        os.unlink(self._bounce_events_file)
+        self._bouncecnt = 0
+
+    def _cleanup(self):
+        if self._bouncecnt > 0:
+            self._register_bounces()
+
+    def _doperiodic(self):
+        now = time.time()
+        if self._nextaction > now or self._bouncecnt == 0:
+            return
+        # Let's go ahead and register the bounces we've got stored up
+        self._nextaction = now + mm_cfg.REGISTER_BOUNCES_EVERY
+        self._register_bounces()
+
+    def _probe_bounce(self, mlist, token):
+        locked = mlist.Locked()
+        if not locked:
+            mlist.Lock()
+        try:
+            op, addr, bmsg = mlist.pend_confirm(token)
+            info = mlist.getBounceInfo(addr)
+            mlist.disableBouncingMember(addr, info, bmsg)
+            # Only save the list if we're unlocking it
+            if not locked:
+                mlist.Save()
+        finally:
+            if not locked:
+                mlist.Unlock()
+
+
+
+class BounceRunner(Runner, BounceMixin):
     QDIR = mm_cfg.BOUNCEQUEUE_DIR
 
     def __init__(self, slice=None, numslices=1):
         Runner.__init__(self, slice, numslices)
-        # This is a simple sequence of bounce score events.  Each entry in the
-        # list is a tuple of (address, day, msg) where day is a tuple of
-        # (YYYY, MM, DD).  We'll sort and collate all this information in
-        # _register_bounces() below.
-        self._bounces = {}
-        self._bouncecnt = 0
-        self._next_registration = time.time() + REGISTER_BOUNCES_EVERY
+        BounceMixin.__init__(self)
 
     def _dispose(self, mlist, msg, msgdata):
         # Make sure we have the most up-to-date state
@@ -68,7 +179,7 @@ class BounceRunner(Runner):
         # All messages to list-owner@vdom.ain have their envelope sender set
         # to site-owner@dom.ain (no virtual domain).  Is this a bounce for a
         # message to a list owner, coming to the site owner?
-        if msg.get('to', '') == Utils.get_site_email(extra='-owner'):
+        if msg.get('to', '') == Utils.get_site_email(extra='owner'):
             # Send it on to the site owners, but craft the envelope sender to
             # be the -loop detection address, so if /they/ bounce, we won't
             # get stuck in a bounce loop.
@@ -82,8 +193,13 @@
         # Try VERP detection first, since it's quick and easy
         addrs = verp_bounce(mlist, msg)
         if not addrs:
+            # See if this was a probe message.
+            token = verp_probe(mlist, msg)
+            if token:
+                self._probe_bounce(mlist, token)
+                return
             # That didn't give us anything useful, so try the old fashion
-            # bounce matching modules
+            # bounce matching modules.
             addrs = BouncerAPI.ScanMessages(mlist, msg)
         # If that still didn't return us any useful addresses, then send it on
         # or discard it.
@@ -96,47 +212,12 @@
         # although I'm unsure how that could happen.  Possibly ScanMessages()
         # can let None's sneak through.  In any event, this will kill them.
         addrs = filter(None, addrs)
-        # Store the bounce score events so we can register them periodically
-        today = time.localtime()[:3]
-        events = [(addr, today, msg) for addr in addrs]
-        self._bounces.setdefault(mlist.internal_name(), []).extend(events)
-        self._bouncecnt += len(addrs)
+        self._queue_bounces(mlist.internal_name(), addrs, msg)
 
-    def _doperiodic(self):
-        now = time.time()
-        if self._next_registration > now or not self._bounces:
-            return
-        # Let's go ahead and register the bounces we've got stored up
-        self._next_registration = now + REGISTER_BOUNCES_EVERY
-        self._register_bounces()
-
-    def _register_bounces(self):
-        syslog('bounce', 'Processing %s queued bounces', self._bouncecnt)
-        # First, get the list of bounces register against the site list.  For
-        # these addresses, we want to register a bounce on every list the
-        # address is a member of -- which we don't know yet.
-        sitebounces = self._bounces.get(mm_cfg.MAILMAN_SITE_LIST, [])
-        if sitebounces:
-            listnames = Utils.list_names()
-        else:
-            listnames = self._bounces.keys()
-        for listname in listnames:
-            mlist = self._open_list(listname)
-            mlist.Lock()
-            try:
-                events = self._bounces.get(listname, []) + sitebounces
-                for addr, day, msg in events:
-                    mlist.registerBounce(addr, msg, day=day)
-                mlist.Save()
-            finally:
-                mlist.Unlock()
-        # Reset and free all the cached memory
-        self._bounces = {}
-        self._bouncecnt = 0
+    _doperiodic = BounceMixin._doperiodic
 
     def _cleanup(self):
-        if self._bounces:
-            self._register_bounces()
+        BounceMixin._cleanup(self)
         Runner._cleanup(self)
 
 
@@ -173,6 +254,41 @@
+def verp_probe(mlist, msg):
+    bmailbox, bdomain = Utils.ParseEmail(mlist.GetBouncesEmail())
+    # Sadly not every MTA bounces VERP messages correctly, or consistently.
+    # Fall back to Delivered-To: (Postfix), Envelope-To: (Exim) and
+    # Apparently-To:, and then short-circuit if we still don't have anything
+    # to work with.  Note that there can be multiple Delivered-To: headers so
+    # we need to search them all (and we don't worry about false positives for
+    # forwarded email, because only one should match VERP_REGEXP).
+    vals = []
+    for header in ('to', 'delivered-to', 'envelope-to', 'apparently-to'):
+        vals.extend(msg.get_all(header, []))
+    for field in vals:
+        to = parseaddr(field)[1]
+        if not to:
+            continue                          # empty header
+        mo = re.search(mm_cfg.VERP_PROBE_REGEXP, to)
+        if not mo:
+            continue                          # no match of regexp
+        try:
+            if bmailbox <> mo.group('bounces'):
+                continue                      # not a bounce to our list
+            # Extract the token and see if there's an entry
+            token = mo.group('token')
+            data = mlist.pend_confirm(token, expunge=False)
+            if data is not None:
+                return token
+        except IndexError:
+            syslog(
+                'error',
+                "VERP_PROBE_REGEXP doesn't yield the right match groups: %s",
+                mm_cfg.VERP_PROBE_REGEXP)
+    return None
+
+
+
 def maybe_forward(mlist, msg):
     # Does the list owner want to get non-matching bounce messages?
     # If not, simply discard it.
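The BounceRunner hunks above replace the old in-memory bounce dictionary with a per-process event log: each bounce is appended to a pickle file as a (listname, addr, today, msg) record, and the log is drained periodically under the list lock. The following standalone Python 2 sketch shows that append/drain pattern (illustrative only, not part of the commit; the file name and record layout follow the diff, everything else is simplified):

import os
import time
import cPickle

# One log file per process, mirroring DATA_DIR/bounce-events-<pid>.pck.
EVENTS_FILE = 'bounce-events-%05d.pck' % os.getpid()

def queue_bounce(listname, addr, msg):
    # Append one (listname, addr, today, msg) record as its own pickle.
    fp = open(EVENTS_FILE, 'a+b')
    try:
        today = time.localtime()[:3]
        cPickle.dump((listname, addr, today, msg), fp, 1)
        fp.flush()
        os.fsync(fp.fileno())
    finally:
        fp.close()

def drain_events():
    # Read the pickles back until EOF, grouping records by list so each
    # list only has to be locked once, then remove the log file.
    events = {}
    fp = open(EVENTS_FILE, 'rb')
    try:
        while True:
            try:
                listname, addr, day, msg = cPickle.load(fp)
            except EOFError:
                break
            events.setdefault(listname, []).append((addr, day, msg))
    finally:
        fp.close()
    os.unlink(EVENTS_FILE)
    return events

if __name__ == '__main__':
    queue_bounce('mylist', 'user@example.com', 'simulated bounce text')
    print drain_events()

Appending self-contained pickles keeps queueing cheap; the expensive list-lock acquisition happens only when the log is drained.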
diff --git a/Mailman/Queue/CommandRunner.py b/Mailman/Queue/CommandRunner.py
index 8967e2f65..e08d02eb5 100644
--- a/Mailman/Queue/CommandRunner.py
+++ b/Mailman/Queue/CommandRunner.py
@@ -1,4 +1,4 @@
-# Copyright (C) 1998-2003 by the Free Software Foundation, Inc.
+# Copyright (C) 1998-2004 by the Free Software Foundation, Inc.
 #
 # This program is free software; you can redistribute it and/or
 # modify it under the terms of the GNU General Public License
@@ -40,9 +40,10 @@ from Mailman.Logging.Syslog import syslog
 from Mailman import LockFile
 
 from email.Header import decode_header, make_header, Header
+from email.Errors import HeaderParseError
+from email.Iterators import typed_subpart_iterator
 from email.MIMEText import MIMEText
 from email.MIMEMessage import MIMEMessage
-from email.Iterators import typed_subpart_iterator
 
 NL = '\n'
 
@@ -72,9 +73,15 @@ class Results:
         # Extract the subject header and do RFC 2047 decoding.  Note that
         # Python 2.1's unicode() builtin doesn't call obj.__unicode__().
         subj = msg.get('subject', '')
-        subj = make_header(decode_header(subj)).__unicode__()
-        # Always process the Subject: header first
-        self.commands.append(subj)
+        try:
+            subj = make_header(decode_header(subj)).__unicode__()
+            # TK: Currently we don't allow 8bit or multibyte in mail command.
+            subj = subj.encode('us-ascii')
+            # Always process the Subject: header first
+            self.commands.append(subj)
+        except (HeaderParseError, UnicodeError, LookupError):
+            # We couldn't parse it so ignore the Subject header
+            pass
         # Find the first text/plain part
         part = None
         for part in typed_subpart_iterator(msg, 'text', 'plain'):
@@ -163,7 +170,7 @@ To obtain instructions, send a message containing just the word "help".
             resp.append(_('\n- Done.\n\n'))
         # Encode any unicode strings into the list charset, so we don't try to
         # join unicode strings and invalid ASCII.
-        charset = Utils.GetCharSet(self.mlist.preferred_language)
+        charset = Utils.GetCharSet(self.msgdata['lang'])
         encoded_resp = []
         for item in resp:
             if isinstance(item, UnicodeType):
@@ -179,13 +186,13 @@ To obtain instructions, send a message containing just the word "help".
         # BAW: We wait until now to make this decision since our sender may
         # not be self.msg.get_sender(), but I'm not sure this is right.
         recip = self.returnaddr or self.msg.get_sender()
-        if not self.mlist.autorespondToSender(recip):
+        if not self.mlist.autorespondToSender(recip, self.msgdata['lang']):
             return
         msg = Message.UserNotification(
             recip,
             self.mlist.GetBouncesEmail(),
             _('The results of your email commands'),
-            lang=self.mlist.preferred_language)
+            lang=self.msgdata['lang'])
         msg.set_type('multipart/mixed')
         msg.attach(results)
         orig = MIMEMessage(self.msg)
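The CommandRunner hunk above wraps the Subject: decoding in try/except so that malformed RFC 2047 headers or non-ASCII subjects are skipped instead of crashing the command parser. A small illustrative Python 2 sketch of that guard (not part of the commit):

from email.Header import decode_header, make_header
from email.Errors import HeaderParseError

def subject_command(subj):
    # Return the Subject: text as a plain ASCII command line, or None if it
    # cannot be decoded to ASCII.
    try:
        text = make_header(decode_header(subj)).__unicode__()
        return text.encode('us-ascii')
    except (HeaderParseError, UnicodeError, LookupError):
        return None

if __name__ == '__main__':
    print subject_command('help')                        # -> 'help'
    print subject_command('=?iso-8859-1?q?h=E9lp?=')     # non-ASCII -> None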
diff --git a/Mailman/Queue/NewsRunner.py b/Mailman/Queue/NewsRunner.py
index cdfd5fead..448500633 100644
--- a/Mailman/Queue/NewsRunner.py
+++ b/Mailman/Queue/NewsRunner.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2000-2003 by the Free Software Foundation, Inc.
+# Copyright (C) 2000-2005 by the Free Software Foundation, Inc.
 #
 # This program is free software; you can redistribute it and/or
 # modify it under the terms of the GNU General Public License
@@ -102,11 +102,14 @@ def prepare_message(mlist, msg, msgdata):
         del msg['approved']
         msg['Approved'] = mlist.GetListEmail()
     # Should we restore the original, non-prefixed subject for gatewayed
-    # messages?
-    origsubj = msgdata.get('origsubj')
-    if not mlist.news_prefix_subject_too and origsubj is not None:
+    # messages?  TK: We use stripped_subject (prefix stripped) which was
+    # crafted in CookHeaders.py to ensure prefix was stripped from the subject
+    # came from mailing list user.
+    stripped_subject = msgdata.get('stripped_subject') \
+                       or msgdata.get('origsubj')
+    if not mlist.news_prefix_subject_too and stripped_subject is not None:
         del msg['subject']
-        msg['subject'] = origsubj
+        msg['subject'] = stripped_subject
     # Add the appropriate Newsgroups: header
     ngheader = msg['newsgroups']
     if ngheader is not None:
diff --git a/Mailman/Queue/OutgoingRunner.py b/Mailman/Queue/OutgoingRunner.py
index 13dc2014a..001b68645 100644
--- a/Mailman/Queue/OutgoingRunner.py
+++ b/Mailman/Queue/OutgoingRunner.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2000-2003 by the Free Software Foundation, Inc.
+# Copyright (C) 2000-2004 by the Free Software Foundation, Inc.
 #
 # This program is free software; you can redistribute it and/or
 # modify it under the terms of the GNU General Public License
@@ -30,6 +30,7 @@ from Mailman import Errors
 from Mailman import LockFile
 from Mailman.Queue.Runner import Runner
 from Mailman.Queue.Switchboard import Switchboard
+from Mailman.Queue.BounceRunner import BounceMixin
 from Mailman.Logging.Syslog import syslog
 
 # This controls how often _doperiodic() will try to deal with deferred
@@ -44,14 +45,12 @@ except NameError:
 
 
 
-class OutgoingRunner(Runner):
+class OutgoingRunner(Runner, BounceMixin):
     QDIR = mm_cfg.OUTQUEUE_DIR
 
     def __init__(self, slice=None, numslices=1):
         Runner.__init__(self, slice, numslices)
-        # Maps mailing lists to (recip, msg) tuples
-        self._permfailures = {}
-        self._permfail_counter = 0
+        BounceMixin.__init__(self)
         # We look this function up only at startup time
         modname = 'Mailman.Handlers.' + mm_cfg.DELIVERY_MODULE
         mod = __import__(modname)
@@ -63,6 +62,10 @@ class OutgoingRunner(Runner):
         self.__retryq = Switchboard(mm_cfg.RETRYQUEUE_DIR)
 
     def _dispose(self, mlist, msg, msgdata):
+        # See if we should retry delivery of this message again.
+        deliver_after = msgdata.get('deliver_after', 0)
+        if time.time() < deliver_after:
+            return True
         # Make sure we have the most up-to-date state
         mlist.Load()
         try:
@@ -87,67 +90,46 @@
                 self.__logged = True
             return True
         except Errors.SomeRecipientsFailed, e:
-            # The delivery module being used (SMTPDirect or Sendmail) failed
-            # to deliver the message to one or all of the recipients.
-            # Permanent failures should be registered (but registration
-            # requires the list lock), and temporary failures should be
-            # retried later.
-            #
-            # For permanent failures, make a copy of the message for bounce
-            # handling.  I'm not sure this is necessary, or the right thing to
-            # do.
-            if e.permfailures:
-                pcnt = len(e.permfailures)
-                msgcopy = copy.deepcopy(msg)
-                self._permfailures.setdefault(mlist, []).extend(
-                    zip(e.permfailures, [msgcopy] * pcnt))
-            # Move temporary failures to the qfiles/retry queue which will
-            # occasionally move them back here for another shot at delivery.
-            if e.tempfailures:
-                now = time.time()
-                recips = e.tempfailures
-                last_recip_count = msgdata.get('last_recip_count', 0)
-                deliver_until = msgdata.get('deliver_until', now)
-                if len(recips) == last_recip_count:
-                    # We didn't make any progress, so don't attempt delivery
-                    # any longer.  BAW: is this the best disposition?
-                    if now > deliver_until:
-                        return False
-                else:
-                    # Keep trying to delivery this message for a while
-                    deliver_until = now + mm_cfg.DELIVERY_RETRY_PERIOD
-                msgdata['last_recip_count'] = len(recips)
-                msgdata['deliver_until'] = deliver_until
-                msgdata['recips'] = recips
-                self.__retryq.enqueue(msg, msgdata)
+            # Handle local rejects of probe messages differently.
+            if msgdata.get('probe_token'):
+                self._probe_bounce(mlist, msgdata['probe_token'])
+            else:
+                # Delivery failed at SMTP time for some or all of the
+                # recipients.  Permanent failures are registered as bounces,
+                # but temporary failures are retried for later.
+                #
+                # BAW: msg is going to be the original message that failed
+                # delivery, not a bounce message.  This may be confusing if
+                # this is what's sent to the user in the probe message.  Maybe
+                # we should craft a bounce-like message containing information
+                # about the permanent SMTP failure?
+                self._queue_bounces(mlist.internal_name(), e.permfailures, msg)
+                # Move temporary failures to the qfiles/retry queue which will
+                # occasionally move them back here for another shot at
+                # delivery.
+                if e.tempfailures:
+                    now = time.time()
+                    recips = e.tempfailures
+                    last_recip_count = msgdata.get('last_recip_count', 0)
+                    deliver_until = msgdata.get('deliver_until', now)
+                    if len(recips) == last_recip_count:
+                        # We didn't make any progress, so don't attempt
+                        # delivery any longer.  BAW: is this the best
+                        # disposition?
+                        if now > deliver_until:
+                            return False
+                    else:
+                        # Keep trying to delivery this message for a while
+                        deliver_until = now + mm_cfg.DELIVERY_RETRY_PERIOD
+                    msgdata['last_recip_count'] = len(recips)
+                    msgdata['deliver_until'] = deliver_until
+                    msgdata['recips'] = recips
+                    self.__retryq.enqueue(msg, msgdata)
         # We've successfully completed handling of this message
         return False
 
-    def _doperiodic(self):
-        # Periodically try to acquire the list lock and clear out the
-        # permanent failures.
-        self._permfail_counter += 1
-        if self._permfail_counter < DEAL_WITH_PERMFAILURES_EVERY:
-            return
-        self._handle_permfailures()
-
-    def _handle_permfailures(self):
-        # Reset the counter
-        self._permfail_counter = 0
-        # And deal with the deferred permanent failures.
-        for mlist in self._permfailures.keys():
-            try:
-                mlist.Lock(timeout=mm_cfg.LIST_LOCK_TIMEOUT)
-            except LockFile.TimeOutError:
-                return
-            try:
-                for recip, msg in self._permfailures[mlist]:
-                    mlist.registerBounce(recip, msg)
-                del self._permfailures[mlist]
-                mlist.Save()
-            finally:
-                mlist.Unlock()
+    _doperiodic = BounceMixin._doperiodic
 
     def _cleanup(self):
-        self._handle_permfailures()
+        BounceMixin._cleanup(self)
         Runner._cleanup(self)
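The OutgoingRunner hunks above keep the existing deliver_until / last_recip_count bookkeeping for temporary SMTP failures, now nested under the new probe-token check: the retry window is extended while delivery makes progress and the message is dropped once the window expires without progress. A standalone Python 2 sketch of that decision (illustrative only, not part of the commit; DELIVERY_RETRY_PERIOD stands in for mm_cfg.DELIVERY_RETRY_PERIOD and its value here is only an assumption):

import time

DELIVERY_RETRY_PERIOD = 3600        # assumed value, in seconds

def should_retry(msgdata, tempfailures, now=None):
    # Return True if the message should be re-enqueued for another delivery
    # attempt, updating the bookkeeping in msgdata as a side effect.
    if now is None:
        now = time.time()
    last_recip_count = msgdata.get('last_recip_count', 0)
    deliver_until = msgdata.get('deliver_until', now)
    if len(tempfailures) == last_recip_count:
        # No progress since the last attempt; give up once the retry window
        # has expired.
        if now > deliver_until:
            return False
    else:
        # We reached at least one more recipient, so extend the window.
        deliver_until = now + DELIVERY_RETRY_PERIOD
    msgdata['last_recip_count'] = len(tempfailures)
    msgdata['deliver_until'] = deliver_until
    msgdata['recips'] = tempfailures
    return True

if __name__ == '__main__':
    data = {}
    print should_retry(data, ['a@example.com', 'b@example.com'], now=1000.0)
    print data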
diff --git a/Mailman/Queue/Runner.py b/Mailman/Queue/Runner.py
index d46ec8a1e..1e7854d7d 100644
--- a/Mailman/Queue/Runner.py
+++ b/Mailman/Queue/Runner.py
@@ -1,4 +1,4 @@
-# Copyright (C) 1998-2003 by the Free Software Foundation, Inc.
+# Copyright (C) 1998-2004 by the Free Software Foundation, Inc.
 #
 # This program is free software; you can redistribute it and/or
 # modify it under the terms of the GNU General Public License
@@ -31,6 +31,8 @@ from Mailman import i18n
 from Mailman.Queue.Switchboard import Switchboard
 from Mailman.Logging.Syslog import syslog
 
+import email.Errors
+
 try:
     True, False
 except NameError:
@@ -52,6 +54,9 @@ class Runner:
         self._shunt = Switchboard(mm_cfg.SHUNTQUEUE_DIR)
         self._stop = False
 
+    def __repr__(self):
+        return '<%s at %s>' % (self.__class__.__name__, id(self))
+
     def stop(self):
         self._stop = True
 
@@ -88,32 +93,34 @@
         # available for this qrunner to process.
         files = self._switchboard.files()
         for filebase in files:
-            # Ask the switchboard for the message and metadata objects
-            # associated with this filebase.
-            msg, msgdata = self._switchboard.dequeue(filebase)
-            # It's possible one or both files got lost.  If so, just ignore
-            # this filebase entry.  dequeue() will automatically unlink the
-            # other file, but we should log an error message for diagnostics.
-            if msg is None or msgdata is None:
-                syslog('error', 'lost data files for filebase: %s', filebase)
-            else:
-                # Now that we've dequeued the message, we want to be
-                # incredibly anal about making sure that no uncaught exception
-                # could cause us to lose the message.  All runners that
-                # implement _dispose() must guarantee that exceptions are
-                # caught and dealt with properly.  Still, there may be a bug
-                # in the infrastructure, and we do not want those to cause
-                # messages to be lost.  Any uncaught exceptions will cause the
-                # message to be stored in the shunt queue for human
+            try:
+                # Ask the switchboard for the message and metadata objects
+                # associated with this filebase.
+                msg, msgdata = self._switchboard.dequeue(filebase)
+            except email.Errors.MessageParseError, e:
+                # It's possible to get here if the message was stored in the
+                # pickle in plain text, and the metadata had a _parsemsg key
+                # that was true, /and/ if the message had some bogosity in
+                # it.  It's almost always going to be spam or bounced spam.
+                # There's not much we can do (and we didn't even get the
+                # metadata, so just log the exception and continue.
+                self._log(e)
+                syslog('error', 'Ignoring unparseable message: %s', filebase)
+                continue
+            try:
+                self._onefile(msg, msgdata)
+            except Exception, e:
+                # All runners that implement _dispose() must guarantee that
+                # exceptions are caught and dealt with properly.  Still, there
+                # may be a bug in the infrastructure, and we do not want those
+                # to cause messages to be lost.  Any uncaught exceptions will
+                # cause the message to be stored in the shunt queue for human
                 # intervention.
-                try:
-                    self._onefile(msg, msgdata)
-                except Exception, e:
-                    self._log(e)
-                    # Put a marker in the metadata for unshunting
-                    msgdata['whichq'] = self._switchboard.whichq()
-                    filebase = self._shunt.enqueue(msg, msgdata)
-                    syslog('error', 'SHUNTING: %s', filebase)
+                self._log(e)
+                # Put a marker in the metadata for unshunting
+                msgdata['whichq'] = self._switchboard.whichq()
+                filebase = self._shunt.enqueue(msg, msgdata)
+                syslog('error', 'SHUNTING: %s', filebase)
         # Other work we want to do each time through the loop
         Utils.reap(self._kids, once=True)
         self._doperiodic()
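The reworked Runner loop above isolates failures per queue entry: an unparseable entry is logged and skipped, and any other uncaught exception sends the message to the shunt queue rather than losing it. The following standalone Python 2 sketch shows the same control flow (not part of the commit; InMemoryQueue is a stand-in for the real Switchboard and ValueError stands in for email.Errors.MessageParseError):

import traceback

class InMemoryQueue:
    # Minimal stand-in for a Mailman switchboard: enqueue/dequeue/files/whichq.
    def __init__(self, name):
        self.name = name
        self.entries = {}
        self.counter = 0
    def whichq(self):
        return self.name
    def enqueue(self, msg, msgdata):
        self.counter += 1
        filebase = '%s-%d' % (self.name, self.counter)
        self.entries[filebase] = (msg, msgdata)
        return filebase
    def dequeue(self, filebase):
        return self.entries.pop(filebase)
    def files(self):
        return self.entries.keys()

def run_once(inqueue, shuntqueue, dispose):
    for filebase in inqueue.files():
        try:
            msg, msgdata = inqueue.dequeue(filebase)
        except ValueError, e:           # stand-in for MessageParseError
            print 'Ignoring unparseable message:', filebase, e
            continue
        try:
            dispose(msg, msgdata)
        except Exception:
            # A bug in a handler must not lose the message: park it in the
            # shunt queue, tagged with the queue it came from.
            traceback.print_exc()
            msgdata['whichq'] = inqueue.whichq()
            print 'SHUNTING:', shuntqueue.enqueue(msg, msgdata)

if __name__ == '__main__':
    inq, shuntq = InMemoryQueue('in'), InMemoryQueue('shunt')
    inq.enqueue('good message', {})
    inq.enqueue('bad message', {})
    def dispose(msg, msgdata):
        if msg.startswith('bad'):
            raise RuntimeError('simulated handler bug')
    run_once(inq, shuntq, dispose)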
diff --git a/Mailman/Queue/Switchboard.py b/Mailman/Queue/Switchboard.py
index bf0d8d409..52c430b0a 100644
--- a/Mailman/Queue/Switchboard.py
+++ b/Mailman/Queue/Switchboard.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2001-2003 by the Free Software Foundation, Inc.
+# Copyright (C) 2001-2004 by the Free Software Foundation, Inc.
 #
 # This program is free software; you can redistribute it and/or
 # modify it under the terms of the GNU General Public License
@@ -34,13 +34,12 @@
 # needs.
 
 import os
-import time
 import sha
-import marshal
+import time
+import email
 import errno
 import cPickle
-
-import email
+import marshal
 
 from Mailman import mm_cfg
 from Mailman import Utils
@@ -63,7 +62,7 @@
 
 
 
-class _Switchboard:
+class Switchboard:
     def __init__(self, whichq, slice=None, numslices=1):
         self.__whichq = whichq
         # Create the directory if it doesn't yet exist.
@@ -97,11 +96,11 @@ class _Switchboard:
         # Get some data for the input to the sha hash
         now = time.time()
         if SAVE_MSGS_AS_PICKLES and not data.get('_plaintext'):
-            msgsave = cPickle.dumps(_msg, 1)
-            ext = '.pck'
+            protocol = 1
+            msgsave = cPickle.dumps(_msg, protocol)
         else:
-            msgsave = str(_msg)
-            ext = '.msg'
+            protocol = 0
+            msgsave = cPickle.dumps(str(_msg), protocol)
         hashfood = msgsave + listname + `now`
         # Encode the current time into the file name for FIFO sorting in
         # files().  The file name consists of two parts separated by a `+':
@@ -110,92 +109,46 @@ class _Switchboard:
         #rcvtime = data.setdefault('received_time', now)
         rcvtime = data.setdefault('received_time', now)
         filebase = `rcvtime` + '+' + sha.new(hashfood).hexdigest()
-        # Figure out which queue files the message is to be written to.
-        msgfile = os.path.join(self.__whichq, filebase + ext)
-        dbfile = os.path.join(self.__whichq, filebase + '.db')
+        filename = os.path.join(self.__whichq, filebase + '.pck')
+        tmpfile = filename + '.tmp'
         # Always add the metadata schema version number
         data['version'] = mm_cfg.QFILE_SCHEMA_VERSION
         # Filter out volatile entries
         for k in data.keys():
-            if k[0] == '_':
+            if k.startswith('_'):
                 del data[k]
-        # Now write the message text to one file and the metadata to another
-        # file.  The metadata is always written second to avoid race
-        # conditions with the various queue runners (which key off of the .db
-        # filename).
+        # We have to tell the dequeue() method whether to parse the message
+        # object or not.
+        data['_parsemsg'] = (protocol == 0)
+        # Write to the pickle file the message object and metadata.
         omask = os.umask(007)                     # -rw-rw----
         try:
-            msgfp = open(msgfile, 'w')
+            fp = open(tmpfile, 'w')
+            try:
+                fp.write(msgsave)
+                cPickle.dump(data, fp, protocol)
+                fp.flush()
+                os.fsync(fp.fileno())
+            finally:
+                fp.close()
         finally:
             os.umask(omask)
-        msgfp.write(msgsave)
-        msgfp.close()
-        # Now write the metadata using the appropriate external metadata
-        # format.  We play rename-switcheroo here to further plug the race
-        # condition holes.
-        tmpfile = dbfile + '.tmp'
-        self._ext_write(tmpfile, data)
-        os.rename(tmpfile, dbfile)
+        os.rename(tmpfile, filename)
         return filebase
 
     def dequeue(self, filebase):
-        # Calculate the .db and .msg filenames from the given filebase.
-        msgfile = os.path.join(self.__whichq, filebase + '.msg')
-        pckfile = os.path.join(self.__whichq, filebase + '.pck')
-        dbfile = os.path.join(self.__whichq, filebase + '.db')
-        # Now we are going to read the message and metadata for the given
-        # filebase.  We want to read things in this order: first, the metadata
-        # file to find out whether the message is stored as a pickle or as
-        # plain text.  Second, the actual message file.  However, we want to
-        # first unlink the message file and then the .db file, because the
-        # qrunner only cues off of the .db file
-        msg = None
-        try:
-            data = self._ext_read(dbfile)
-            os.unlink(dbfile)
-        except EnvironmentError, e:
-            if e.errno <> errno.ENOENT: raise
-            data = {}
-        # Between 2.1b4 and 2.1b5, the `rejection-notice' key in the metadata
-        # was renamed to `rejection_notice', since dashes in the keys are not
-        # supported in METAFMT_ASCII.
-        if data.has_key('rejection-notice'):
-            data['rejection_notice'] = data['rejection-notice']
-            del data['rejection-notice']
-        msgfp = None
+        # Calculate the filename from the given filebase.
+        filename = os.path.join(self.__whichq, filebase + '.pck')
+        # Read the message object and metadata.
+        fp = open(filename)
+        os.unlink(filename)
         try:
-            try:
-                msgfp = open(pckfile)
-                msg = cPickle.load(msgfp)
-                os.unlink(pckfile)
-            except EnvironmentError, e:
-                if e.errno <> errno.ENOENT: raise
-                msgfp = None
-                try:
-                    msgfp = open(msgfile)
-                    msg = email.message_from_file(msgfp, Message.Message)
-                    os.unlink(msgfile)
-                except EnvironmentError, e:
-                    if e.errno <> errno.ENOENT: raise
-                except email.Errors.MessageParseError, e:
-                    # This message was unparsable, most likely because its
-                    # MIME encapsulation was broken.  For now, there's not
-                    # much we can do about it.
-                    syslog('error', 'message is unparsable: %s', filebase)
-                    msgfp.close()
-                    msgfp = None
-                    if mm_cfg.QRUNNER_SAVE_BAD_MESSAGES:
-                        # Cheapo way to ensure the directory exists w/ the
-                        # proper permissions.
-                        sb = Switchboard(mm_cfg.BADQUEUE_DIR)
-                        os.rename(msgfile, os.path.join(
-                            mm_cfg.BADQUEUE_DIR, filebase + '.txt'))
-                    else:
-                        os.unlink(msgfile)
-                    msg = data = None
+            msg = cPickle.load(fp)
+            data = cPickle.load(fp)
         finally:
-            if msgfp:
-                msgfp.close()
+            fp.close()
+        if data.get('_parsemsg'):
+            msg = email.message_from_string(msg, Message.Message)
         return msg, data
 
     def files(self):
@@ -203,157 +156,17 @@
         lower = self.__lower
         upper = self.__upper
         for f in os.listdir(self.__whichq):
-            # We only care about the file's base name (i.e. no extension).
-            # Thus we'll ignore anything that doesn't end in .db.
-            if not f.endswith('.db'):
+            # By ignoring anything that doesn't end in .pck, we ignore
+            # tempfiles and avoid a race condition.
+            if not f.endswith('.pck'):
                 continue
             filebase = os.path.splitext(f)[0]
             when, digest = filebase.split('+')
             # Throw out any files which don't match our bitrange.  BAW: test
             # performance and end-cases of this algorithm.
-            if not lower or (lower <= long(digest, 16) < upper):
                 times[float(when)] = filebase
         # FIFO sort
         keys = times.keys()
         keys.sort()
         return [times[k] for k in keys]
-
-    def _ext_write(self, tmpfile, data):
-        raise NotImplementedError
-
-    def _ext_read(self, dbfile):
-        raise NotImplementedError
-
-
-
-class MarshalSwitchboard(_Switchboard):
-    """Python marshal format."""
-    FLOAT_ATTRIBUTES = ['received_time']
-
-    def _ext_write(self, filename, dict):
-        omask = os.umask(007)                     # -rw-rw----
-        try:
-            fp = open(filename, 'w')
-        finally:
-            os.umask(omask)
-        # Python's marshal, up to and including in Python 2.1, has a bug where
-        # the full precision of floats was not stored.  We work around this
-        # bug by hardcoding a list of float values we know about, repr()-izing
-        # them ourselves, and doing the reverse conversion on _ext_read().
-        for attr in self.FLOAT_ATTRIBUTES:
-            # We use try/except because we expect a hitrate of nearly 100%
-            try:
-                fval = dict[attr]
-            except KeyError:
-                pass
-            else:
-                dict[attr] = repr(fval)
-        marshal.dump(dict, fp)
-        # Make damn sure that the data we just wrote gets flushed to disk
-        fp.flush()
-        if mm_cfg.SYNC_AFTER_WRITE:
-            os.fsync(fp.fileno())
-        fp.close()
-
-    def _ext_read(self, filename):
-        fp = open(filename)
-        dict = marshal.load(fp)
-        # Update from version 2 files
-        if dict.get('version', 0) == 2:
-            del dict['filebase']
-        # Do the reverse conversion (repr -> float)
-        for attr in self.FLOAT_ATTRIBUTES:
-            try:
-                sval = dict[attr]
-            except KeyError:
-                pass
-            else:
-                # Do a safe eval by setting up a restricted execution
-                # environment.  This may not be strictly necessary since we
-                # know they are floats, but it can't hurt.
-                dict[attr] = eval(sval, {'__builtins__': {}})
-        fp.close()
-        return dict
-
-
-
-class BSDDBSwitchboard(_Switchboard):
-    """Native (i.e. compiled-in) Berkeley db format."""
-    def _ext_write(self, filename, dict):
-        import bsddb
-        omask = os.umask(0)
-        try:
-            hashfile = bsddb.hashopen(filename, 'n', 0660)
-        finally:
-            os.umask(omask)
-        # values must be strings
-        for k, v in dict.items():
-            hashfile[k] = marshal.dumps(v)
-        hashfile.sync()
-        hashfile.close()
-
-    def _ext_read(self, filename):
-        import bsddb
-        dict = {}
-        hashfile = bsddb.hashopen(filename, 'r')
-        for k in hashfile.keys():
-            dict[k] = marshal.loads(hashfile[k])
-        hashfile.close()
-        return dict
-
-
-
-class ASCIISwitchboard(_Switchboard):
-    """Human readable .db file format.
-
-    key/value pairs are written as
-
-    key = value
-
-    as real Python code which can be execfile'd.
-    """
-
-    def _ext_write(self, filename, dict):
-        omask = os.umask(007)                     # -rw-rw----
-        try:
-            fp = open(filename, 'w')
-        finally:
-            os.umask(omask)
-        for k, v in dict.items():
-            print >> fp, '%s = %s' % (k, repr(v))
-        # Make damn sure that the data we just wrote gets flushed to disk
-        fp.flush()
-        if mm_cfg.SYNC_AFTER_WRITE:
-            os.fsync(fp.fileno())
-        fp.close()
-
-    def _ext_read(self, filename):
-        dict = {'__builtins__': {}}
-        execfile(filename, dict)
-        del dict['__builtins__']
-        return dict
-
-
-
-# Here are the various types of external file formats available.  The format
-# chosen is given defined in the mm_cfg.py configuration file.
-if mm_cfg.METADATA_FORMAT == mm_cfg.METAFMT_MARSHAL:
-    Switchboard = MarshalSwitchboard
-elif mm_cfg.METADATA_FORMAT == mm_cfg.METAFMT_BSDDB_NATIVE:
-    Switchboard = BSDDBSwitchboard
-elif mm_cfg.METADATA_FORMAT == mm_cfg.METAFMT_ASCII:
-    Switchboard = ASCIISwitchboard
-else:
-    syslog('error', 'Undefined metadata format: %d (using marshals)',
-           mm_cfg.METADATA_FORMAT)
-    Switchboard = MarshalSwitchboard
-
-
-
-# For bin/dumpdb
-class DumperSwitchboard(Switchboard):
-    def __init__(self):
-        pass
-
-    def read(self, filename):
-        return self._ext_read(filename)
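The Switchboard rewrite above collapses the old .msg/.db file pairs and the pluggable marshal/bsddb/ASCII metadata backends into a single .pck file per queue entry: the message pickle followed by the metadata pickle, written to a .tmp name and renamed into place so files() never sees a partial entry. A standalone Python 2 sketch of that on-disk format (illustrative only, not part of the commit; the _parsemsg convention follows the diff, while paths and the email parsing are simplified):

import os
import email
import cPickle

def enqueue(qdir, filebase, msg, data, plaintext=False):
    # Protocol 1 for a real Message object, protocol 0 plus the _parsemsg
    # flag when only the message's text is stored.
    if plaintext:
        protocol = 0
        msgsave = cPickle.dumps(str(msg), protocol)
    else:
        protocol = 1
        msgsave = cPickle.dumps(msg, protocol)
    data = data.copy()
    data['_parsemsg'] = (protocol == 0)
    filename = os.path.join(qdir, filebase + '.pck')
    tmpfile = filename + '.tmp'
    fp = open(tmpfile, 'wb')
    try:
        fp.write(msgsave)
        cPickle.dump(data, fp, protocol)
        fp.flush()
        os.fsync(fp.fileno())
    finally:
        fp.close()
    # The rename makes the finished entry visible atomically.
    os.rename(tmpfile, filename)
    return filename

def dequeue(filename):
    fp = open(filename, 'rb')
    try:
        msg = cPickle.load(fp)
        data = cPickle.load(fp)
    finally:
        fp.close()
    os.unlink(filename)
    if data.get('_parsemsg'):
        msg = email.message_from_string(msg)
    return msg, data

if __name__ == '__main__':
    path = enqueue('.', 'demo', 'Subject: hi\n\nhello\n',
                   {'listname': 'test'}, plaintext=True)
    print dequeue(path)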
