summaryrefslogtreecommitdiff
path: root/Mailman/Queue/Runner.py
blob: 07f127b02a60b5fb0ba2edd5ab872f7beefd1dd5 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
# Copyright (C) 1998,1999,2000,2001 by the Free Software Foundation, Inc.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.

"""Generic queue runner class.
"""

import random
import time
import traceback

from Mailman import mm_cfg
from Mailman import Utils
from Mailman import Errors
from Mailman import MailList
from Mailman import i18n

from Mailman.pythonlib.StringIO import StringIO
from Mailman.Queue.Switchboard import Switchboard
from Mailman.Logging.Syslog import syslog



class Runner:
    def __init__(self, qdir, slice=None, numslices=1, cachelists=1):
        self._qdir = qdir
        self._kids = {}
        self._cachelists = cachelists
        # Create our own switchboard.  Don't use the switchboard cache because
        # we want to provide slice and numslice arguments.
        self._switchboard = Switchboard(qdir, slice, numslices)
        # Create the shunt switchboard
        self._shunt = Switchboard(mm_cfg.SHUNTQUEUE_DIR)
        self._stop = 0

    def stop(self):
        self._stop = 1

    def run(self):
        # Start the main loop for this queue runner.
        try:
            try:
                while 1:
                    # Once through the loop that processes all the files in
                    # the queue directory.
                    filecnt = self.__oneloop()
                    # Do the periodic work for the subclass.
                    self._doperiodic()
                    # If the stop flag is set, we're done.
                    if self._stop:
                        break
                    # If there were no files to process, then we'll simply
                    # sleep for a little while and expect some to show up.
                    if filecnt == 0:
                        self._snooze()
            except KeyboardInterrupt:
                pass
        finally:
            # We've broken out of our main loop, so we want to reap all the
            # subprocesses we've created and do any other necessary cleanups.
            self._cleanup()

    def __oneloop(self):
        # First, list all the files in our queue directory.
        # Switchboard.files() is guaranteed to hand us the files in FIFO
        # order.
        files = self._switchboard.files()
        for filebase in files:
            # Ask the switchboard for the message and metadata objects
            # associated with this filebase.
            msg, msgdata = self._switchboard.dequeue(filebase)
            # It's possible one or both files got lost.  If so, just ignore
            # this filebase entry.  dequeue() will automatically unlink the
            # other file, but we should log an error message for diagnostics.
            if msg is None or msgdata is None:
                syslog('error', 'lost data files for filebase: %s' % filebase)
            else:
                # Now that we've dequeued the message, we want to be
                # incredibly anal about making sure that no uncaught exception
                # could cause us to lose the message.  All runners that
                # implement _dispose() must guarantee that exceptions are
                # caught and dealt with properly.  Still, there may be a bug
                # in the infrastructure, and we do not want those to cause
                # messages to be lost.  Any uncaught exceptions will cause the
                # message to be stored in the shunt queue for human
                # intervention.
                try:
                    self.__onefile(msg, msgdata)
                except Exception, e:
                    self._log(e)
                    self._shunt.enqueue(msg, msgdata)
            # Other work we want to do each time through the loop
            Utils.reap(self._kids, once=1)
            self._doperiodic()
        return len(files)

    def __onefile(self, msg, msgdata):
        # Do some common sanity checking on the message metadata.  It's got to
        # be destined for a particular mailing list.  This switchboard is used
        # to shunt off badly formatted messages.  We don't want to just trash
        # them because they may be fixable with human intervention.  Just get
        # them out of our site though.
        #
        # Find out which mailing list this message is destined for.
        listname = msgdata.get('listname')
        if not listname:
            syslog('qrunner', 'qfile metadata specifies no list: %s' %
                   filebase)
            self._shunt.enqueue(msg, metadata)
            return
        mlist = self._open_list(listname)
        if not mlist:
            syslog('qrunner',
                   'Dequeuing message destined for missing list: %s' %
                   filebase)
            self._shunt.enqueue(msg, metadata)
            return
        # Now process this message, keeping track of any subprocesses that may
        # have been spawned.  We'll reap those later.
        #
        # We also want to set up the language context for this message.  The
        # context will be the preferred language for the user if a member of
        # the list, or the list's preferred language.  However, we must take
        # special care to reset the defaults, otherwise subsequent messages
        # may be translated incorrectly.  BAW: I'm not sure I like this
        # approach, but I can't think of anything better right now.
        otranslation = i18n.get_translation()
        sender = msg.get_sender()
        lang = mlist.GetPreferredLanguage(sender)
        i18n.set_language(lang)
        msgdata['lang'] = lang
        try:
            keepqueued = self._dispose(mlist, msg, msgdata)
        finally:
            i18n.set_translation(otranslation)
        # Keep tabs on any child processes that got spawned.
        kids = msgdata.get('_kids')
        if kids:
            self._kids.update(kids)
        if keepqueued:
            self._switchboard.enqueue(msg, msgdata)
        
    # Mapping of listnames to MailList instances
    _listcache = {}

    def _open_list(self, listname, lockp=1):
        # Cache the opening of the list object given its name.  The probably
        # is only a moderate win because when a list is locked, all its
        # attributes are re-read from the config.db file.  This may help more
        # when there's a real backing database.
        if self._cachelists:
            mlist = self._listcache.get(listname)
        else:
            mlist = None
        if not mlist:
            try:
                mlist = MailList.MailList(listname, lock=0)
                if self._cachelists:
                    self._listcache[listname] = mlist
            except Errors.MMListError, e:
                syslog('qrunner', 'error opening list: %s\n%s' % (listname, e))
                return None
        return mlist

    def _log(self, exc):
        syslog('qrunner', 'Uncaught runner exception: %s' % exc)
        s = StringIO()
        traceback.print_exc(file=s)
        syslog('qrunner', s.getvalue())

    #
    # Subclasses can override _cleanup(), _dispose(), and _doperiodic()
    #
    def _cleanup(self):
        Utils.reap(self._kids)
        self._listcache.clear()

    def _dispose(self, mlist, msg, msgdata):
        raise UnimplementedError

    def _doperiodic(self):
        pass

    def _snooze(self):
        if mm_cfg.QRUNNER_SLEEP_TIME <= 0:
            return
        time.sleep(mm_cfg.QRUNNER_SLEEP_TIME)