summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Mailman/Queue/LMTPRunner.py167
1 files changed, 104 insertions, 63 deletions
diff --git a/Mailman/Queue/LMTPRunner.py b/Mailman/Queue/LMTPRunner.py
index 5ef68ea31..d8bcfcf89 100644
--- a/Mailman/Queue/LMTPRunner.py
+++ b/Mailman/Queue/LMTPRunner.py
@@ -17,20 +17,22 @@
"""Mailman LMTP runner (server).
-FIXME
+Most mail servers can be configured to deliver local messages via 'LMTP'[1].
+This module is actually an LMTP server rather than a standard queue runner.
+Once it enters its main asyncore loop, it does not respond to mailmanctl
+signals the same way as other runners do. All signals will kill this process,
+but the normal mailmanctl watchdog will restart it upon exit.
-Most MTAs can be configured to deliver messages to a `LMTP'[1]. This
-module is actually a server rather than a runner which picks up
-files from queue directory and process as required. LMTP runner opens
-a local TCP port and waits for MTA to connect to it and receives messages.
-The messages are subsequently injected into Mailman's qfiles for processing
-in the normal pipeline.
+The LMTP runner opens a local TCP port and waits for the mail server to
+connect to it. The messages it receives over LMTP are very minimally parsed
+for sanity and if they look okay, they are accepted and injected into
+Mailman's incoming queue for processing through the normal pipeline. If they
+don't look good, or are destined for a bogus sub-queue address, they are
+rejected right away, hopefully so that the peer mail server can provide better
+diagnostics.
-All the incoming messages should be checked by MTA if they have valid
-destination; messages which have invalid destination like non-existent
-listname are discarded and recorded only in error log.
-
-[1] RFC2033
+[1] RFC 2033 Local Mail Transport Protocol
+ http://www.faqs.org/rfcs/rfc2033.html
See the variable USE_LMTP in Defaults.py.in for enabling this delivery
mechanism.
@@ -39,13 +41,12 @@ mechanism.
# NOTE: LMTP delivery is experimental in Mailman 2.2.
import os
+import email
import smtpd
import logging
import asyncore
-from cStringIO import StringIO
-from email.Parser import Parser
-from email.Utils import parseaddr
+from email.utils import parseaddr
from Mailman import Utils
from Mailman.Message import Message
@@ -53,101 +54,141 @@ from Mailman.Queue.Runner import Runner
from Mailman.Queue.sbcache import get_switchboard
from Mailman.configuration import config
-log = logging.getLogger('mailman.error')
+elog = logging.getLogger('mailman.error')
+qlog = logging.getLogger('mailman.qrunner')
# We only care about the listname and the subq as in listname@ or
# listname-request@
-subqnames = ('admin','bounces','confirm','join','leave',
- 'owner','request','subscribe','unsubscribe')
+subqnames = (
+ 'bounces', 'confirm', 'join', ' leave',
+ 'owner', 'request', 'subscribe', 'unsubscribe',
+ )
+
+DASH = '-'
+CRLF = '\r\n'
+ERR_451 = '451 Requested action aborted: error in processing'
+ERR_502 = '502 Error: command HELO not implemented'
+ERR_550 = '550 Requested action not taken: mailbox unavailable'
+
+# XXX Blech
+smtpd.__version__ = 'Python LMTP queue runner 1.0'
+
+
def getlistq(address):
localpart, domain = address.split('@', 1)
- l = localpart.split('-')
+ l = localpart.split(DASH)
if l[-1] in subqnames:
- listname = '-'.join(l[:-1])
+ listname = DASH.join(l[:-1])
subq = l[-1]
else:
listname = localpart
subq = None
return listname, subq, domain
+
class SMTPChannel(smtpd.SMTPChannel):
- # Override smtpd.SMTPChannel but can't change the class name.
+ # Override smtpd.SMTPChannel don't can't change the class name so that we
+ # don't have to reverse engineer Python's name mangling scheme.
+ #
# LMTP greeting is LHLO and no HELO/EHLO
def smtp_LHLO(self, arg):
smtpd.SMTPChannel.smtp_HELO(self, arg)
def smtp_HELO(self, arg):
- self.push('502 Error: command HELO not implemented')
+ self.push(ERR_502)
+
class LMTPRunner(Runner, smtpd.SMTPServer):
- # Only __init__ is called on startup. Asyncore is responsible for
- # later connections from MTA.
-
- def __init__(self, slice=None, numslices=1,
- localaddr=(config.LMTP_HOST, config.LMTP_PORT),
- remoteaddr=None):
- self._stop = 0
- self._parser = Parser(Message)
- smtpd.SMTPServer.__init__(self, localaddr, remoteaddr)
+ # Only __init__ is called on startup. Asyncore is responsible for later
+ # connections from MTA. slice and numslices are ignored and are
+ # necessary only to satisfy the API.
+ def __init__(self, slice=None, numslices=1):
+ localaddr = config.LMTP_HOST, config.LMTP_PORT
+ # Do not call Runner's constructor because there's no QDIR to create
+ smtpd.SMTPServer.__init__(self, localaddr, remoteaddr=None)
def handle_accept(self):
conn, addr = self.accept()
- channel = SMTPChannel(self, conn, addr)
+ channel = SMTPChannel(self, conn, addr)
def process_message(self, peer, mailfrom, rcpttos, data):
- # Refresh this each time through the list.
- listnames = Utils.list_names()
- fp = StringIO(data)
- for to in rcpttos:
- try:
- to = parseaddr(to)[1].lower()
- listname, subq, domain = getlistq(to)
- listname = listname + '@' + domain
- if listname not in listnames:
- raise Mailman.Errors.MMUnknownListError, listname
- fp.seek(0)
- msg = self._parser.parse(fp)
- msg['X-MailFrom'] = mailfrom
- msgdata = {'listname': listname}
- if subq in ('bounces', 'admin'):
- queue = get_switchboard(config.BOUNCEQUEUE_DIR)
- elif subq == 'confirm':
- msgdata['toconfirm'] = 1
+ try:
+ # Refresh the list of list names every time we process a message
+ # since the set of mailing lists could have changed. However, on
+ # a big site this could be fairly expensive, so we may need to
+ # cache this in some way.
+ listnames = Utils.list_names()
+ # Parse the message data. XXX Should we reject the message
+ # immediately if it has defects? Usually only spam has defects.
+ msg = email.message_from_string(data, Message)
+ msg['X-MailFrom'] = mailfrom
+ except Exception, e:
+ elog.error('%s', e)
+ return CRLF.join([ERR_451 for to in rcpttos])
+ # RFC 2033 requires us to return a status code for every recipient.
+ status = []
+ # Now for each address in the recipients, parse the address to first
+ # see if it's destined for a valid mailing list. If so, then queue
+ # the message to the appropriate place and record a 250 status for
+ # that recipient. If not, record a failure status for that recipient.
+ for to in rcpttos:
+ try:
+ to = parseaddr(to)[1].lower()
+ listname, subq, domain = getlistq(to)
+ listname += '@' + domain
+ if listname not in listnames:
+ status.append(ERR_550)
+ continue
+ # The recipient is a valid mailing list; see if it's a valid
+ # sub-queue, and if so, enqueue it.
+ msgdata = dict(listname=listname)
+ if subq in ('bounces', 'admin'):
+ queue = get_switchboard(config.BOUNCEQUEUE_DIR)
+ elif subq == 'confirm':
+ msgdata['toconfirm'] = True
queue = get_switchboard(config.CMDQUEUE_DIR)
elif subq in ('join', 'subscribe'):
- msgdata['tojoin'] = 1
+ msgdata['tojoin'] = True
queue = get_switchboard(config.CMDQUEUE_DIR)
elif subq in ('leave', 'unsubscribe'):
- msgdata['toleave'] = 1
+ msgdata['toleave'] = True
queue = get_switchboard(config.CMDQUEUE_DIR)
elif subq == 'owner':
msgdata.update({
- 'toowner': True,
- 'envsender': config.SITE_OWNER_ADDRESS,
- 'pipeline': config.OWNER_PIPELINE,
+ 'toowner' : True,
+ 'envsender' : config.SITE_OWNER_ADDRESS,
+ 'pipeline' : config.OWNER_PIPELINE,
})
queue = get_switchboard(config.INQUEUE_DIR)
elif subq is None:
- msgdata['tolist'] = 1
- queue = get_switchboard(config.INQUEUE_DIR)
+ msgdata['tolist'] = True
+ queue = get_switchboard(config.INQUEUE_DIR)
elif subq == 'request':
- msgdata['torequest'] = 1
- queue = get_switchboard(config.CMDQUEUE_DIR)
- else:
- log.error('Unknown sub-queue: %s', subq)
+ msgdata['torequest'] = True
+ queue = get_switchboard(config.CMDQUEUE_DIR)
+ else:
+ elog.error('Unknown sub-queue: %s', subq)
+ status.append(ERR_550)
continue
queue.enqueue(msg, msgdata)
+ status.append('250 Ok')
except Exception, e:
- log.error('%s', e)
+ elog.error('%s', e)
+ status.append(ERR_550)
+ # All done; returning this big status string should give the expected
+ # response to the LMTP client.
+ return CRLF.join(status)
def _cleanup(self):
pass
server = LMTPRunner()
+qlog.info('LMTPRunner qrunner started.')
asyncore.loop()
-
+# We'll never get here, but just in case...
+qlog.info('LMTPRunner qrunner exiting.')