summaryrefslogtreecommitdiff
path: root/cron
diff options
context:
space:
mode:
authorbwarsaw2000-12-20 20:22:00 +0000
committerbwarsaw2000-12-20 20:22:00 +0000
commit148e76e9564ac7e627b96f68a7c0ff1f6c8005a0 (patch)
tree1177a3ab72d78b742ed5d10a6373f01558d259c0 /cron
parent8375429d60971832d7d3837d1b275f258c4b2257 (diff)
downloadmailman-148e76e9564ac7e627b96f68a7c0ff1f6c8005a0.tar.gz
mailman-148e76e9564ac7e627b96f68a7c0ff1f6c8005a0.tar.zst
mailman-148e76e9564ac7e627b96f68a7c0ff1f6c8005a0.zip
First step in the conversion to a multiple-queue model. This script
gets simplified considerably because the bulk of the logic is moved into classes in the Mailman.Queue package. The next step might be to make the queues long running processes watchdogged by the qrunner. QRUNNERS is a list of classes to instantiate to run through the various queue directories. Currently there are only incoming, outgoing, and usenet queues. Each qrunner is run in it's own child process (via fork), so we need to be more careful about race conditions in the re-queuing logic. main(): Pretty simple now, it just forks and waits on the child processes.
Diffstat (limited to 'cron')
-rw-r--r--cron/qrunner268
1 files changed, 22 insertions, 246 deletions
diff --git a/cron/qrunner b/cron/qrunner
index ec89c10c2..c42e10b71 100644
--- a/cron/qrunner
+++ b/cron/qrunner
@@ -16,271 +16,47 @@
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
-"""Deliver queued messages.
+"""Process queued messages.
"""
-# A typical Mailman list exposes four aliases which point to three different
-# wrapped scripts. E.g. for a list named `mylist', you'd have:
-#
-# mylist -> post
-# mylist-admin -> mailowner
-# mylist-request -> mailcmd
-# mylist-owner -> mailowner (through an alias to mylist-admin)
-#
-# Only 3 scripts are used for historical purposes, and this is unlikely to
-# change to due backwards compatibility. That's immaterial though since the
-# mailowner script can determine which alias it received the message on.
-#
-# mylist-request is a robot address; it's sole purpose is to process emailed
-# commands in a Majordomo-like fashion. mylist-admin is supposed to reach the
-# list owners, but it performs one vital step before list owner delivery - it
-# examines the message for bounce content. mylist-owner is the fallback for
-# delivery to the list owners; it performs no bounce detection, but it /does/
-# look for bounce loops, which can occur if a list owner address is bouncing.
-#
-# So delivery flow of messages look like this:
-#
-# joerandom ---> mylist ---> list members
-# | |
-# | |[bounces]
-# +-------> mylist-admin <----+ <-------------------------------+
-# | | |
-# | +--->[internal bounce processing] |
-# | | |
-# | | [bounce found] |
-# | +--->[register and discard] |
-# | | |
-# | | [no bounce found] |
-# | +---> list owners <------+ |
-# | | | |
-# | |[bounces] | |
-# +-------> mylist-owner <-------------------+ | |
-# | | | |
-# | | [bounce loop detected] | |
-# | +---> [log and discard] | |
-# | | | |
-# | +-----------------------------------------+ |
-# | [no bounce loop detected] |
-# | |
-# | |
-# +-------> mylist-request |
-# | |
-# +---> [command processor] |
-# | |
-# +---> joerandom |
-# | |
-# |[bounces] |
-# +----------------------+
-
-import sys
import os
-import string
-import time
-import marshal
-import mimetools
import paths
from Mailman import mm_cfg
from Mailman import Utils
-from Mailman import MailList
-from Mailman import LockFile
-from Mailman import Message
-from Mailman import Errors
-from Mailman.Handlers import HandlerAPI
-from Mailman.Bouncers import BouncerAPI
-from Mailman.Logging.Syslog import syslog
+from Mailman.Queue import IncomingRunner
+from Mailman.Queue import OutgoingRunner
+from Mailman.Queue import NewsRunner
from Mailman.Logging.Utils import LogStdErr
-from Mailman.pythonlib.StringIO import StringIO
# Work around known problems with some RedHat cron daemons
import signal
signal.signal(signal.SIGCHLD, signal.SIG_DFL)
-QRUNNER_LOCK_FILE = os.path.join(mm_cfg.LOCK_DIR, 'qrunner.lock')
-
LogStdErr('error', 'qrunner', manual_reprime=0, tee_to_stdout=0)
-
-
-def dispose_message(mlist, msg, msgdata):
- # The message may be destined for one of three subsystems: the list
- # delivery subsystem (i.e. the message gets delivered to every member of
- # the list), the bounce detector (i.e. this was a message to the -owner
- # address), or the mail command handler (i.e. this was a message to the
- # -request address). The flags controlling this path are found in the
- # message data, as queued by the post, mailowner, and mailcmd scripts
- # respectively:
- #
- # post - no `toadmin' or `torequest' key
- # mailowner - `toadmin' == true
- # mailcmd - `torequest' == true
- if msgdata.get('toadmin', 0):
- s = StringIO(str(msg))
- mimemsg = mimetools.Message(s)
- if mlist.bounce_processing:
- if BouncerAPI.ScanMessages(mlist, mimemsg):
- return 0
- # Either bounce processing isn't turned on or the bounce detector
- # found no recognized bounce format in the message. In either case,
- # forward the dang thing off to the list owners. Be sure to munge the
- # headers so that any bounces from the list owners goes to the -owner
- # address instead of the -admin address. This will avoid bounce
- # loops.
- msgdata.update({'recips' : mlist.owner[:],
- 'errorsto': mlist.GetOwnerEmail(),
- 'noack' : 0, # enable Replybot
- })
- return HandlerAPI.DeliverToUser(mlist, msg, msgdata)
- elif msgdata.get('toowner', 0):
- # The message could have been a bounce from a broken list admin
- # address. About the only other test we can do is to see if the
- # message is appearing to come from a well-known MTA generated
- # address.
- sender = msg.GetSender()
- i = string.find(sender, '@')
- if i >= 0:
- senderlhs = string.lower(sender[:i])
- else:
- senderlhs = sender
- if senderlhs in mm_cfg.LIKELY_BOUNCE_SENDERS:
- syslog('error', 'bounce loop detected from: %s' % sender)
- return 0
- # Any messages to the owner address must have Errors-To: set back to
- # the owners address so bounce loops can be broken, as per the code
- # above.
- msgdata.update({'recips' : mlist.owner[:],
- 'errorsto': mlist.GetOwnerEmail(),
- 'noack' : 0, # enable Replybot
- })
- return HandlerAPI.DeliverToUser(mlist, msg, msgdata)
- elif msgdata.get('torequest', 0):
- mlist.ParseMailCommands(msg)
- return 0
- else:
- # Pre 2.0beta3 qfiles have no schema version number
- version = msgdata.get('version', 0)
- if version < 1:
- return HandlerAPI.RedeliverMessage(mlist, msg)
- return HandlerAPI.DeliverToList(mlist, msg, msgdata)
-
-
-
-_listcache = {}
-def open_list(listname):
- global _listcache
- mlist = _listcache.get(listname)
- if not mlist:
- try:
- mlist = MailList.MailList(listname, lock=0)
- _listcache[listname] = mlist
- except Errors.MMListError, e:
- syslog('qrunner', 'error opening list: %s\n%s' % (listname, e))
- return mlist
+QRUNNERS = [IncomingRunner.IncomingRunner,
+ OutgoingRunner.OutgoingRunner,
+ NewsRunner.NewsRunner,
+ ]
-def dequeue(root):
- # We're done with this message
- os.unlink(root + '.db')
- os.unlink(root + '.msg')
-
-
-
-def main(lock):
- t0 = time.time()
- msgcount = 0
- allkids = {}
- for file in os.listdir(mm_cfg.QUEUE_DIR):
- # Have we exceeded either resource limit?
- if mm_cfg.QRUNNER_PROCESS_LIFETIME is not None and \
- (time.time() - t0) > mm_cfg.QRUNNER_PROCESS_LIFETIME:
- break
- if mm_cfg.QRUNNER_MAX_MESSAGES is not None and \
- msgcount > mm_cfg.QRUNNER_MAX_MESSAGES:
- break
- msgcount = msgcount + 1
- # Keep the qrunner lock alive for a while longer
- lock.refresh()
- root, ext = os.path.splitext(os.path.join(mm_cfg.QUEUE_DIR, file))
- if ext <> '.msg':
- # trigger just off the .msg file
- continue
- msgfp = dbfp = None
- try:
- dbfp = open(root + '.db')
- msgdata = marshal.load(dbfp)
- dbfp.close()
- dbfp = None
- msgfp = open(root + '.msg')
- # re-establish the file base for re-queuing
- msg = Message.Message(msgfp, filebase=msgdata.get('filebase'))
- msgfp.close()
- msgfp = None
- except (EOFError, ValueError, TypeError, IOError), e:
- # For some reason we had trouble getting all the information out
- # of the queued files. log this and move on (we figure it's a
- # temporary problem)
- syslog('qrunner', 'Exception reading qfile: %s\n%s' % (root, e))
- if msgfp:
- msgfp.close()
- if dbfp:
- dbfp.close()
- continue
- # Dispose of it, after ensuring that we've got the lock on the list.
- listname = msgdata.get('listname')
- if not listname:
- syslog('qrunner', 'qfile metadata specifies no list: %s' % root)
- continue
- mlist = open_list(listname)
- if not mlist:
- syslog('qrunner',
- 'Dequeuing message destined for missing list: %s' % root)
- dequeue(root)
- continue
- # Now try to get the list lock
- try:
- mlist.Lock(timeout=mm_cfg.LIST_LOCK_TIMEOUT)
- except LockFile.TimeOutError:
- # oh well, try again later
- continue
- try:
- keepqueued = dispose_message(mlist, msg, msgdata)
- # Did the delivery generate child processes? Don't store them in
- # the message data files.
- kids = msgdata.get('_kids')
- if kids:
- allkids.update(kids)
- del msgdata['_kids']
- if not keepqueued:
- # We're done with this message
- dequeue(root)
- finally:
- mlist.Save()
- mlist.Unlock()
- return allkids
+def main():
+ # run each qrunner in their own processes
+ kids = {}
+ for qrclass in QRUNNERS:
+ pid = os.fork()
+ if pid:
+ # parent
+ kids[pid] = pid
+ else:
+ # child
+ qrclass().run()
+ os._exit(0)
+ Utils.reap(kids)
if __name__ == '__main__':
- global _listcache
-## syslog('qrunner', 'qrunner begining')
- # first, claim the queue runner lock
- lock = LockFile.LockFile(QRUNNER_LOCK_FILE,
- lifetime=mm_cfg.QRUNNER_LOCK_LIFETIME)
- try:
- lock.lock(timeout=0.5)
- except LockFile.TimeOutError:
- # Some other qrunner process is running, which is fine.
- syslog('qrunner', 'Could not acquire qrunner lock')
- else:
- kids = {}
- try:
- kids = main(lock)
- finally:
- lock.unlock(unconditionally=1)
- # Clear the global cache to be clean about it. Also, we can reap
- # any child processes that were created during the delivery
- # (e.g. from ToUsenet).
- _listcache.clear()
- Utils.reap(kids)
-## syslog('qrunner', 'qrunner ended')
+ main()