diff options
| author | Barry Warsaw | 2011-05-29 12:45:19 -0400 |
|---|---|---|
| committer | Barry Warsaw | 2011-05-29 12:45:19 -0400 |
| commit | 521a179d309fac857fdbbe162d5db136c3ec3b1e (patch) | |
| tree | ec6e635e9c0f8a5bd655a254f9c346f1acb6dd8e | |
| parent | 0f760798fb2490a03041c42018afbd59749e6cbd (diff) | |
| download | mailman-521a179d309fac857fdbbe162d5db136c3ec3b1e.tar.gz mailman-521a179d309fac857fdbbe162d5db136c3ec3b1e.tar.zst mailman-521a179d309fac857fdbbe162d5db136c3ec3b1e.zip | |
| -rw-r--r-- | src/mailman/archiving/docs/common.txt | 2 | ||||
| -rw-r--r-- | src/mailman/bin/gate_news.py | 2 | ||||
| -rw-r--r-- | src/mailman/bin/qrunner.py | 2 | ||||
| -rw-r--r-- | src/mailman/commands/docs/echo.txt | 2 | ||||
| -rw-r--r-- | src/mailman/commands/docs/membership.txt | 2 | ||||
| -rw-r--r-- | src/mailman/config/config.py | 4 | ||||
| -rw-r--r-- | src/mailman/config/mailman.cfg | 30 | ||||
| -rw-r--r-- | src/mailman/config/schema.cfg | 2 | ||||
| -rw-r--r-- | src/mailman/core/docs/__init__.py (renamed from src/mailman/queue/tests/__init__.py) | 0 | ||||
| -rw-r--r-- | src/mailman/core/docs/runner.txt (renamed from src/mailman/queue/docs/runner.txt) | 2 | ||||
| -rw-r--r-- | src/mailman/core/docs/switchboard.txt (renamed from src/mailman/queue/docs/switchboard.txt) | 2 | ||||
| -rw-r--r-- | src/mailman/core/runner.py | 249 | ||||
| -rw-r--r-- | src/mailman/core/switchboard.py (renamed from src/mailman/queue/__init__.py) | 214 | ||||
| -rw-r--r-- | src/mailman/pipeline/mime_delete.py | 2 | ||||
| -rw-r--r-- | src/mailman/runners/__init__.py | 0 | ||||
| -rw-r--r-- | src/mailman/runners/archive.py (renamed from src/mailman/queue/archive.py) | 2 | ||||
| -rw-r--r-- | src/mailman/runners/bounce.py (renamed from src/mailman/queue/bounce.py) | 2 | ||||
| -rw-r--r-- | src/mailman/runners/command.py (renamed from src/mailman/queue/command.py) | 2 | ||||
| -rw-r--r-- | src/mailman/runners/digest.py (renamed from src/mailman/queue/digest.py) | 2 | ||||
| -rw-r--r-- | src/mailman/runners/docs/OVERVIEW.txt (renamed from src/mailman/queue/docs/OVERVIEW.txt) | 0 | ||||
| -rw-r--r-- | src/mailman/runners/docs/archiver.txt (renamed from src/mailman/queue/docs/archiver.txt) | 2 | ||||
| -rw-r--r-- | src/mailman/runners/docs/command.txt (renamed from src/mailman/queue/docs/command.txt) | 4 | ||||
| -rw-r--r-- | src/mailman/runners/docs/digester.txt (renamed from src/mailman/queue/docs/digester.txt) | 2 | ||||
| -rw-r--r-- | src/mailman/runners/docs/incoming.txt (renamed from src/mailman/queue/docs/incoming.txt) | 2 | ||||
| -rw-r--r-- | src/mailman/runners/docs/lmtp.txt (renamed from src/mailman/queue/docs/lmtp.txt) | 0 | ||||
| -rw-r--r-- | src/mailman/runners/docs/news.txt (renamed from src/mailman/queue/docs/news.txt) | 2 | ||||
| -rw-r--r-- | src/mailman/runners/docs/outgoing.txt (renamed from src/mailman/queue/docs/outgoing.txt) | 2 | ||||
| -rw-r--r-- | src/mailman/runners/docs/rest.txt (renamed from src/mailman/queue/docs/rest.txt) | 0 | ||||
| -rw-r--r-- | src/mailman/runners/incoming.py (renamed from src/mailman/queue/incoming.py) | 2 | ||||
| -rw-r--r-- | src/mailman/runners/lmtp.py (renamed from src/mailman/queue/lmtp.py) | 2 | ||||
| -rw-r--r-- | src/mailman/runners/maildir.py (renamed from src/mailman/queue/maildir.py) | 3 | ||||
| -rw-r--r-- | src/mailman/runners/news.py (renamed from src/mailman/queue/news.py) | 2 | ||||
| -rw-r--r-- | src/mailman/runners/outgoing.py (renamed from src/mailman/queue/outgoing.py) | 2 | ||||
| -rw-r--r-- | src/mailman/runners/pipeline.py (renamed from src/mailman/queue/pipeline.py) | 2 | ||||
| -rw-r--r-- | src/mailman/runners/rest.py (renamed from src/mailman/queue/rest.py) | 2 | ||||
| -rw-r--r-- | src/mailman/runners/retry.py (renamed from src/mailman/queue/retry.py) | 2 | ||||
| -rw-r--r-- | src/mailman/runners/tests/__init__.py | 0 | ||||
| -rw-r--r-- | src/mailman/runners/tests/test_bounce.py (renamed from src/mailman/queue/tests/test_bounce.py) | 2 | ||||
| -rw-r--r-- | src/mailman/runners/tests/test_outgoing.py (renamed from src/mailman/queue/tests/test_outgoing.py) | 10 | ||||
| -rw-r--r-- | src/mailman/runners/virgin.py (renamed from src/mailman/queue/virgin.py) | 2 |
40 files changed, 302 insertions, 266 deletions
diff --git a/src/mailman/archiving/docs/common.txt b/src/mailman/archiving/docs/common.txt index 1629458b3..330c8e307 100644 --- a/src/mailman/archiving/docs/common.txt +++ b/src/mailman/archiving/docs/common.txt @@ -88,7 +88,7 @@ address at The Mail Archive. The message gets no header or footer decoration. >>> archiver.archive_message(mlist, msg) - >>> from mailman.queue.outgoing import OutgoingRunner + >>> from mailman.runners.outgoing import OutgoingRunner >>> from mailman.testing.helpers import make_testable_runner >>> outgoing = make_testable_runner(OutgoingRunner, 'out') >>> outgoing.run() diff --git a/src/mailman/bin/gate_news.py b/src/mailman/bin/gate_news.py index 4ad29affc..3d637bd82 100644 --- a/src/mailman/bin/gate_news.py +++ b/src/mailman/bin/gate_news.py @@ -33,7 +33,7 @@ from mailman import Message from mailman import loginit from mailman.configuration import config from mailman.core.i18n import _ -from mailman.queue import Switchboard +from mailman.core.switchboard import Switchboard from mailman.version import MAILMAN_VERSION # Work around known problems with some RedHat cron daemons diff --git a/src/mailman/bin/qrunner.py b/src/mailman/bin/qrunner.py index f98fd98c6..2e20cee61 100644 --- a/src/mailman/bin/qrunner.py +++ b/src/mailman/bin/qrunner.py @@ -143,7 +143,7 @@ def make_qrunner(name, slice, range, once=False): # It was a shortcut name. class_path = qrunner_config['class'] elif name.startswith('.'): - class_path = 'mailman.queue' + name + class_path = 'mailman.runners' + name else: class_path = name try: diff --git a/src/mailman/commands/docs/echo.txt b/src/mailman/commands/docs/echo.txt index a01172d04..ced483ea8 100644 --- a/src/mailman/commands/docs/echo.txt +++ b/src/mailman/commands/docs/echo.txt @@ -17,7 +17,7 @@ The original message is ignored, but the results receive the echoed command. >>> mlist = create_list('test@example.com') - >>> from mailman.queue.command import Results + >>> from mailman.runners.command import Results >>> results = Results() >>> from mailman.email.message import Message diff --git a/src/mailman/commands/docs/membership.txt b/src/mailman/commands/docs/membership.txt index 851f01514..d05f12eee 100644 --- a/src/mailman/commands/docs/membership.txt +++ b/src/mailman/commands/docs/membership.txt @@ -41,7 +41,7 @@ When no address argument is given, the message's From address will be used. If that's missing though, then an error is returned. :: - >>> from mailman.queue.command import Results + >>> from mailman.runners.command import Results >>> results = Results() >>> from mailman.email.message import Message diff --git a/src/mailman/config/config.py b/src/mailman/config/config.py index 9c210b6a2..4b2e9749d 100644 --- a/src/mailman/config/config.py +++ b/src/mailman/config/config.py @@ -120,8 +120,8 @@ class Configuration: """Perform post-processing after loading the configuration files.""" # Expand and set up all directories. self._expand_paths() - # Set up the switchboards. - from mailman.queue import Switchboard + # Set up the switchboards. Import this here to avoid circular imports. + from mailman.core.switchboard import Switchboard Switchboard.initialize() # Set up all the languages. languages = self._config.getByCategory('language', []) diff --git a/src/mailman/config/mailman.cfg b/src/mailman/config/mailman.cfg index d7bc0fded..fba1d333e 100644 --- a/src/mailman/config/mailman.cfg +++ b/src/mailman/config/mailman.cfg @@ -42,55 +42,55 @@ pid_file: /var/run/mailman/master-qrunner.pid [language.en] [qrunner.archive] -class: mailman.queue.archive.ArchiveRunner +class: mailman.runners.archive.ArchiveRunner [qrunner.bad] -class: mailman.queue.fake.BadRunner +class: mailman.runners.fake.BadRunner # The bad runner is just a placeholder for its switchboard. start: no [qrunner.bounces] -class: mailman.queue.bounce.BounceRunner +class: mailman.runners.bounce.BounceRunner [qrunner.command] -class: mailman.queue.command.CommandRunner +class: mailman.runners.command.CommandRunner [qrunner.in] -class: mailman.queue.incoming.IncomingRunner +class: mailman.runners.incoming.IncomingRunner [qrunner.lmtp] -class: mailman.queue.lmtp.LMTPRunner +class: mailman.runners.lmtp.LMTPRunner [qrunner.maildir] -class: mailman.queue.maildir.MaildirRunner +class: mailman.runners.maildir.MaildirRunner # This is still experimental. start: no [qrunner.news] -class: mailman.queue.news.NewsRunner +class: mailman.runners.news.NewsRunner [qrunner.out] -class: mailman.queue.outgoing.OutgoingRunner +class: mailman.runners.outgoing.OutgoingRunner [qrunner.pipeline] -class: mailman.queue.pipeline.PipelineRunner +class: mailman.runners.pipeline.PipelineRunner [qrunner.rest] -class: mailman.queue.rest.RESTRunner +class: mailman.runners.rest.RESTRunner [qrunner.retry] -class: mailman.queue.retry.RetryRunner +class: mailman.runners.retry.RetryRunner sleep_time: 15m [qrunner.shunt] -class: mailman.queue.fake.ShuntRunner +class: mailman.runners.fake.ShuntRunner # The shunt runner is just a placeholder for its switchboard. start: no [qrunner.virgin] -class: mailman.queue.virgin.VirginRunner +class: mailman.runners.virgin.VirginRunner [qrunner.digest] -class: mailman.queue.digest.DigestRunner +class: mailman.runners.digest.DigestRunner [style.default] diff --git a/src/mailman/config/schema.cfg b/src/mailman/config/schema.cfg index 329aea412..2e5bfb2c5 100644 --- a/src/mailman/config/schema.cfg +++ b/src/mailman/config/schema.cfg @@ -166,7 +166,7 @@ user_friendly_passwords: yes # Define which process queue runners, and how many of them, to start. # The full import path to the class for this queue runner. -class: mailman.queue.runner.Runner +class: mailman.core.runner.Runner # The directory path that this queue runner scans. path: $QUEUE_DIR/$name diff --git a/src/mailman/queue/tests/__init__.py b/src/mailman/core/docs/__init__.py index e69de29bb..e69de29bb 100644 --- a/src/mailman/queue/tests/__init__.py +++ b/src/mailman/core/docs/__init__.py diff --git a/src/mailman/queue/docs/runner.txt b/src/mailman/core/docs/runner.txt index 39e8fede2..4262dc87a 100644 --- a/src/mailman/queue/docs/runner.txt +++ b/src/mailman/core/docs/runner.txt @@ -28,7 +28,7 @@ configuration section for the test runner. ... max_restarts: 1 ... """) - >>> from mailman.queue import Runner + >>> from mailman.core.runner import Runner >>> class TestableRunner(Runner): ... def _dispose(self, mlist, msg, msgdata): ... self.msg = msg diff --git a/src/mailman/queue/docs/switchboard.txt b/src/mailman/core/docs/switchboard.txt index d89aa3693..751b1e640 100644 --- a/src/mailman/queue/docs/switchboard.txt +++ b/src/mailman/core/docs/switchboard.txt @@ -15,7 +15,7 @@ Create a switchboard by giving its queue name and directory. >>> import os >>> queue_directory = os.path.join(config.QUEUE_DIR, 'test') - >>> from mailman.queue import Switchboard + >>> from mailman.core.switchboard import Switchboard >>> switchboard = Switchboard('test', queue_directory) >>> print switchboard.name test diff --git a/src/mailman/core/runner.py b/src/mailman/core/runner.py new file mode 100644 index 000000000..3d876ac3d --- /dev/null +++ b/src/mailman/core/runner.py @@ -0,0 +1,249 @@ +# Copyright (C) 2001-2011 by the Free Software Foundation, Inc. +# +# This file is part of GNU Mailman. +# +# GNU Mailman is free software: you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) +# any later version. +# +# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. +# +# You should have received a copy of the GNU General Public License along with +# GNU Mailman. If not, see <http://www.gnu.org/licenses/>. + +"""The process runner base class.""" + +from __future__ import absolute_import, unicode_literals + +__metaclass__ = type +__all__ = [ + 'Runner', + ] + + +import time +import logging +import traceback + +from cStringIO import StringIO +from lazr.config import as_boolean, as_timedelta +from zope.component import getUtility +from zope.interface import implements + +from mailman.config import config +from mailman.core.i18n import _ +from mailman.core.switchboard import Switchboard +from mailman.interfaces.languages import ILanguageManager +from mailman.interfaces.listmanager import IListManager +from mailman.interfaces.runner import IRunner +from mailman.utilities.string import expand + + +dlog = logging.getLogger('mailman.debug') +elog = logging.getLogger('mailman.error') + + + +class Runner: + implements(IRunner) + + intercept_signals = True + + def __init__(self, name, slice=None): + """Create a queue runner. + + :param slice: The slice number for this queue runner. This is passed + directly to the underlying `ISwitchboard` object. + :type slice: int or None + """ + # Grab the configuration section. + self.name = name + section = getattr(config, 'qrunner.' + name) + substitutions = config.paths + substitutions['name'] = name + self.queue_directory = expand(section.path, substitutions) + numslices = int(section.instances) + self.switchboard = Switchboard( + name, self.queue_directory, slice, numslices, True) + self.sleep_time = as_timedelta(section.sleep_time) + # sleep_time is a timedelta; turn it into a float for time.sleep(). + self.sleep_float = (86400 * self.sleep_time.days + + self.sleep_time.seconds + + self.sleep_time.microseconds / 1.0e6) + self.max_restarts = int(section.max_restarts) + self.start = as_boolean(section.start) + self._stop = False + + def __repr__(self): + return '<{0} at {1:#x}>'.format(self.__class__.__name__, id(self)) + + def stop(self): + """See `IRunner`.""" + self._stop = True + + def run(self): + """See `IRunner`.""" + # Start the main loop for this queue runner. + try: + while True: + # Once through the loop that processes all the files in the + # queue directory. + filecnt = self._one_iteration() + # Do the periodic work for the subclass. + self._do_periodic() + # If the stop flag is set, we're done. + if self._stop: + break + # Give the runner an opportunity to snooze for a while, but + # pass it the file count so it can decide whether to do more + # work now or not. + self._snooze(filecnt) + except KeyboardInterrupt: + pass + finally: + self._clean_up() + + def _one_iteration(self): + """See `IRunner`.""" + me = self.__class__.__name__ + dlog.debug('[%s] starting oneloop', me) + # List all the files in our queue directory. The switchboard is + # guaranteed to hand us the files in FIFO order. + files = self.switchboard.files + for filebase in files: + dlog.debug('[%s] processing filebase: %s', me, filebase) + try: + # Ask the switchboard for the message and metadata objects + # associated with this queue file. + msg, msgdata = self.switchboard.dequeue(filebase) + except Exception as error: + # This used to just catch email.Errors.MessageParseError, but + # other problems can occur in message parsing, e.g. + # ValueError, and exceptions can occur in unpickling too. We + # don't want the runner to die, so we just log and skip this + # entry, but preserve it for analysis. + self._log(error) + elog.error('Skipping and preserving unparseable message: %s', + filebase) + self.switchboard.finish(filebase, preserve=True) + config.db.abort() + continue + try: + dlog.debug('[%s] processing onefile', me) + self._process_one_file(msg, msgdata) + dlog.debug('[%s] finishing filebase: %s', me, filebase) + self.switchboard.finish(filebase) + except Exception as error: + # All runners that implement _dispose() must guarantee that + # exceptions are caught and dealt with properly. Still, there + # may be a bug in the infrastructure, and we do not want those + # to cause messages to be lost. Any uncaught exceptions will + # cause the message to be stored in the shunt queue for human + # intervention. + self._log(error) + # Put a marker in the metadata for unshunting. + msgdata['whichq'] = self.switchboard.name + # It is possible that shunting can throw an exception, e.g. a + # permissions problem or a MemoryError due to a really large + # message. Try to be graceful. + try: + shunt = config.switchboards['shunt'] + new_filebase = shunt.enqueue(msg, msgdata) + elog.error('SHUNTING: %s', new_filebase) + self.switchboard.finish(filebase) + except Exception as error: + # The message wasn't successfully shunted. Log the + # exception and try to preserve the original queue entry + # for possible analysis. + self._log(error) + elog.error( + 'SHUNTING FAILED, preserving original entry: %s', + filebase) + self.switchboard.finish(filebase, preserve=True) + config.db.abort() + # Other work we want to do each time through the loop. + dlog.debug('[%s] doing periodic', me) + self._do_periodic() + dlog.debug('[%s] checking short circuit', me) + if self._short_circuit(): + dlog.debug('[%s] short circuiting', me) + break + dlog.debug('[%s] commiting', me) + config.db.commit() + dlog.debug('[%s] ending oneloop: %s', me, len(files)) + return len(files) + + def _process_one_file(self, msg, msgdata): + """See `IRunner`.""" + # Do some common sanity checking on the message metadata. It's got to + # be destined for a particular mailing list. This switchboard is used + # to shunt off badly formatted messages. We don't want to just trash + # them because they may be fixable with human intervention. Just get + # them out of our sight. + # + # Find out which mailing list this message is destined for. + missing = object() + listname = msgdata.get('listname', missing) + mlist = (None + if listname is missing + else getUtility(IListManager).get(unicode(listname))) + if mlist is None: + elog.error( + '%s runner "%s" shunting message for missing list: %s', + msg['message-id'], self.name, + ('n/a' if listname is missing else listname)) + config.switchboards['shunt'].enqueue(msg, msgdata) + return + # Now process this message. We also want to set up the language + # context for this message. The context will be the preferred + # language for the user if the sender is a member of the list, or it + # will be the list's preferred language. However, we must take + # special care to reset the defaults, otherwise subsequent messages + # may be translated incorrectly. + if mlist is None: + language_manager = getUtility(ILanguageManager) + language = language_manager[config.mailman.default_language] + elif msg.sender: + member = mlist.members.get_member(msg.sender) + language = (member.preferred_language + if member is not None + else mlist.preferred_language) + else: + language = mlist.preferred_language + with _.using(language.code): + msgdata['lang'] = language.code + keepqueued = self._dispose(mlist, msg, msgdata) + if keepqueued: + self.switchboard.enqueue(msg, msgdata) + + def _log(self, exc): + elog.error('Uncaught runner exception: %s', exc) + s = StringIO() + traceback.print_exc(file=s) + elog.error('%s', s.getvalue()) + + def _clean_up(self): + """See `IRunner`.""" + pass + + def _dispose(self, mlist, msg, msgdata): + """See `IRunner`.""" + raise NotImplementedError + + def _do_periodic(self): + """See `IRunner`.""" + pass + + def _snooze(self, filecnt): + """See `IRunner`.""" + if filecnt or self.sleep_float <= 0: + return + time.sleep(self.sleep_float) + + def _short_circuit(self): + """See `IRunner`.""" + return self._stop diff --git a/src/mailman/queue/__init__.py b/src/mailman/core/switchboard.py index 8abc5e9a6..5d9eb65ce 100644 --- a/src/mailman/queue/__init__.py +++ b/src/mailman/core/switchboard.py @@ -28,7 +28,6 @@ from __future__ import absolute_import, unicode_literals __metaclass__ = type __all__ = [ - 'Runner', 'Switchboard', ] @@ -40,19 +39,11 @@ import pickle import cPickle import hashlib import logging -import traceback -from cStringIO import StringIO -from lazr.config import as_boolean, as_timedelta -from zope.component import getUtility from zope.interface import implements from mailman.config import config -from mailman.core.i18n import _ from mailman.email.message import Message -from mailman.interfaces.languages import ILanguageManager -from mailman.interfaces.listmanager import IListManager -from mailman.interfaces.runner import IRunner from mailman.interfaces.switchboard import ISwitchboard from mailman.utilities.filesystem import makedirs from mailman.utilities.string import expand @@ -60,18 +51,15 @@ from mailman.utilities.string import expand # 20 bytes of all bits set, maximum hashlib.sha.digest() value shamax = 0xffffffffffffffffffffffffffffffffffffffffL - # Small increment to add to time in case two entries have the same time. This # prevents skipping one of two entries with the same time until the next pass. DELTA = .0001 -DOT = '.' # We count the number of times a file has been moved to .bak and recovered. # In order to prevent loops and a message flood, when the count reaches this # value, we move the file to the bad queue as a .psv. MAX_BAK_COUNT = 3 elog = logging.getLogger('mailman.error') -dlog = logging.getLogger('mailman.debug') @@ -279,205 +267,3 @@ class Switchboard: os.rename(src, dst) finally: fp.close() - - - -class Runner: - implements(IRunner) - - intercept_signals = True - - def __init__(self, name, slice=None): - """Create a queue runner. - - :param slice: The slice number for this queue runner. This is passed - directly to the underlying `ISwitchboard` object. - :type slice: int or None - """ - # Grab the configuration section. - self.name = name - section = getattr(config, 'qrunner.' + name) - substitutions = config.paths - substitutions['name'] = name - self.queue_directory = expand(section.path, substitutions) - numslices = int(section.instances) - self.switchboard = Switchboard( - name, self.queue_directory, slice, numslices, True) - self.sleep_time = as_timedelta(section.sleep_time) - # sleep_time is a timedelta; turn it into a float for time.sleep(). - self.sleep_float = (86400 * self.sleep_time.days + - self.sleep_time.seconds + - self.sleep_time.microseconds / 1.0e6) - self.max_restarts = int(section.max_restarts) - self.start = as_boolean(section.start) - self._stop = False - - def __repr__(self): - return '<{0} at {1:#x}>'.format(self.__class__.__name__, id(self)) - - def stop(self): - """See `IRunner`.""" - self._stop = True - - def run(self): - """See `IRunner`.""" - # Start the main loop for this queue runner. - try: - while True: - # Once through the loop that processes all the files in the - # queue directory. - filecnt = self._one_iteration() - # Do the periodic work for the subclass. - self._do_periodic() - # If the stop flag is set, we're done. - if self._stop: - break - # Give the runner an opportunity to snooze for a while, but - # pass it the file count so it can decide whether to do more - # work now or not. - self._snooze(filecnt) - except KeyboardInterrupt: - pass - finally: - self._clean_up() - - def _one_iteration(self): - """See `IRunner`.""" - me = self.__class__.__name__ - dlog.debug('[%s] starting oneloop', me) - # List all the files in our queue directory. The switchboard is - # guaranteed to hand us the files in FIFO order. - files = self.switchboard.files - for filebase in files: - dlog.debug('[%s] processing filebase: %s', me, filebase) - try: - # Ask the switchboard for the message and metadata objects - # associated with this queue file. - msg, msgdata = self.switchboard.dequeue(filebase) - except Exception as error: - # This used to just catch email.Errors.MessageParseError, but - # other problems can occur in message parsing, e.g. - # ValueError, and exceptions can occur in unpickling too. We - # don't want the runner to die, so we just log and skip this - # entry, but preserve it for analysis. - self._log(error) - elog.error('Skipping and preserving unparseable message: %s', - filebase) - self.switchboard.finish(filebase, preserve=True) - config.db.abort() - continue - try: - dlog.debug('[%s] processing onefile', me) - self._process_one_file(msg, msgdata) - dlog.debug('[%s] finishing filebase: %s', me, filebase) - self.switchboard.finish(filebase) - except Exception as error: - # All runners that implement _dispose() must guarantee that - # exceptions are caught and dealt with properly. Still, there - # may be a bug in the infrastructure, and we do not want those - # to cause messages to be lost. Any uncaught exceptions will - # cause the message to be stored in the shunt queue for human - # intervention. - self._log(error) - # Put a marker in the metadata for unshunting. - msgdata['whichq'] = self.switchboard.name - # It is possible that shunting can throw an exception, e.g. a - # permissions problem or a MemoryError due to a really large - # message. Try to be graceful. - try: - shunt = config.switchboards['shunt'] - new_filebase = shunt.enqueue(msg, msgdata) - elog.error('SHUNTING: %s', new_filebase) - self.switchboard.finish(filebase) - except Exception as error: - # The message wasn't successfully shunted. Log the - # exception and try to preserve the original queue entry - # for possible analysis. - self._log(error) - elog.error( - 'SHUNTING FAILED, preserving original entry: %s', - filebase) - self.switchboard.finish(filebase, preserve=True) - config.db.abort() - # Other work we want to do each time through the loop. - dlog.debug('[%s] doing periodic', me) - self._do_periodic() - dlog.debug('[%s] checking short circuit', me) - if self._short_circuit(): - dlog.debug('[%s] short circuiting', me) - break - dlog.debug('[%s] commiting', me) - config.db.commit() - dlog.debug('[%s] ending oneloop: %s', me, len(files)) - return len(files) - - def _process_one_file(self, msg, msgdata): - """See `IRunner`.""" - # Do some common sanity checking on the message metadata. It's got to - # be destined for a particular mailing list. This switchboard is used - # to shunt off badly formatted messages. We don't want to just trash - # them because they may be fixable with human intervention. Just get - # them out of our sight. - # - # Find out which mailing list this message is destined for. - missing = object() - listname = msgdata.get('listname', missing) - mlist = (None - if listname is missing - else getUtility(IListManager).get(unicode(listname))) - if mlist is None: - elog.error( - '%s runner "%s" shunting message for missing list: %s', - msg['message-id'], self.name, - ('n/a' if listname is missing else listname)) - config.switchboards['shunt'].enqueue(msg, msgdata) - return - # Now process this message. We also want to set up the language - # context for this message. The context will be the preferred - # language for the user if the sender is a member of the list, or it - # will be the list's preferred language. However, we must take - # special care to reset the defaults, otherwise subsequent messages - # may be translated incorrectly. - if mlist is None: - language_manager = getUtility(ILanguageManager) - language = language_manager[config.mailman.default_language] - elif msg.sender: - member = mlist.members.get_member(msg.sender) - language = (member.preferred_language - if member is not None - else mlist.preferred_language) - else: - language = mlist.preferred_language - with _.using(language.code): - msgdata['lang'] = language.code - keepqueued = self._dispose(mlist, msg, msgdata) - if keepqueued: - self.switchboard.enqueue(msg, msgdata) - - def _log(self, exc): - elog.error('Uncaught runner exception: %s', exc) - s = StringIO() - traceback.print_exc(file=s) - elog.error('%s', s.getvalue()) - - def _clean_up(self): - """See `IRunner`.""" - pass - - def _dispose(self, mlist, msg, msgdata): - """See `IRunner`.""" - raise NotImplementedError - - def _do_periodic(self): - """See `IRunner`.""" - pass - - def _snooze(self, filecnt): - """See `IRunner`.""" - if filecnt or self.sleep_float <= 0: - return - time.sleep(self.sleep_float) - - def _short_circuit(self): - """See `IRunner`.""" - return self._stop diff --git a/src/mailman/pipeline/mime_delete.py b/src/mailman/pipeline/mime_delete.py index 43197a878..381514795 100644 --- a/src/mailman/pipeline/mime_delete.py +++ b/src/mailman/pipeline/mime_delete.py @@ -44,8 +44,8 @@ from zope.interface import implements from mailman.config import config from mailman.core import errors from mailman.core.i18n import _ +from mailman.core.switchboard import Switchboard from mailman.interfaces.handler import IHandler -from mailman.queue import Switchboard from mailman.utilities.string import oneline from mailman.version import VERSION diff --git a/src/mailman/runners/__init__.py b/src/mailman/runners/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/src/mailman/runners/__init__.py diff --git a/src/mailman/queue/archive.py b/src/mailman/runners/archive.py index 99682f310..0a9e104d8 100644 --- a/src/mailman/queue/archive.py +++ b/src/mailman/runners/archive.py @@ -32,7 +32,7 @@ from flufl.lock import Lock from lazr.config import as_timedelta from mailman.config import config -from mailman.queue import Runner +from mailman.core.runner import Runner log = logging.getLogger('mailman.error') diff --git a/src/mailman/queue/bounce.py b/src/mailman/runners/bounce.py index a714f2669..8902dcbbc 100644 --- a/src/mailman/queue/bounce.py +++ b/src/mailman/runners/bounce.py @@ -23,8 +23,8 @@ from zope.component import getUtility from mailman.app.bounces import ( ProbeVERP, StandardVERP, maybe_forward, scan_message) +from mailman.core.runner import Runner from mailman.interfaces.bounce import BounceContext, IBounceProcessor, Stop -from mailman.queue import Runner COMMASPACE = ', ' diff --git a/src/mailman/queue/command.py b/src/mailman/runners/command.py index ec3ec0089..68aabc784 100644 --- a/src/mailman/queue/command.py +++ b/src/mailman/runners/command.py @@ -40,10 +40,10 @@ from zope.interface import implements from mailman.config import config from mailman.core.i18n import _ +from mailman.core.runner import Runner from mailman.email.message import UserNotification from mailman.interfaces.command import ContinueProcessing, IEmailResults from mailman.interfaces.languages import ILanguageManager -from mailman.queue import Runner NL = '\n' diff --git a/src/mailman/queue/digest.py b/src/mailman/runners/digest.py index 075335158..59ce1da3d 100644 --- a/src/mailman/queue/digest.py +++ b/src/mailman/runners/digest.py @@ -41,10 +41,10 @@ from email.utils import formatdate, getaddresses, make_msgid from mailman.config import config from mailman.core.errors import DiscardMessage from mailman.core.i18n import _ +from mailman.core.runner import Runner from mailman.interfaces.member import DeliveryMode, DeliveryStatus from mailman.pipeline.decorate import decorate from mailman.pipeline.scrubber import process as scrubber -from mailman.queue import Runner from mailman.utilities.i18n import make from mailman.utilities.mailbox import Mailbox from mailman.utilities.string import oneline, wrap diff --git a/src/mailman/queue/docs/OVERVIEW.txt b/src/mailman/runners/docs/OVERVIEW.txt index 41ccc18c9..41ccc18c9 100644 --- a/src/mailman/queue/docs/OVERVIEW.txt +++ b/src/mailman/runners/docs/OVERVIEW.txt diff --git a/src/mailman/queue/docs/archiver.txt b/src/mailman/runners/docs/archiver.txt index cdee449e1..a6f5ccd24 100644 --- a/src/mailman/queue/docs/archiver.txt +++ b/src/mailman/runners/docs/archiver.txt @@ -21,7 +21,7 @@ Mailman can archive to any number of archivers that adhere to the >>> archiver_queue = config.switchboards['archive'] >>> ignore = archiver_queue.enqueue(msg, {}, listname=mlist.fqdn_listname) - >>> from mailman.queue.archive import ArchiveRunner + >>> from mailman.runners.archive import ArchiveRunner >>> from mailman.testing.helpers import make_testable_runner >>> runner = make_testable_runner(ArchiveRunner) >>> runner.run() diff --git a/src/mailman/queue/docs/command.txt b/src/mailman/runners/docs/command.txt index c767e6f5f..4bf06b350 100644 --- a/src/mailman/queue/docs/command.txt +++ b/src/mailman/runners/docs/command.txt @@ -27,14 +27,14 @@ the sender. The command can be in the ``Subject`` header. >>> from mailman.inject import inject_message >>> inject_message(mlist, msg, switchboard='command') - >>> from mailman.queue.command import CommandRunner + >>> from mailman.runners.command import CommandRunner >>> from mailman.testing.helpers import make_testable_runner >>> command = make_testable_runner(CommandRunner) >>> command.run() And now the response is in the ``virgin`` queue. - >>> from mailman.queue import Switchboard + >>> from mailman.core.switchboard import Switchboard >>> virgin_queue = config.switchboards['virgin'] >>> len(virgin_queue.files) 1 diff --git a/src/mailman/queue/docs/digester.txt b/src/mailman/runners/docs/digester.txt index 285b2072a..c8f709a13 100644 --- a/src/mailman/queue/docs/digester.txt +++ b/src/mailman/runners/docs/digester.txt @@ -72,7 +72,7 @@ There are 4 messages in the digest. When the queue runner runs, it processes the digest mailbox, crafting both the plain text (RFC 1153) digest and the MIME digest. - >>> from mailman.queue.digest import DigestRunner + >>> from mailman.runners.digest import DigestRunner >>> from mailman.testing.helpers import make_testable_runner >>> runner = make_testable_runner(DigestRunner) >>> runner.run() diff --git a/src/mailman/queue/docs/incoming.txt b/src/mailman/runners/docs/incoming.txt index 6455db20b..22061044f 100644 --- a/src/mailman/queue/docs/incoming.txt +++ b/src/mailman/runners/docs/incoming.txt @@ -59,7 +59,7 @@ mail server normally would. The incoming queue runner runs until it is empty. - >>> from mailman.queue.incoming import IncomingRunner + >>> from mailman.runners.incoming import IncomingRunner >>> from mailman.testing.helpers import make_testable_runner >>> incoming = make_testable_runner(IncomingRunner, 'in') >>> incoming.run() diff --git a/src/mailman/queue/docs/lmtp.txt b/src/mailman/runners/docs/lmtp.txt index c95c6aa2b..c95c6aa2b 100644 --- a/src/mailman/queue/docs/lmtp.txt +++ b/src/mailman/runners/docs/lmtp.txt diff --git a/src/mailman/queue/docs/news.txt b/src/mailman/runners/docs/news.txt index 7261aa333..7b573d39e 100644 --- a/src/mailman/queue/docs/news.txt +++ b/src/mailman/runners/docs/news.txt @@ -34,7 +34,7 @@ prohibited headers. ... """) >>> msgdata = {} - >>> from mailman.queue.news import prepare_message + >>> from mailman.runners.news import prepare_message >>> prepare_message(mlist, msg, msgdata) >>> msgdata['prepped'] True diff --git a/src/mailman/queue/docs/outgoing.txt b/src/mailman/runners/docs/outgoing.txt index 0af22b808..a69fa36c5 100644 --- a/src/mailman/queue/docs/outgoing.txt +++ b/src/mailman/runners/docs/outgoing.txt @@ -62,7 +62,7 @@ destination mailing list name. Simulate that here too. Running the outgoing queue runner processes the message, delivering it to the upstream SMTP. - >>> from mailman.queue.outgoing import OutgoingRunner + >>> from mailman.runners.outgoing import OutgoingRunner >>> from mailman.testing.helpers import make_testable_runner >>> outgoing = make_testable_runner(OutgoingRunner, 'out') >>> outgoing.run() diff --git a/src/mailman/queue/docs/rest.txt b/src/mailman/runners/docs/rest.txt index 9e8851eca..9e8851eca 100644 --- a/src/mailman/queue/docs/rest.txt +++ b/src/mailman/runners/docs/rest.txt diff --git a/src/mailman/queue/incoming.py b/src/mailman/runners/incoming.py index f8d671177..c1f0d0e64 100644 --- a/src/mailman/queue/incoming.py +++ b/src/mailman/runners/incoming.py @@ -38,9 +38,9 @@ from zope.component import getUtility from mailman.config import config from mailman.core.chains import process +from mailman.core.runner import Runner from mailman.interfaces.address import ExistingAddressError from mailman.interfaces.usermanager import IUserManager -from mailman.queue import Runner diff --git a/src/mailman/queue/lmtp.py b/src/mailman/runners/lmtp.py index 9163a88e6..32d85b6ca 100644 --- a/src/mailman/queue/lmtp.py +++ b/src/mailman/runners/lmtp.py @@ -40,10 +40,10 @@ from email.utils import parseaddr from zope.component import getUtility from mailman.config import config +from mailman.core.runner import Runner from mailman.database.transaction import txn from mailman.email.message import Message from mailman.interfaces.listmanager import IListManager -from mailman.queue import Runner elog = logging.getLogger('mailman.error') qlog = logging.getLogger('mailman.qrunner') diff --git a/src/mailman/queue/maildir.py b/src/mailman/runners/maildir.py index 07a89903c..f0204f014 100644 --- a/src/mailman/queue/maildir.py +++ b/src/mailman/runners/maildir.py @@ -57,8 +57,9 @@ from email.parser import Parser from email.utils import parseaddr from mailman.config import config +from mailman.core.runner import Runner +from mailman.core.switchboard import Switchboard from mailman.message import Message -from mailman.queue import Runner, Switchboard log = logging.getLogger('mailman.error') diff --git a/src/mailman/queue/news.py b/src/mailman/runners/news.py index a3d915244..0c79cea52 100644 --- a/src/mailman/queue/news.py +++ b/src/mailman/runners/news.py @@ -27,8 +27,8 @@ from cStringIO import StringIO from lazr.config import as_host_port from mailman.config import config +from mailman.core.runner import Runner from mailman.interfaces.nntp import NewsModeration -from mailman.queue import Runner COMMASPACE = ', ' log = logging.getLogger('mailman.error') diff --git a/src/mailman/queue/outgoing.py b/src/mailman/runners/outgoing.py index ed27f014c..b1c71176b 100644 --- a/src/mailman/queue/outgoing.py +++ b/src/mailman/runners/outgoing.py @@ -25,12 +25,12 @@ from lazr.config import as_boolean, as_timedelta from zope.component import getUtility from mailman.config import config +from mailman.core.runner import Runner from mailman.interfaces.bounce import BounceContext, IBounceProcessor from mailman.interfaces.mailinglist import Personalization from mailman.interfaces.membership import ISubscriptionService from mailman.interfaces.mta import SomeRecipientsFailed from mailman.interfaces.pending import IPendings -from mailman.queue import Runner from mailman.utilities.datetime import now from mailman.utilities.modules import find_name diff --git a/src/mailman/queue/pipeline.py b/src/mailman/runners/pipeline.py index 099ebd032..6b9ad0a88 100644 --- a/src/mailman/queue/pipeline.py +++ b/src/mailman/runners/pipeline.py @@ -23,7 +23,7 @@ headers, calculates message recipients, and more. """ from mailman.core.pipelines import process -from mailman.queue import Runner +from mailman.core.runner import Runner diff --git a/src/mailman/queue/rest.py b/src/mailman/runners/rest.py index 31e840a51..1e631c6a3 100644 --- a/src/mailman/queue/rest.py +++ b/src/mailman/runners/rest.py @@ -31,7 +31,7 @@ import select import signal import logging -from mailman.queue import Runner +from mailman.core.runner import Runner from mailman.rest.wsgiapp import make_server diff --git a/src/mailman/queue/retry.py b/src/mailman/runners/retry.py index 24aa7f82b..8b4cfaada 100644 --- a/src/mailman/queue/retry.py +++ b/src/mailman/runners/retry.py @@ -28,7 +28,7 @@ __all__ = [ import time from mailman.config import config -from mailman.queue import Runner +from mailman.core.runner import Runner diff --git a/src/mailman/runners/tests/__init__.py b/src/mailman/runners/tests/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/src/mailman/runners/tests/__init__.py diff --git a/src/mailman/queue/tests/test_bounce.py b/src/mailman/runners/tests/test_bounce.py index 1946df50c..97123d1a4 100644 --- a/src/mailman/queue/tests/test_bounce.py +++ b/src/mailman/runners/tests/test_bounce.py @@ -36,7 +36,7 @@ from mailman.interfaces.bounce import ( BounceContext, IBounceProcessor, UnrecognizedBounceDisposition) from mailman.interfaces.member import MemberRole from mailman.interfaces.usermanager import IUserManager -from mailman.queue.bounce import BounceRunner +from mailman.runners.bounce import BounceRunner from mailman.testing.helpers import ( LogFileMark, get_queue_messages, diff --git a/src/mailman/queue/tests/test_outgoing.py b/src/mailman/runners/tests/test_outgoing.py index a0fe407c8..0036b4bd4 100644 --- a/src/mailman/queue/tests/test_outgoing.py +++ b/src/mailman/runners/tests/test_outgoing.py @@ -44,7 +44,7 @@ from mailman.interfaces.member import MemberRole from mailman.interfaces.mta import SomeRecipientsFailed from mailman.interfaces.pending import IPendings from mailman.interfaces.usermanager import IUserManager -from mailman.queue.outgoing import OutgoingRunner +from mailman.runners.outgoing import OutgoingRunner from mailman.testing.helpers import ( LogFileMark, get_queue_messages, @@ -129,7 +129,7 @@ class TestVERPSettings(unittest.TestCase): # setting of the 'verp' key in the metadata. config.push('fake outgoing', """ [mta] - outgoing: mailman.queue.tests.test_outgoing.capture + outgoing: mailman.runners.tests.test_outgoing.capture """) # Reset the captured data. captured_mlist = None @@ -265,7 +265,7 @@ class TestSocketError(unittest.TestCase): # setting of the 'verp' key in the metadata. config.push('fake outgoing', """ [mta] - outgoing: mailman.queue.tests.test_outgoing.raise_socket_error + outgoing: mailman.runners.tests.test_outgoing.raise_socket_error """) self._mlist = create_list('test@example.com') self._outq = config.switchboards['out'] @@ -346,8 +346,8 @@ class TestSomeRecipientsFailed(unittest.TestCase): # We generally don't care what this does, since we're just testing the # setting of the 'verp' key in the metadata. config.push('fake outgoing', """ - [mta] - outgoing: mailman.queue.tests.test_outgoing.raise_SomeRecipientsFailed + [mta] + outgoing: mailman.runners.tests.test_outgoing.raise_SomeRecipientsFailed """) self._mlist = create_list('test@example.com') self._outq = config.switchboards['out'] diff --git a/src/mailman/queue/virgin.py b/src/mailman/runners/virgin.py index 2dcdca910..de5788a01 100644 --- a/src/mailman/queue/virgin.py +++ b/src/mailman/runners/virgin.py @@ -24,7 +24,7 @@ recipient. """ from mailman.core.pipelines import process -from mailman.queue import Runner +from mailman.core.runner import Runner |
