summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cron/qrunner268
1 files changed, 22 insertions, 246 deletions
diff --git a/cron/qrunner b/cron/qrunner
index ec89c10c2..c42e10b71 100644
--- a/cron/qrunner
+++ b/cron/qrunner
@@ -16,271 +16,47 @@
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
-"""Deliver queued messages.
+"""Process queued messages.
"""
-# A typical Mailman list exposes four aliases which point to three different
-# wrapped scripts. E.g. for a list named `mylist', you'd have:
-#
-# mylist -> post
-# mylist-admin -> mailowner
-# mylist-request -> mailcmd
-# mylist-owner -> mailowner (through an alias to mylist-admin)
-#
-# Only 3 scripts are used for historical purposes, and this is unlikely to
-# change to due backwards compatibility. That's immaterial though since the
-# mailowner script can determine which alias it received the message on.
-#
-# mylist-request is a robot address; it's sole purpose is to process emailed
-# commands in a Majordomo-like fashion. mylist-admin is supposed to reach the
-# list owners, but it performs one vital step before list owner delivery - it
-# examines the message for bounce content. mylist-owner is the fallback for
-# delivery to the list owners; it performs no bounce detection, but it /does/
-# look for bounce loops, which can occur if a list owner address is bouncing.
-#
-# So delivery flow of messages look like this:
-#
-# joerandom ---> mylist ---> list members
-# | |
-# | |[bounces]
-# +-------> mylist-admin <----+ <-------------------------------+
-# | | |
-# | +--->[internal bounce processing] |
-# | | |
-# | | [bounce found] |
-# | +--->[register and discard] |
-# | | |
-# | | [no bounce found] |
-# | +---> list owners <------+ |
-# | | | |
-# | |[bounces] | |
-# +-------> mylist-owner <-------------------+ | |
-# | | | |
-# | | [bounce loop detected] | |
-# | +---> [log and discard] | |
-# | | | |
-# | +-----------------------------------------+ |
-# | [no bounce loop detected] |
-# | |
-# | |
-# +-------> mylist-request |
-# | |
-# +---> [command processor] |
-# | |
-# +---> joerandom |
-# | |
-# |[bounces] |
-# +----------------------+
-
-import sys
import os
-import string
-import time
-import marshal
-import mimetools
import paths
from Mailman import mm_cfg
from Mailman import Utils
-from Mailman import MailList
-from Mailman import LockFile
-from Mailman import Message
-from Mailman import Errors
-from Mailman.Handlers import HandlerAPI
-from Mailman.Bouncers import BouncerAPI
-from Mailman.Logging.Syslog import syslog
+from Mailman.Queue import IncomingRunner
+from Mailman.Queue import OutgoingRunner
+from Mailman.Queue import NewsRunner
from Mailman.Logging.Utils import LogStdErr
-from Mailman.pythonlib.StringIO import StringIO
# Work around known problems with some RedHat cron daemons
import signal
signal.signal(signal.SIGCHLD, signal.SIG_DFL)
-QRUNNER_LOCK_FILE = os.path.join(mm_cfg.LOCK_DIR, 'qrunner.lock')
-
LogStdErr('error', 'qrunner', manual_reprime=0, tee_to_stdout=0)
-
-
-def dispose_message(mlist, msg, msgdata):
- # The message may be destined for one of three subsystems: the list
- # delivery subsystem (i.e. the message gets delivered to every member of
- # the list), the bounce detector (i.e. this was a message to the -owner
- # address), or the mail command handler (i.e. this was a message to the
- # -request address). The flags controlling this path are found in the
- # message data, as queued by the post, mailowner, and mailcmd scripts
- # respectively:
- #
- # post - no `toadmin' or `torequest' key
- # mailowner - `toadmin' == true
- # mailcmd - `torequest' == true
- if msgdata.get('toadmin', 0):
- s = StringIO(str(msg))
- mimemsg = mimetools.Message(s)
- if mlist.bounce_processing:
- if BouncerAPI.ScanMessages(mlist, mimemsg):
- return 0
- # Either bounce processing isn't turned on or the bounce detector
- # found no recognized bounce format in the message. In either case,
- # forward the dang thing off to the list owners. Be sure to munge the
- # headers so that any bounces from the list owners goes to the -owner
- # address instead of the -admin address. This will avoid bounce
- # loops.
- msgdata.update({'recips' : mlist.owner[:],
- 'errorsto': mlist.GetOwnerEmail(),
- 'noack' : 0, # enable Replybot
- })
- return HandlerAPI.DeliverToUser(mlist, msg, msgdata)
- elif msgdata.get('toowner', 0):
- # The message could have been a bounce from a broken list admin
- # address. About the only other test we can do is to see if the
- # message is appearing to come from a well-known MTA generated
- # address.
- sender = msg.GetSender()
- i = string.find(sender, '@')
- if i >= 0:
- senderlhs = string.lower(sender[:i])
- else:
- senderlhs = sender
- if senderlhs in mm_cfg.LIKELY_BOUNCE_SENDERS:
- syslog('error', 'bounce loop detected from: %s' % sender)
- return 0
- # Any messages to the owner address must have Errors-To: set back to
- # the owners address so bounce loops can be broken, as per the code
- # above.
- msgdata.update({'recips' : mlist.owner[:],
- 'errorsto': mlist.GetOwnerEmail(),
- 'noack' : 0, # enable Replybot
- })
- return HandlerAPI.DeliverToUser(mlist, msg, msgdata)
- elif msgdata.get('torequest', 0):
- mlist.ParseMailCommands(msg)
- return 0
- else:
- # Pre 2.0beta3 qfiles have no schema version number
- version = msgdata.get('version', 0)
- if version < 1:
- return HandlerAPI.RedeliverMessage(mlist, msg)
- return HandlerAPI.DeliverToList(mlist, msg, msgdata)
-
-
-
-_listcache = {}
-def open_list(listname):
- global _listcache
- mlist = _listcache.get(listname)
- if not mlist:
- try:
- mlist = MailList.MailList(listname, lock=0)
- _listcache[listname] = mlist
- except Errors.MMListError, e:
- syslog('qrunner', 'error opening list: %s\n%s' % (listname, e))
- return mlist
+QRUNNERS = [IncomingRunner.IncomingRunner,
+ OutgoingRunner.OutgoingRunner,
+ NewsRunner.NewsRunner,
+ ]
-def dequeue(root):
- # We're done with this message
- os.unlink(root + '.db')
- os.unlink(root + '.msg')
-
-
-
-def main(lock):
- t0 = time.time()
- msgcount = 0
- allkids = {}
- for file in os.listdir(mm_cfg.QUEUE_DIR):
- # Have we exceeded either resource limit?
- if mm_cfg.QRUNNER_PROCESS_LIFETIME is not None and \
- (time.time() - t0) > mm_cfg.QRUNNER_PROCESS_LIFETIME:
- break
- if mm_cfg.QRUNNER_MAX_MESSAGES is not None and \
- msgcount > mm_cfg.QRUNNER_MAX_MESSAGES:
- break
- msgcount = msgcount + 1
- # Keep the qrunner lock alive for a while longer
- lock.refresh()
- root, ext = os.path.splitext(os.path.join(mm_cfg.QUEUE_DIR, file))
- if ext <> '.msg':
- # trigger just off the .msg file
- continue
- msgfp = dbfp = None
- try:
- dbfp = open(root + '.db')
- msgdata = marshal.load(dbfp)
- dbfp.close()
- dbfp = None
- msgfp = open(root + '.msg')
- # re-establish the file base for re-queuing
- msg = Message.Message(msgfp, filebase=msgdata.get('filebase'))
- msgfp.close()
- msgfp = None
- except (EOFError, ValueError, TypeError, IOError), e:
- # For some reason we had trouble getting all the information out
- # of the queued files. log this and move on (we figure it's a
- # temporary problem)
- syslog('qrunner', 'Exception reading qfile: %s\n%s' % (root, e))
- if msgfp:
- msgfp.close()
- if dbfp:
- dbfp.close()
- continue
- # Dispose of it, after ensuring that we've got the lock on the list.
- listname = msgdata.get('listname')
- if not listname:
- syslog('qrunner', 'qfile metadata specifies no list: %s' % root)
- continue
- mlist = open_list(listname)
- if not mlist:
- syslog('qrunner',
- 'Dequeuing message destined for missing list: %s' % root)
- dequeue(root)
- continue
- # Now try to get the list lock
- try:
- mlist.Lock(timeout=mm_cfg.LIST_LOCK_TIMEOUT)
- except LockFile.TimeOutError:
- # oh well, try again later
- continue
- try:
- keepqueued = dispose_message(mlist, msg, msgdata)
- # Did the delivery generate child processes? Don't store them in
- # the message data files.
- kids = msgdata.get('_kids')
- if kids:
- allkids.update(kids)
- del msgdata['_kids']
- if not keepqueued:
- # We're done with this message
- dequeue(root)
- finally:
- mlist.Save()
- mlist.Unlock()
- return allkids
+def main():
+ # run each qrunner in their own processes
+ kids = {}
+ for qrclass in QRUNNERS:
+ pid = os.fork()
+ if pid:
+ # parent
+ kids[pid] = pid
+ else:
+ # child
+ qrclass().run()
+ os._exit(0)
+ Utils.reap(kids)
if __name__ == '__main__':
- global _listcache
-## syslog('qrunner', 'qrunner begining')
- # first, claim the queue runner lock
- lock = LockFile.LockFile(QRUNNER_LOCK_FILE,
- lifetime=mm_cfg.QRUNNER_LOCK_LIFETIME)
- try:
- lock.lock(timeout=0.5)
- except LockFile.TimeOutError:
- # Some other qrunner process is running, which is fine.
- syslog('qrunner', 'Could not acquire qrunner lock')
- else:
- kids = {}
- try:
- kids = main(lock)
- finally:
- lock.unlock(unconditionally=1)
- # Clear the global cache to be clean about it. Also, we can reap
- # any child processes that were created during the delivery
- # (e.g. from ToUsenet).
- _listcache.clear()
- Utils.reap(kids)
-## syslog('qrunner', 'qrunner ended')
+ main()