diff options
| -rw-r--r-- | cron/qrunner | 268 |
1 files changed, 22 insertions, 246 deletions
diff --git a/cron/qrunner b/cron/qrunner index ec89c10c2..c42e10b71 100644 --- a/cron/qrunner +++ b/cron/qrunner @@ -16,271 +16,47 @@ # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. -"""Deliver queued messages. +"""Process queued messages. """ -# A typical Mailman list exposes four aliases which point to three different -# wrapped scripts. E.g. for a list named `mylist', you'd have: -# -# mylist -> post -# mylist-admin -> mailowner -# mylist-request -> mailcmd -# mylist-owner -> mailowner (through an alias to mylist-admin) -# -# Only 3 scripts are used for historical purposes, and this is unlikely to -# change to due backwards compatibility. That's immaterial though since the -# mailowner script can determine which alias it received the message on. -# -# mylist-request is a robot address; it's sole purpose is to process emailed -# commands in a Majordomo-like fashion. mylist-admin is supposed to reach the -# list owners, but it performs one vital step before list owner delivery - it -# examines the message for bounce content. mylist-owner is the fallback for -# delivery to the list owners; it performs no bounce detection, but it /does/ -# look for bounce loops, which can occur if a list owner address is bouncing. -# -# So delivery flow of messages look like this: -# -# joerandom ---> mylist ---> list members -# | | -# | |[bounces] -# +-------> mylist-admin <----+ <-------------------------------+ -# | | | -# | +--->[internal bounce processing] | -# | | | -# | | [bounce found] | -# | +--->[register and discard] | -# | | | -# | | [no bounce found] | -# | +---> list owners <------+ | -# | | | | -# | |[bounces] | | -# +-------> mylist-owner <-------------------+ | | -# | | | | -# | | [bounce loop detected] | | -# | +---> [log and discard] | | -# | | | | -# | +-----------------------------------------+ | -# | [no bounce loop detected] | -# | | -# | | -# +-------> mylist-request | -# | | -# +---> [command processor] | -# | | -# +---> joerandom | -# | | -# |[bounces] | -# +----------------------+ - -import sys import os -import string -import time -import marshal -import mimetools import paths from Mailman import mm_cfg from Mailman import Utils -from Mailman import MailList -from Mailman import LockFile -from Mailman import Message -from Mailman import Errors -from Mailman.Handlers import HandlerAPI -from Mailman.Bouncers import BouncerAPI -from Mailman.Logging.Syslog import syslog +from Mailman.Queue import IncomingRunner +from Mailman.Queue import OutgoingRunner +from Mailman.Queue import NewsRunner from Mailman.Logging.Utils import LogStdErr -from Mailman.pythonlib.StringIO import StringIO # Work around known problems with some RedHat cron daemons import signal signal.signal(signal.SIGCHLD, signal.SIG_DFL) -QRUNNER_LOCK_FILE = os.path.join(mm_cfg.LOCK_DIR, 'qrunner.lock') - LogStdErr('error', 'qrunner', manual_reprime=0, tee_to_stdout=0) - - -def dispose_message(mlist, msg, msgdata): - # The message may be destined for one of three subsystems: the list - # delivery subsystem (i.e. the message gets delivered to every member of - # the list), the bounce detector (i.e. this was a message to the -owner - # address), or the mail command handler (i.e. this was a message to the - # -request address). The flags controlling this path are found in the - # message data, as queued by the post, mailowner, and mailcmd scripts - # respectively: - # - # post - no `toadmin' or `torequest' key - # mailowner - `toadmin' == true - # mailcmd - `torequest' == true - if msgdata.get('toadmin', 0): - s = StringIO(str(msg)) - mimemsg = mimetools.Message(s) - if mlist.bounce_processing: - if BouncerAPI.ScanMessages(mlist, mimemsg): - return 0 - # Either bounce processing isn't turned on or the bounce detector - # found no recognized bounce format in the message. In either case, - # forward the dang thing off to the list owners. Be sure to munge the - # headers so that any bounces from the list owners goes to the -owner - # address instead of the -admin address. This will avoid bounce - # loops. - msgdata.update({'recips' : mlist.owner[:], - 'errorsto': mlist.GetOwnerEmail(), - 'noack' : 0, # enable Replybot - }) - return HandlerAPI.DeliverToUser(mlist, msg, msgdata) - elif msgdata.get('toowner', 0): - # The message could have been a bounce from a broken list admin - # address. About the only other test we can do is to see if the - # message is appearing to come from a well-known MTA generated - # address. - sender = msg.GetSender() - i = string.find(sender, '@') - if i >= 0: - senderlhs = string.lower(sender[:i]) - else: - senderlhs = sender - if senderlhs in mm_cfg.LIKELY_BOUNCE_SENDERS: - syslog('error', 'bounce loop detected from: %s' % sender) - return 0 - # Any messages to the owner address must have Errors-To: set back to - # the owners address so bounce loops can be broken, as per the code - # above. - msgdata.update({'recips' : mlist.owner[:], - 'errorsto': mlist.GetOwnerEmail(), - 'noack' : 0, # enable Replybot - }) - return HandlerAPI.DeliverToUser(mlist, msg, msgdata) - elif msgdata.get('torequest', 0): - mlist.ParseMailCommands(msg) - return 0 - else: - # Pre 2.0beta3 qfiles have no schema version number - version = msgdata.get('version', 0) - if version < 1: - return HandlerAPI.RedeliverMessage(mlist, msg) - return HandlerAPI.DeliverToList(mlist, msg, msgdata) - - - -_listcache = {} -def open_list(listname): - global _listcache - mlist = _listcache.get(listname) - if not mlist: - try: - mlist = MailList.MailList(listname, lock=0) - _listcache[listname] = mlist - except Errors.MMListError, e: - syslog('qrunner', 'error opening list: %s\n%s' % (listname, e)) - return mlist +QRUNNERS = [IncomingRunner.IncomingRunner, + OutgoingRunner.OutgoingRunner, + NewsRunner.NewsRunner, + ] -def dequeue(root): - # We're done with this message - os.unlink(root + '.db') - os.unlink(root + '.msg') - - - -def main(lock): - t0 = time.time() - msgcount = 0 - allkids = {} - for file in os.listdir(mm_cfg.QUEUE_DIR): - # Have we exceeded either resource limit? - if mm_cfg.QRUNNER_PROCESS_LIFETIME is not None and \ - (time.time() - t0) > mm_cfg.QRUNNER_PROCESS_LIFETIME: - break - if mm_cfg.QRUNNER_MAX_MESSAGES is not None and \ - msgcount > mm_cfg.QRUNNER_MAX_MESSAGES: - break - msgcount = msgcount + 1 - # Keep the qrunner lock alive for a while longer - lock.refresh() - root, ext = os.path.splitext(os.path.join(mm_cfg.QUEUE_DIR, file)) - if ext <> '.msg': - # trigger just off the .msg file - continue - msgfp = dbfp = None - try: - dbfp = open(root + '.db') - msgdata = marshal.load(dbfp) - dbfp.close() - dbfp = None - msgfp = open(root + '.msg') - # re-establish the file base for re-queuing - msg = Message.Message(msgfp, filebase=msgdata.get('filebase')) - msgfp.close() - msgfp = None - except (EOFError, ValueError, TypeError, IOError), e: - # For some reason we had trouble getting all the information out - # of the queued files. log this and move on (we figure it's a - # temporary problem) - syslog('qrunner', 'Exception reading qfile: %s\n%s' % (root, e)) - if msgfp: - msgfp.close() - if dbfp: - dbfp.close() - continue - # Dispose of it, after ensuring that we've got the lock on the list. - listname = msgdata.get('listname') - if not listname: - syslog('qrunner', 'qfile metadata specifies no list: %s' % root) - continue - mlist = open_list(listname) - if not mlist: - syslog('qrunner', - 'Dequeuing message destined for missing list: %s' % root) - dequeue(root) - continue - # Now try to get the list lock - try: - mlist.Lock(timeout=mm_cfg.LIST_LOCK_TIMEOUT) - except LockFile.TimeOutError: - # oh well, try again later - continue - try: - keepqueued = dispose_message(mlist, msg, msgdata) - # Did the delivery generate child processes? Don't store them in - # the message data files. - kids = msgdata.get('_kids') - if kids: - allkids.update(kids) - del msgdata['_kids'] - if not keepqueued: - # We're done with this message - dequeue(root) - finally: - mlist.Save() - mlist.Unlock() - return allkids +def main(): + # run each qrunner in their own processes + kids = {} + for qrclass in QRUNNERS: + pid = os.fork() + if pid: + # parent + kids[pid] = pid + else: + # child + qrclass().run() + os._exit(0) + Utils.reap(kids) if __name__ == '__main__': - global _listcache -## syslog('qrunner', 'qrunner begining') - # first, claim the queue runner lock - lock = LockFile.LockFile(QRUNNER_LOCK_FILE, - lifetime=mm_cfg.QRUNNER_LOCK_LIFETIME) - try: - lock.lock(timeout=0.5) - except LockFile.TimeOutError: - # Some other qrunner process is running, which is fine. - syslog('qrunner', 'Could not acquire qrunner lock') - else: - kids = {} - try: - kids = main(lock) - finally: - lock.unlock(unconditionally=1) - # Clear the global cache to be clean about it. Also, we can reap - # any child processes that were created during the delivery - # (e.g. from ToUsenet). - _listcache.clear() - Utils.reap(kids) -## syslog('qrunner', 'qrunner ended') + main() |
