# Copyright (C) 1998-2011 by the Free Software Foundation, Inc. # # This file is part of GNU Mailman. # # GNU Mailman is free software: you can redistribute it and/or modify it under # the terms of the GNU General Public License as published by the Free # Software Foundation, either version 3 of the License, or (at your option) # any later version. # # GNU Mailman is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for # more details. # # You should have received a copy of the GNU General Public License along with # GNU Mailman. If not, see . """Parse bounce messages generated by Postfix. This also matches something called 'Keftamail' which looks just like Postfix bounces with the word Postfix scratched out and the word 'Keftamail' written in in crayon. It also matches something claiming to be 'The BNS Postfix program', and 'SMTP_Gateway'. Everybody's gotta be different, huh? """ from __future__ import absolute_import, unicode_literals __metaclass__ = type __all__ = [ 'Postfix', ] import re from cStringIO import StringIO from flufl.enum import Enum from zope.interface import implements from mailman.interfaces.bounce import IBounceDetector # Are these heuristics correct or guaranteed? pcre = re.compile(r'[ \t]*the\s*(bns)?\s*(postfix|keftamail|smtp_gateway)', re.IGNORECASE) rcre = re.compile(r'failure reason:$', re.IGNORECASE) acre = re.compile(r'<(?P[^>]*)>:') REPORT_TYPES = ('multipart/mixed', 'multipart/report') class ParseState(Enum): start = 0 salutation_found = 1 def flatten(msg, leaves): # Give us all the leaf (non-multipart) subparts. if msg.is_multipart(): for part in msg.get_payload(): flatten(part, leaves) else: leaves.append(msg) def findaddr(msg): addresses = set() body = StringIO(msg.get_payload()) state = ParseState.start for line in body: # Preserve leading whitespace. line = line.rstrip() # Yes, use match() to match at beginning of string. if state is ParseState.start and ( pcre.match(line) or rcre.match(line)): # Then... state = ParseState.salutation_found elif state is ParseState.salutation_found and line: mo = acre.search(line) if mo: addresses.add(mo.group('addr')) # Probably a continuation line. return addresses class Postfix: """Parse bounce messages generated by Postfix.""" implements(IBounceDetector) def process(self, msg): """See `IBounceDetector`.""" if msg.get_content_type() not in REPORT_TYPES: return None # We're looking for the plain/text subpart with a Content-Description: # of 'notification'. leaves = [] flatten(msg, leaves) for subpart in leaves: content_type = subpart.get_content_type() content_desc = subpart.get('content-description', '').lower() if content_type == 'text/plain' and content_desc == 'notification': return set(findaddr(subpart)) return None