diff options
| -rw-r--r-- | Mailman/Queue/LMTPRunner.py | 167 |
1 files changed, 104 insertions, 63 deletions
diff --git a/Mailman/Queue/LMTPRunner.py b/Mailman/Queue/LMTPRunner.py index 5ef68ea31..d8bcfcf89 100644 --- a/Mailman/Queue/LMTPRunner.py +++ b/Mailman/Queue/LMTPRunner.py @@ -17,20 +17,22 @@ """Mailman LMTP runner (server). -FIXME +Most mail servers can be configured to deliver local messages via 'LMTP'[1]. +This module is actually an LMTP server rather than a standard queue runner. +Once it enters its main asyncore loop, it does not respond to mailmanctl +signals the same way as other runners do. All signals will kill this process, +but the normal mailmanctl watchdog will restart it upon exit. -Most MTAs can be configured to deliver messages to a `LMTP'[1]. This -module is actually a server rather than a runner which picks up -files from queue directory and process as required. LMTP runner opens -a local TCP port and waits for MTA to connect to it and receives messages. -The messages are subsequently injected into Mailman's qfiles for processing -in the normal pipeline. +The LMTP runner opens a local TCP port and waits for the mail server to +connect to it. The messages it receives over LMTP are very minimally parsed +for sanity and if they look okay, they are accepted and injected into +Mailman's incoming queue for processing through the normal pipeline. If they +don't look good, or are destined for a bogus sub-queue address, they are +rejected right away, hopefully so that the peer mail server can provide better +diagnostics. -All the incoming messages should be checked by MTA if they have valid -destination; messages which have invalid destination like non-existent -listname are discarded and recorded only in error log. - -[1] RFC2033 +[1] RFC 2033 Local Mail Transport Protocol + http://www.faqs.org/rfcs/rfc2033.html See the variable USE_LMTP in Defaults.py.in for enabling this delivery mechanism. @@ -39,13 +41,12 @@ mechanism. # NOTE: LMTP delivery is experimental in Mailman 2.2. import os +import email import smtpd import logging import asyncore -from cStringIO import StringIO -from email.Parser import Parser -from email.Utils import parseaddr +from email.utils import parseaddr from Mailman import Utils from Mailman.Message import Message @@ -53,101 +54,141 @@ from Mailman.Queue.Runner import Runner from Mailman.Queue.sbcache import get_switchboard from Mailman.configuration import config -log = logging.getLogger('mailman.error') +elog = logging.getLogger('mailman.error') +qlog = logging.getLogger('mailman.qrunner') # We only care about the listname and the subq as in listname@ or # listname-request@ -subqnames = ('admin','bounces','confirm','join','leave', - 'owner','request','subscribe','unsubscribe') +subqnames = ( + 'bounces', 'confirm', 'join', ' leave', + 'owner', 'request', 'subscribe', 'unsubscribe', + ) + +DASH = '-' +CRLF = '\r\n' +ERR_451 = '451 Requested action aborted: error in processing' +ERR_502 = '502 Error: command HELO not implemented' +ERR_550 = '550 Requested action not taken: mailbox unavailable' + +# XXX Blech +smtpd.__version__ = 'Python LMTP queue runner 1.0' + + def getlistq(address): localpart, domain = address.split('@', 1) - l = localpart.split('-') + l = localpart.split(DASH) if l[-1] in subqnames: - listname = '-'.join(l[:-1]) + listname = DASH.join(l[:-1]) subq = l[-1] else: listname = localpart subq = None return listname, subq, domain + class SMTPChannel(smtpd.SMTPChannel): - # Override smtpd.SMTPChannel but can't change the class name. + # Override smtpd.SMTPChannel don't can't change the class name so that we + # don't have to reverse engineer Python's name mangling scheme. + # # LMTP greeting is LHLO and no HELO/EHLO def smtp_LHLO(self, arg): smtpd.SMTPChannel.smtp_HELO(self, arg) def smtp_HELO(self, arg): - self.push('502 Error: command HELO not implemented') + self.push(ERR_502) + class LMTPRunner(Runner, smtpd.SMTPServer): - # Only __init__ is called on startup. Asyncore is responsible for - # later connections from MTA. - - def __init__(self, slice=None, numslices=1, - localaddr=(config.LMTP_HOST, config.LMTP_PORT), - remoteaddr=None): - self._stop = 0 - self._parser = Parser(Message) - smtpd.SMTPServer.__init__(self, localaddr, remoteaddr) + # Only __init__ is called on startup. Asyncore is responsible for later + # connections from MTA. slice and numslices are ignored and are + # necessary only to satisfy the API. + def __init__(self, slice=None, numslices=1): + localaddr = config.LMTP_HOST, config.LMTP_PORT + # Do not call Runner's constructor because there's no QDIR to create + smtpd.SMTPServer.__init__(self, localaddr, remoteaddr=None) def handle_accept(self): conn, addr = self.accept() - channel = SMTPChannel(self, conn, addr) + channel = SMTPChannel(self, conn, addr) def process_message(self, peer, mailfrom, rcpttos, data): - # Refresh this each time through the list. - listnames = Utils.list_names() - fp = StringIO(data) - for to in rcpttos: - try: - to = parseaddr(to)[1].lower() - listname, subq, domain = getlistq(to) - listname = listname + '@' + domain - if listname not in listnames: - raise Mailman.Errors.MMUnknownListError, listname - fp.seek(0) - msg = self._parser.parse(fp) - msg['X-MailFrom'] = mailfrom - msgdata = {'listname': listname} - if subq in ('bounces', 'admin'): - queue = get_switchboard(config.BOUNCEQUEUE_DIR) - elif subq == 'confirm': - msgdata['toconfirm'] = 1 + try: + # Refresh the list of list names every time we process a message + # since the set of mailing lists could have changed. However, on + # a big site this could be fairly expensive, so we may need to + # cache this in some way. + listnames = Utils.list_names() + # Parse the message data. XXX Should we reject the message + # immediately if it has defects? Usually only spam has defects. + msg = email.message_from_string(data, Message) + msg['X-MailFrom'] = mailfrom + except Exception, e: + elog.error('%s', e) + return CRLF.join([ERR_451 for to in rcpttos]) + # RFC 2033 requires us to return a status code for every recipient. + status = [] + # Now for each address in the recipients, parse the address to first + # see if it's destined for a valid mailing list. If so, then queue + # the message to the appropriate place and record a 250 status for + # that recipient. If not, record a failure status for that recipient. + for to in rcpttos: + try: + to = parseaddr(to)[1].lower() + listname, subq, domain = getlistq(to) + listname += '@' + domain + if listname not in listnames: + status.append(ERR_550) + continue + # The recipient is a valid mailing list; see if it's a valid + # sub-queue, and if so, enqueue it. + msgdata = dict(listname=listname) + if subq in ('bounces', 'admin'): + queue = get_switchboard(config.BOUNCEQUEUE_DIR) + elif subq == 'confirm': + msgdata['toconfirm'] = True queue = get_switchboard(config.CMDQUEUE_DIR) elif subq in ('join', 'subscribe'): - msgdata['tojoin'] = 1 + msgdata['tojoin'] = True queue = get_switchboard(config.CMDQUEUE_DIR) elif subq in ('leave', 'unsubscribe'): - msgdata['toleave'] = 1 + msgdata['toleave'] = True queue = get_switchboard(config.CMDQUEUE_DIR) elif subq == 'owner': msgdata.update({ - 'toowner': True, - 'envsender': config.SITE_OWNER_ADDRESS, - 'pipeline': config.OWNER_PIPELINE, + 'toowner' : True, + 'envsender' : config.SITE_OWNER_ADDRESS, + 'pipeline' : config.OWNER_PIPELINE, }) queue = get_switchboard(config.INQUEUE_DIR) elif subq is None: - msgdata['tolist'] = 1 - queue = get_switchboard(config.INQUEUE_DIR) + msgdata['tolist'] = True + queue = get_switchboard(config.INQUEUE_DIR) elif subq == 'request': - msgdata['torequest'] = 1 - queue = get_switchboard(config.CMDQUEUE_DIR) - else: - log.error('Unknown sub-queue: %s', subq) + msgdata['torequest'] = True + queue = get_switchboard(config.CMDQUEUE_DIR) + else: + elog.error('Unknown sub-queue: %s', subq) + status.append(ERR_550) continue queue.enqueue(msg, msgdata) + status.append('250 Ok') except Exception, e: - log.error('%s', e) + elog.error('%s', e) + status.append(ERR_550) + # All done; returning this big status string should give the expected + # response to the LMTP client. + return CRLF.join(status) def _cleanup(self): pass server = LMTPRunner() +qlog.info('LMTPRunner qrunner started.') asyncore.loop() - +# We'll never get here, but just in case... +qlog.info('LMTPRunner qrunner exiting.') |
