summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBarry Warsaw2011-05-29 12:45:19 -0400
committerBarry Warsaw2011-05-29 12:45:19 -0400
commit521a179d309fac857fdbbe162d5db136c3ec3b1e (patch)
treeec6e635e9c0f8a5bd655a254f9c346f1acb6dd8e
parent0f760798fb2490a03041c42018afbd59749e6cbd (diff)
downloadmailman-521a179d309fac857fdbbe162d5db136c3ec3b1e.tar.gz
mailman-521a179d309fac857fdbbe162d5db136c3ec3b1e.tar.zst
mailman-521a179d309fac857fdbbe162d5db136c3ec3b1e.zip
-rw-r--r--src/mailman/archiving/docs/common.txt2
-rw-r--r--src/mailman/bin/gate_news.py2
-rw-r--r--src/mailman/bin/qrunner.py2
-rw-r--r--src/mailman/commands/docs/echo.txt2
-rw-r--r--src/mailman/commands/docs/membership.txt2
-rw-r--r--src/mailman/config/config.py4
-rw-r--r--src/mailman/config/mailman.cfg30
-rw-r--r--src/mailman/config/schema.cfg2
-rw-r--r--src/mailman/core/docs/__init__.py (renamed from src/mailman/queue/tests/__init__.py)0
-rw-r--r--src/mailman/core/docs/runner.txt (renamed from src/mailman/queue/docs/runner.txt)2
-rw-r--r--src/mailman/core/docs/switchboard.txt (renamed from src/mailman/queue/docs/switchboard.txt)2
-rw-r--r--src/mailman/core/runner.py249
-rw-r--r--src/mailman/core/switchboard.py (renamed from src/mailman/queue/__init__.py)214
-rw-r--r--src/mailman/pipeline/mime_delete.py2
-rw-r--r--src/mailman/runners/__init__.py0
-rw-r--r--src/mailman/runners/archive.py (renamed from src/mailman/queue/archive.py)2
-rw-r--r--src/mailman/runners/bounce.py (renamed from src/mailman/queue/bounce.py)2
-rw-r--r--src/mailman/runners/command.py (renamed from src/mailman/queue/command.py)2
-rw-r--r--src/mailman/runners/digest.py (renamed from src/mailman/queue/digest.py)2
-rw-r--r--src/mailman/runners/docs/OVERVIEW.txt (renamed from src/mailman/queue/docs/OVERVIEW.txt)0
-rw-r--r--src/mailman/runners/docs/archiver.txt (renamed from src/mailman/queue/docs/archiver.txt)2
-rw-r--r--src/mailman/runners/docs/command.txt (renamed from src/mailman/queue/docs/command.txt)4
-rw-r--r--src/mailman/runners/docs/digester.txt (renamed from src/mailman/queue/docs/digester.txt)2
-rw-r--r--src/mailman/runners/docs/incoming.txt (renamed from src/mailman/queue/docs/incoming.txt)2
-rw-r--r--src/mailman/runners/docs/lmtp.txt (renamed from src/mailman/queue/docs/lmtp.txt)0
-rw-r--r--src/mailman/runners/docs/news.txt (renamed from src/mailman/queue/docs/news.txt)2
-rw-r--r--src/mailman/runners/docs/outgoing.txt (renamed from src/mailman/queue/docs/outgoing.txt)2
-rw-r--r--src/mailman/runners/docs/rest.txt (renamed from src/mailman/queue/docs/rest.txt)0
-rw-r--r--src/mailman/runners/incoming.py (renamed from src/mailman/queue/incoming.py)2
-rw-r--r--src/mailman/runners/lmtp.py (renamed from src/mailman/queue/lmtp.py)2
-rw-r--r--src/mailman/runners/maildir.py (renamed from src/mailman/queue/maildir.py)3
-rw-r--r--src/mailman/runners/news.py (renamed from src/mailman/queue/news.py)2
-rw-r--r--src/mailman/runners/outgoing.py (renamed from src/mailman/queue/outgoing.py)2
-rw-r--r--src/mailman/runners/pipeline.py (renamed from src/mailman/queue/pipeline.py)2
-rw-r--r--src/mailman/runners/rest.py (renamed from src/mailman/queue/rest.py)2
-rw-r--r--src/mailman/runners/retry.py (renamed from src/mailman/queue/retry.py)2
-rw-r--r--src/mailman/runners/tests/__init__.py0
-rw-r--r--src/mailman/runners/tests/test_bounce.py (renamed from src/mailman/queue/tests/test_bounce.py)2
-rw-r--r--src/mailman/runners/tests/test_outgoing.py (renamed from src/mailman/queue/tests/test_outgoing.py)10
-rw-r--r--src/mailman/runners/virgin.py (renamed from src/mailman/queue/virgin.py)2
40 files changed, 302 insertions, 266 deletions
diff --git a/src/mailman/archiving/docs/common.txt b/src/mailman/archiving/docs/common.txt
index 1629458b3..330c8e307 100644
--- a/src/mailman/archiving/docs/common.txt
+++ b/src/mailman/archiving/docs/common.txt
@@ -88,7 +88,7 @@ address at The Mail Archive. The message gets no header or footer decoration.
>>> archiver.archive_message(mlist, msg)
- >>> from mailman.queue.outgoing import OutgoingRunner
+ >>> from mailman.runners.outgoing import OutgoingRunner
>>> from mailman.testing.helpers import make_testable_runner
>>> outgoing = make_testable_runner(OutgoingRunner, 'out')
>>> outgoing.run()
diff --git a/src/mailman/bin/gate_news.py b/src/mailman/bin/gate_news.py
index 4ad29affc..3d637bd82 100644
--- a/src/mailman/bin/gate_news.py
+++ b/src/mailman/bin/gate_news.py
@@ -33,7 +33,7 @@ from mailman import Message
from mailman import loginit
from mailman.configuration import config
from mailman.core.i18n import _
-from mailman.queue import Switchboard
+from mailman.core.switchboard import Switchboard
from mailman.version import MAILMAN_VERSION
# Work around known problems with some RedHat cron daemons
diff --git a/src/mailman/bin/qrunner.py b/src/mailman/bin/qrunner.py
index f98fd98c6..2e20cee61 100644
--- a/src/mailman/bin/qrunner.py
+++ b/src/mailman/bin/qrunner.py
@@ -143,7 +143,7 @@ def make_qrunner(name, slice, range, once=False):
# It was a shortcut name.
class_path = qrunner_config['class']
elif name.startswith('.'):
- class_path = 'mailman.queue' + name
+ class_path = 'mailman.runners' + name
else:
class_path = name
try:
diff --git a/src/mailman/commands/docs/echo.txt b/src/mailman/commands/docs/echo.txt
index a01172d04..ced483ea8 100644
--- a/src/mailman/commands/docs/echo.txt
+++ b/src/mailman/commands/docs/echo.txt
@@ -17,7 +17,7 @@ The original message is ignored, but the results receive the echoed command.
>>> mlist = create_list('test@example.com')
- >>> from mailman.queue.command import Results
+ >>> from mailman.runners.command import Results
>>> results = Results()
>>> from mailman.email.message import Message
diff --git a/src/mailman/commands/docs/membership.txt b/src/mailman/commands/docs/membership.txt
index 851f01514..d05f12eee 100644
--- a/src/mailman/commands/docs/membership.txt
+++ b/src/mailman/commands/docs/membership.txt
@@ -41,7 +41,7 @@ When no address argument is given, the message's From address will be used.
If that's missing though, then an error is returned.
::
- >>> from mailman.queue.command import Results
+ >>> from mailman.runners.command import Results
>>> results = Results()
>>> from mailman.email.message import Message
diff --git a/src/mailman/config/config.py b/src/mailman/config/config.py
index 9c210b6a2..4b2e9749d 100644
--- a/src/mailman/config/config.py
+++ b/src/mailman/config/config.py
@@ -120,8 +120,8 @@ class Configuration:
"""Perform post-processing after loading the configuration files."""
# Expand and set up all directories.
self._expand_paths()
- # Set up the switchboards.
- from mailman.queue import Switchboard
+ # Set up the switchboards. Import this here to avoid circular imports.
+ from mailman.core.switchboard import Switchboard
Switchboard.initialize()
# Set up all the languages.
languages = self._config.getByCategory('language', [])
diff --git a/src/mailman/config/mailman.cfg b/src/mailman/config/mailman.cfg
index d7bc0fded..fba1d333e 100644
--- a/src/mailman/config/mailman.cfg
+++ b/src/mailman/config/mailman.cfg
@@ -42,55 +42,55 @@ pid_file: /var/run/mailman/master-qrunner.pid
[language.en]
[qrunner.archive]
-class: mailman.queue.archive.ArchiveRunner
+class: mailman.runners.archive.ArchiveRunner
[qrunner.bad]
-class: mailman.queue.fake.BadRunner
+class: mailman.runners.fake.BadRunner
# The bad runner is just a placeholder for its switchboard.
start: no
[qrunner.bounces]
-class: mailman.queue.bounce.BounceRunner
+class: mailman.runners.bounce.BounceRunner
[qrunner.command]
-class: mailman.queue.command.CommandRunner
+class: mailman.runners.command.CommandRunner
[qrunner.in]
-class: mailman.queue.incoming.IncomingRunner
+class: mailman.runners.incoming.IncomingRunner
[qrunner.lmtp]
-class: mailman.queue.lmtp.LMTPRunner
+class: mailman.runners.lmtp.LMTPRunner
[qrunner.maildir]
-class: mailman.queue.maildir.MaildirRunner
+class: mailman.runners.maildir.MaildirRunner
# This is still experimental.
start: no
[qrunner.news]
-class: mailman.queue.news.NewsRunner
+class: mailman.runners.news.NewsRunner
[qrunner.out]
-class: mailman.queue.outgoing.OutgoingRunner
+class: mailman.runners.outgoing.OutgoingRunner
[qrunner.pipeline]
-class: mailman.queue.pipeline.PipelineRunner
+class: mailman.runners.pipeline.PipelineRunner
[qrunner.rest]
-class: mailman.queue.rest.RESTRunner
+class: mailman.runners.rest.RESTRunner
[qrunner.retry]
-class: mailman.queue.retry.RetryRunner
+class: mailman.runners.retry.RetryRunner
sleep_time: 15m
[qrunner.shunt]
-class: mailman.queue.fake.ShuntRunner
+class: mailman.runners.fake.ShuntRunner
# The shunt runner is just a placeholder for its switchboard.
start: no
[qrunner.virgin]
-class: mailman.queue.virgin.VirginRunner
+class: mailman.runners.virgin.VirginRunner
[qrunner.digest]
-class: mailman.queue.digest.DigestRunner
+class: mailman.runners.digest.DigestRunner
[style.default]
diff --git a/src/mailman/config/schema.cfg b/src/mailman/config/schema.cfg
index 329aea412..2e5bfb2c5 100644
--- a/src/mailman/config/schema.cfg
+++ b/src/mailman/config/schema.cfg
@@ -166,7 +166,7 @@ user_friendly_passwords: yes
# Define which process queue runners, and how many of them, to start.
# The full import path to the class for this queue runner.
-class: mailman.queue.runner.Runner
+class: mailman.core.runner.Runner
# The directory path that this queue runner scans.
path: $QUEUE_DIR/$name
diff --git a/src/mailman/queue/tests/__init__.py b/src/mailman/core/docs/__init__.py
index e69de29bb..e69de29bb 100644
--- a/src/mailman/queue/tests/__init__.py
+++ b/src/mailman/core/docs/__init__.py
diff --git a/src/mailman/queue/docs/runner.txt b/src/mailman/core/docs/runner.txt
index 39e8fede2..4262dc87a 100644
--- a/src/mailman/queue/docs/runner.txt
+++ b/src/mailman/core/docs/runner.txt
@@ -28,7 +28,7 @@ configuration section for the test runner.
... max_restarts: 1
... """)
- >>> from mailman.queue import Runner
+ >>> from mailman.core.runner import Runner
>>> class TestableRunner(Runner):
... def _dispose(self, mlist, msg, msgdata):
... self.msg = msg
diff --git a/src/mailman/queue/docs/switchboard.txt b/src/mailman/core/docs/switchboard.txt
index d89aa3693..751b1e640 100644
--- a/src/mailman/queue/docs/switchboard.txt
+++ b/src/mailman/core/docs/switchboard.txt
@@ -15,7 +15,7 @@ Create a switchboard by giving its queue name and directory.
>>> import os
>>> queue_directory = os.path.join(config.QUEUE_DIR, 'test')
- >>> from mailman.queue import Switchboard
+ >>> from mailman.core.switchboard import Switchboard
>>> switchboard = Switchboard('test', queue_directory)
>>> print switchboard.name
test
diff --git a/src/mailman/core/runner.py b/src/mailman/core/runner.py
new file mode 100644
index 000000000..3d876ac3d
--- /dev/null
+++ b/src/mailman/core/runner.py
@@ -0,0 +1,249 @@
+# Copyright (C) 2001-2011 by the Free Software Foundation, Inc.
+#
+# This file is part of GNU Mailman.
+#
+# GNU Mailman is free software: you can redistribute it and/or modify it under
+# the terms of the GNU General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or (at your option)
+# any later version.
+#
+# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+# more details.
+#
+# You should have received a copy of the GNU General Public License along with
+# GNU Mailman. If not, see <http://www.gnu.org/licenses/>.
+
+"""The process runner base class."""
+
+from __future__ import absolute_import, unicode_literals
+
+__metaclass__ = type
+__all__ = [
+ 'Runner',
+ ]
+
+
+import time
+import logging
+import traceback
+
+from cStringIO import StringIO
+from lazr.config import as_boolean, as_timedelta
+from zope.component import getUtility
+from zope.interface import implements
+
+from mailman.config import config
+from mailman.core.i18n import _
+from mailman.core.switchboard import Switchboard
+from mailman.interfaces.languages import ILanguageManager
+from mailman.interfaces.listmanager import IListManager
+from mailman.interfaces.runner import IRunner
+from mailman.utilities.string import expand
+
+
+dlog = logging.getLogger('mailman.debug')
+elog = logging.getLogger('mailman.error')
+
+
+
+class Runner:
+ implements(IRunner)
+
+ intercept_signals = True
+
+ def __init__(self, name, slice=None):
+ """Create a queue runner.
+
+ :param slice: The slice number for this queue runner. This is passed
+ directly to the underlying `ISwitchboard` object.
+ :type slice: int or None
+ """
+ # Grab the configuration section.
+ self.name = name
+ section = getattr(config, 'qrunner.' + name)
+ substitutions = config.paths
+ substitutions['name'] = name
+ self.queue_directory = expand(section.path, substitutions)
+ numslices = int(section.instances)
+ self.switchboard = Switchboard(
+ name, self.queue_directory, slice, numslices, True)
+ self.sleep_time = as_timedelta(section.sleep_time)
+ # sleep_time is a timedelta; turn it into a float for time.sleep().
+ self.sleep_float = (86400 * self.sleep_time.days +
+ self.sleep_time.seconds +
+ self.sleep_time.microseconds / 1.0e6)
+ self.max_restarts = int(section.max_restarts)
+ self.start = as_boolean(section.start)
+ self._stop = False
+
+ def __repr__(self):
+ return '<{0} at {1:#x}>'.format(self.__class__.__name__, id(self))
+
+ def stop(self):
+ """See `IRunner`."""
+ self._stop = True
+
+ def run(self):
+ """See `IRunner`."""
+ # Start the main loop for this queue runner.
+ try:
+ while True:
+ # Once through the loop that processes all the files in the
+ # queue directory.
+ filecnt = self._one_iteration()
+ # Do the periodic work for the subclass.
+ self._do_periodic()
+ # If the stop flag is set, we're done.
+ if self._stop:
+ break
+ # Give the runner an opportunity to snooze for a while, but
+ # pass it the file count so it can decide whether to do more
+ # work now or not.
+ self._snooze(filecnt)
+ except KeyboardInterrupt:
+ pass
+ finally:
+ self._clean_up()
+
+ def _one_iteration(self):
+ """See `IRunner`."""
+ me = self.__class__.__name__
+ dlog.debug('[%s] starting oneloop', me)
+ # List all the files in our queue directory. The switchboard is
+ # guaranteed to hand us the files in FIFO order.
+ files = self.switchboard.files
+ for filebase in files:
+ dlog.debug('[%s] processing filebase: %s', me, filebase)
+ try:
+ # Ask the switchboard for the message and metadata objects
+ # associated with this queue file.
+ msg, msgdata = self.switchboard.dequeue(filebase)
+ except Exception as error:
+ # This used to just catch email.Errors.MessageParseError, but
+ # other problems can occur in message parsing, e.g.
+ # ValueError, and exceptions can occur in unpickling too. We
+ # don't want the runner to die, so we just log and skip this
+ # entry, but preserve it for analysis.
+ self._log(error)
+ elog.error('Skipping and preserving unparseable message: %s',
+ filebase)
+ self.switchboard.finish(filebase, preserve=True)
+ config.db.abort()
+ continue
+ try:
+ dlog.debug('[%s] processing onefile', me)
+ self._process_one_file(msg, msgdata)
+ dlog.debug('[%s] finishing filebase: %s', me, filebase)
+ self.switchboard.finish(filebase)
+ except Exception as error:
+ # All runners that implement _dispose() must guarantee that
+ # exceptions are caught and dealt with properly. Still, there
+ # may be a bug in the infrastructure, and we do not want those
+ # to cause messages to be lost. Any uncaught exceptions will
+ # cause the message to be stored in the shunt queue for human
+ # intervention.
+ self._log(error)
+ # Put a marker in the metadata for unshunting.
+ msgdata['whichq'] = self.switchboard.name
+ # It is possible that shunting can throw an exception, e.g. a
+ # permissions problem or a MemoryError due to a really large
+ # message. Try to be graceful.
+ try:
+ shunt = config.switchboards['shunt']
+ new_filebase = shunt.enqueue(msg, msgdata)
+ elog.error('SHUNTING: %s', new_filebase)
+ self.switchboard.finish(filebase)
+ except Exception as error:
+ # The message wasn't successfully shunted. Log the
+ # exception and try to preserve the original queue entry
+ # for possible analysis.
+ self._log(error)
+ elog.error(
+ 'SHUNTING FAILED, preserving original entry: %s',
+ filebase)
+ self.switchboard.finish(filebase, preserve=True)
+ config.db.abort()
+ # Other work we want to do each time through the loop.
+ dlog.debug('[%s] doing periodic', me)
+ self._do_periodic()
+ dlog.debug('[%s] checking short circuit', me)
+ if self._short_circuit():
+ dlog.debug('[%s] short circuiting', me)
+ break
+ dlog.debug('[%s] commiting', me)
+ config.db.commit()
+ dlog.debug('[%s] ending oneloop: %s', me, len(files))
+ return len(files)
+
+ def _process_one_file(self, msg, msgdata):
+ """See `IRunner`."""
+ # Do some common sanity checking on the message metadata. It's got to
+ # be destined for a particular mailing list. This switchboard is used
+ # to shunt off badly formatted messages. We don't want to just trash
+ # them because they may be fixable with human intervention. Just get
+ # them out of our sight.
+ #
+ # Find out which mailing list this message is destined for.
+ missing = object()
+ listname = msgdata.get('listname', missing)
+ mlist = (None
+ if listname is missing
+ else getUtility(IListManager).get(unicode(listname)))
+ if mlist is None:
+ elog.error(
+ '%s runner "%s" shunting message for missing list: %s',
+ msg['message-id'], self.name,
+ ('n/a' if listname is missing else listname))
+ config.switchboards['shunt'].enqueue(msg, msgdata)
+ return
+ # Now process this message. We also want to set up the language
+ # context for this message. The context will be the preferred
+ # language for the user if the sender is a member of the list, or it
+ # will be the list's preferred language. However, we must take
+ # special care to reset the defaults, otherwise subsequent messages
+ # may be translated incorrectly.
+ if mlist is None:
+ language_manager = getUtility(ILanguageManager)
+ language = language_manager[config.mailman.default_language]
+ elif msg.sender:
+ member = mlist.members.get_member(msg.sender)
+ language = (member.preferred_language
+ if member is not None
+ else mlist.preferred_language)
+ else:
+ language = mlist.preferred_language
+ with _.using(language.code):
+ msgdata['lang'] = language.code
+ keepqueued = self._dispose(mlist, msg, msgdata)
+ if keepqueued:
+ self.switchboard.enqueue(msg, msgdata)
+
+ def _log(self, exc):
+ elog.error('Uncaught runner exception: %s', exc)
+ s = StringIO()
+ traceback.print_exc(file=s)
+ elog.error('%s', s.getvalue())
+
+ def _clean_up(self):
+ """See `IRunner`."""
+ pass
+
+ def _dispose(self, mlist, msg, msgdata):
+ """See `IRunner`."""
+ raise NotImplementedError
+
+ def _do_periodic(self):
+ """See `IRunner`."""
+ pass
+
+ def _snooze(self, filecnt):
+ """See `IRunner`."""
+ if filecnt or self.sleep_float <= 0:
+ return
+ time.sleep(self.sleep_float)
+
+ def _short_circuit(self):
+ """See `IRunner`."""
+ return self._stop
diff --git a/src/mailman/queue/__init__.py b/src/mailman/core/switchboard.py
index 8abc5e9a6..5d9eb65ce 100644
--- a/src/mailman/queue/__init__.py
+++ b/src/mailman/core/switchboard.py
@@ -28,7 +28,6 @@ from __future__ import absolute_import, unicode_literals
__metaclass__ = type
__all__ = [
- 'Runner',
'Switchboard',
]
@@ -40,19 +39,11 @@ import pickle
import cPickle
import hashlib
import logging
-import traceback
-from cStringIO import StringIO
-from lazr.config import as_boolean, as_timedelta
-from zope.component import getUtility
from zope.interface import implements
from mailman.config import config
-from mailman.core.i18n import _
from mailman.email.message import Message
-from mailman.interfaces.languages import ILanguageManager
-from mailman.interfaces.listmanager import IListManager
-from mailman.interfaces.runner import IRunner
from mailman.interfaces.switchboard import ISwitchboard
from mailman.utilities.filesystem import makedirs
from mailman.utilities.string import expand
@@ -60,18 +51,15 @@ from mailman.utilities.string import expand
# 20 bytes of all bits set, maximum hashlib.sha.digest() value
shamax = 0xffffffffffffffffffffffffffffffffffffffffL
-
# Small increment to add to time in case two entries have the same time. This
# prevents skipping one of two entries with the same time until the next pass.
DELTA = .0001
-DOT = '.'
# We count the number of times a file has been moved to .bak and recovered.
# In order to prevent loops and a message flood, when the count reaches this
# value, we move the file to the bad queue as a .psv.
MAX_BAK_COUNT = 3
elog = logging.getLogger('mailman.error')
-dlog = logging.getLogger('mailman.debug')
@@ -279,205 +267,3 @@ class Switchboard:
os.rename(src, dst)
finally:
fp.close()
-
-
-
-class Runner:
- implements(IRunner)
-
- intercept_signals = True
-
- def __init__(self, name, slice=None):
- """Create a queue runner.
-
- :param slice: The slice number for this queue runner. This is passed
- directly to the underlying `ISwitchboard` object.
- :type slice: int or None
- """
- # Grab the configuration section.
- self.name = name
- section = getattr(config, 'qrunner.' + name)
- substitutions = config.paths
- substitutions['name'] = name
- self.queue_directory = expand(section.path, substitutions)
- numslices = int(section.instances)
- self.switchboard = Switchboard(
- name, self.queue_directory, slice, numslices, True)
- self.sleep_time = as_timedelta(section.sleep_time)
- # sleep_time is a timedelta; turn it into a float for time.sleep().
- self.sleep_float = (86400 * self.sleep_time.days +
- self.sleep_time.seconds +
- self.sleep_time.microseconds / 1.0e6)
- self.max_restarts = int(section.max_restarts)
- self.start = as_boolean(section.start)
- self._stop = False
-
- def __repr__(self):
- return '<{0} at {1:#x}>'.format(self.__class__.__name__, id(self))
-
- def stop(self):
- """See `IRunner`."""
- self._stop = True
-
- def run(self):
- """See `IRunner`."""
- # Start the main loop for this queue runner.
- try:
- while True:
- # Once through the loop that processes all the files in the
- # queue directory.
- filecnt = self._one_iteration()
- # Do the periodic work for the subclass.
- self._do_periodic()
- # If the stop flag is set, we're done.
- if self._stop:
- break
- # Give the runner an opportunity to snooze for a while, but
- # pass it the file count so it can decide whether to do more
- # work now or not.
- self._snooze(filecnt)
- except KeyboardInterrupt:
- pass
- finally:
- self._clean_up()
-
- def _one_iteration(self):
- """See `IRunner`."""
- me = self.__class__.__name__
- dlog.debug('[%s] starting oneloop', me)
- # List all the files in our queue directory. The switchboard is
- # guaranteed to hand us the files in FIFO order.
- files = self.switchboard.files
- for filebase in files:
- dlog.debug('[%s] processing filebase: %s', me, filebase)
- try:
- # Ask the switchboard for the message and metadata objects
- # associated with this queue file.
- msg, msgdata = self.switchboard.dequeue(filebase)
- except Exception as error:
- # This used to just catch email.Errors.MessageParseError, but
- # other problems can occur in message parsing, e.g.
- # ValueError, and exceptions can occur in unpickling too. We
- # don't want the runner to die, so we just log and skip this
- # entry, but preserve it for analysis.
- self._log(error)
- elog.error('Skipping and preserving unparseable message: %s',
- filebase)
- self.switchboard.finish(filebase, preserve=True)
- config.db.abort()
- continue
- try:
- dlog.debug('[%s] processing onefile', me)
- self._process_one_file(msg, msgdata)
- dlog.debug('[%s] finishing filebase: %s', me, filebase)
- self.switchboard.finish(filebase)
- except Exception as error:
- # All runners that implement _dispose() must guarantee that
- # exceptions are caught and dealt with properly. Still, there
- # may be a bug in the infrastructure, and we do not want those
- # to cause messages to be lost. Any uncaught exceptions will
- # cause the message to be stored in the shunt queue for human
- # intervention.
- self._log(error)
- # Put a marker in the metadata for unshunting.
- msgdata['whichq'] = self.switchboard.name
- # It is possible that shunting can throw an exception, e.g. a
- # permissions problem or a MemoryError due to a really large
- # message. Try to be graceful.
- try:
- shunt = config.switchboards['shunt']
- new_filebase = shunt.enqueue(msg, msgdata)
- elog.error('SHUNTING: %s', new_filebase)
- self.switchboard.finish(filebase)
- except Exception as error:
- # The message wasn't successfully shunted. Log the
- # exception and try to preserve the original queue entry
- # for possible analysis.
- self._log(error)
- elog.error(
- 'SHUNTING FAILED, preserving original entry: %s',
- filebase)
- self.switchboard.finish(filebase, preserve=True)
- config.db.abort()
- # Other work we want to do each time through the loop.
- dlog.debug('[%s] doing periodic', me)
- self._do_periodic()
- dlog.debug('[%s] checking short circuit', me)
- if self._short_circuit():
- dlog.debug('[%s] short circuiting', me)
- break
- dlog.debug('[%s] commiting', me)
- config.db.commit()
- dlog.debug('[%s] ending oneloop: %s', me, len(files))
- return len(files)
-
- def _process_one_file(self, msg, msgdata):
- """See `IRunner`."""
- # Do some common sanity checking on the message metadata. It's got to
- # be destined for a particular mailing list. This switchboard is used
- # to shunt off badly formatted messages. We don't want to just trash
- # them because they may be fixable with human intervention. Just get
- # them out of our sight.
- #
- # Find out which mailing list this message is destined for.
- missing = object()
- listname = msgdata.get('listname', missing)
- mlist = (None
- if listname is missing
- else getUtility(IListManager).get(unicode(listname)))
- if mlist is None:
- elog.error(
- '%s runner "%s" shunting message for missing list: %s',
- msg['message-id'], self.name,
- ('n/a' if listname is missing else listname))
- config.switchboards['shunt'].enqueue(msg, msgdata)
- return
- # Now process this message. We also want to set up the language
- # context for this message. The context will be the preferred
- # language for the user if the sender is a member of the list, or it
- # will be the list's preferred language. However, we must take
- # special care to reset the defaults, otherwise subsequent messages
- # may be translated incorrectly.
- if mlist is None:
- language_manager = getUtility(ILanguageManager)
- language = language_manager[config.mailman.default_language]
- elif msg.sender:
- member = mlist.members.get_member(msg.sender)
- language = (member.preferred_language
- if member is not None
- else mlist.preferred_language)
- else:
- language = mlist.preferred_language
- with _.using(language.code):
- msgdata['lang'] = language.code
- keepqueued = self._dispose(mlist, msg, msgdata)
- if keepqueued:
- self.switchboard.enqueue(msg, msgdata)
-
- def _log(self, exc):
- elog.error('Uncaught runner exception: %s', exc)
- s = StringIO()
- traceback.print_exc(file=s)
- elog.error('%s', s.getvalue())
-
- def _clean_up(self):
- """See `IRunner`."""
- pass
-
- def _dispose(self, mlist, msg, msgdata):
- """See `IRunner`."""
- raise NotImplementedError
-
- def _do_periodic(self):
- """See `IRunner`."""
- pass
-
- def _snooze(self, filecnt):
- """See `IRunner`."""
- if filecnt or self.sleep_float <= 0:
- return
- time.sleep(self.sleep_float)
-
- def _short_circuit(self):
- """See `IRunner`."""
- return self._stop
diff --git a/src/mailman/pipeline/mime_delete.py b/src/mailman/pipeline/mime_delete.py
index 43197a878..381514795 100644
--- a/src/mailman/pipeline/mime_delete.py
+++ b/src/mailman/pipeline/mime_delete.py
@@ -44,8 +44,8 @@ from zope.interface import implements
from mailman.config import config
from mailman.core import errors
from mailman.core.i18n import _
+from mailman.core.switchboard import Switchboard
from mailman.interfaces.handler import IHandler
-from mailman.queue import Switchboard
from mailman.utilities.string import oneline
from mailman.version import VERSION
diff --git a/src/mailman/runners/__init__.py b/src/mailman/runners/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/src/mailman/runners/__init__.py
diff --git a/src/mailman/queue/archive.py b/src/mailman/runners/archive.py
index 99682f310..0a9e104d8 100644
--- a/src/mailman/queue/archive.py
+++ b/src/mailman/runners/archive.py
@@ -32,7 +32,7 @@ from flufl.lock import Lock
from lazr.config import as_timedelta
from mailman.config import config
-from mailman.queue import Runner
+from mailman.core.runner import Runner
log = logging.getLogger('mailman.error')
diff --git a/src/mailman/queue/bounce.py b/src/mailman/runners/bounce.py
index a714f2669..8902dcbbc 100644
--- a/src/mailman/queue/bounce.py
+++ b/src/mailman/runners/bounce.py
@@ -23,8 +23,8 @@ from zope.component import getUtility
from mailman.app.bounces import (
ProbeVERP, StandardVERP, maybe_forward, scan_message)
+from mailman.core.runner import Runner
from mailman.interfaces.bounce import BounceContext, IBounceProcessor, Stop
-from mailman.queue import Runner
COMMASPACE = ', '
diff --git a/src/mailman/queue/command.py b/src/mailman/runners/command.py
index ec3ec0089..68aabc784 100644
--- a/src/mailman/queue/command.py
+++ b/src/mailman/runners/command.py
@@ -40,10 +40,10 @@ from zope.interface import implements
from mailman.config import config
from mailman.core.i18n import _
+from mailman.core.runner import Runner
from mailman.email.message import UserNotification
from mailman.interfaces.command import ContinueProcessing, IEmailResults
from mailman.interfaces.languages import ILanguageManager
-from mailman.queue import Runner
NL = '\n'
diff --git a/src/mailman/queue/digest.py b/src/mailman/runners/digest.py
index 075335158..59ce1da3d 100644
--- a/src/mailman/queue/digest.py
+++ b/src/mailman/runners/digest.py
@@ -41,10 +41,10 @@ from email.utils import formatdate, getaddresses, make_msgid
from mailman.config import config
from mailman.core.errors import DiscardMessage
from mailman.core.i18n import _
+from mailman.core.runner import Runner
from mailman.interfaces.member import DeliveryMode, DeliveryStatus
from mailman.pipeline.decorate import decorate
from mailman.pipeline.scrubber import process as scrubber
-from mailman.queue import Runner
from mailman.utilities.i18n import make
from mailman.utilities.mailbox import Mailbox
from mailman.utilities.string import oneline, wrap
diff --git a/src/mailman/queue/docs/OVERVIEW.txt b/src/mailman/runners/docs/OVERVIEW.txt
index 41ccc18c9..41ccc18c9 100644
--- a/src/mailman/queue/docs/OVERVIEW.txt
+++ b/src/mailman/runners/docs/OVERVIEW.txt
diff --git a/src/mailman/queue/docs/archiver.txt b/src/mailman/runners/docs/archiver.txt
index cdee449e1..a6f5ccd24 100644
--- a/src/mailman/queue/docs/archiver.txt
+++ b/src/mailman/runners/docs/archiver.txt
@@ -21,7 +21,7 @@ Mailman can archive to any number of archivers that adhere to the
>>> archiver_queue = config.switchboards['archive']
>>> ignore = archiver_queue.enqueue(msg, {}, listname=mlist.fqdn_listname)
- >>> from mailman.queue.archive import ArchiveRunner
+ >>> from mailman.runners.archive import ArchiveRunner
>>> from mailman.testing.helpers import make_testable_runner
>>> runner = make_testable_runner(ArchiveRunner)
>>> runner.run()
diff --git a/src/mailman/queue/docs/command.txt b/src/mailman/runners/docs/command.txt
index c767e6f5f..4bf06b350 100644
--- a/src/mailman/queue/docs/command.txt
+++ b/src/mailman/runners/docs/command.txt
@@ -27,14 +27,14 @@ the sender. The command can be in the ``Subject`` header.
>>> from mailman.inject import inject_message
>>> inject_message(mlist, msg, switchboard='command')
- >>> from mailman.queue.command import CommandRunner
+ >>> from mailman.runners.command import CommandRunner
>>> from mailman.testing.helpers import make_testable_runner
>>> command = make_testable_runner(CommandRunner)
>>> command.run()
And now the response is in the ``virgin`` queue.
- >>> from mailman.queue import Switchboard
+ >>> from mailman.core.switchboard import Switchboard
>>> virgin_queue = config.switchboards['virgin']
>>> len(virgin_queue.files)
1
diff --git a/src/mailman/queue/docs/digester.txt b/src/mailman/runners/docs/digester.txt
index 285b2072a..c8f709a13 100644
--- a/src/mailman/queue/docs/digester.txt
+++ b/src/mailman/runners/docs/digester.txt
@@ -72,7 +72,7 @@ There are 4 messages in the digest.
When the queue runner runs, it processes the digest mailbox, crafting both the
plain text (RFC 1153) digest and the MIME digest.
- >>> from mailman.queue.digest import DigestRunner
+ >>> from mailman.runners.digest import DigestRunner
>>> from mailman.testing.helpers import make_testable_runner
>>> runner = make_testable_runner(DigestRunner)
>>> runner.run()
diff --git a/src/mailman/queue/docs/incoming.txt b/src/mailman/runners/docs/incoming.txt
index 6455db20b..22061044f 100644
--- a/src/mailman/queue/docs/incoming.txt
+++ b/src/mailman/runners/docs/incoming.txt
@@ -59,7 +59,7 @@ mail server normally would.
The incoming queue runner runs until it is empty.
- >>> from mailman.queue.incoming import IncomingRunner
+ >>> from mailman.runners.incoming import IncomingRunner
>>> from mailman.testing.helpers import make_testable_runner
>>> incoming = make_testable_runner(IncomingRunner, 'in')
>>> incoming.run()
diff --git a/src/mailman/queue/docs/lmtp.txt b/src/mailman/runners/docs/lmtp.txt
index c95c6aa2b..c95c6aa2b 100644
--- a/src/mailman/queue/docs/lmtp.txt
+++ b/src/mailman/runners/docs/lmtp.txt
diff --git a/src/mailman/queue/docs/news.txt b/src/mailman/runners/docs/news.txt
index 7261aa333..7b573d39e 100644
--- a/src/mailman/queue/docs/news.txt
+++ b/src/mailman/runners/docs/news.txt
@@ -34,7 +34,7 @@ prohibited headers.
... """)
>>> msgdata = {}
- >>> from mailman.queue.news import prepare_message
+ >>> from mailman.runners.news import prepare_message
>>> prepare_message(mlist, msg, msgdata)
>>> msgdata['prepped']
True
diff --git a/src/mailman/queue/docs/outgoing.txt b/src/mailman/runners/docs/outgoing.txt
index 0af22b808..a69fa36c5 100644
--- a/src/mailman/queue/docs/outgoing.txt
+++ b/src/mailman/runners/docs/outgoing.txt
@@ -62,7 +62,7 @@ destination mailing list name. Simulate that here too.
Running the outgoing queue runner processes the message, delivering it to the
upstream SMTP.
- >>> from mailman.queue.outgoing import OutgoingRunner
+ >>> from mailman.runners.outgoing import OutgoingRunner
>>> from mailman.testing.helpers import make_testable_runner
>>> outgoing = make_testable_runner(OutgoingRunner, 'out')
>>> outgoing.run()
diff --git a/src/mailman/queue/docs/rest.txt b/src/mailman/runners/docs/rest.txt
index 9e8851eca..9e8851eca 100644
--- a/src/mailman/queue/docs/rest.txt
+++ b/src/mailman/runners/docs/rest.txt
diff --git a/src/mailman/queue/incoming.py b/src/mailman/runners/incoming.py
index f8d671177..c1f0d0e64 100644
--- a/src/mailman/queue/incoming.py
+++ b/src/mailman/runners/incoming.py
@@ -38,9 +38,9 @@ from zope.component import getUtility
from mailman.config import config
from mailman.core.chains import process
+from mailman.core.runner import Runner
from mailman.interfaces.address import ExistingAddressError
from mailman.interfaces.usermanager import IUserManager
-from mailman.queue import Runner
diff --git a/src/mailman/queue/lmtp.py b/src/mailman/runners/lmtp.py
index 9163a88e6..32d85b6ca 100644
--- a/src/mailman/queue/lmtp.py
+++ b/src/mailman/runners/lmtp.py
@@ -40,10 +40,10 @@ from email.utils import parseaddr
from zope.component import getUtility
from mailman.config import config
+from mailman.core.runner import Runner
from mailman.database.transaction import txn
from mailman.email.message import Message
from mailman.interfaces.listmanager import IListManager
-from mailman.queue import Runner
elog = logging.getLogger('mailman.error')
qlog = logging.getLogger('mailman.qrunner')
diff --git a/src/mailman/queue/maildir.py b/src/mailman/runners/maildir.py
index 07a89903c..f0204f014 100644
--- a/src/mailman/queue/maildir.py
+++ b/src/mailman/runners/maildir.py
@@ -57,8 +57,9 @@ from email.parser import Parser
from email.utils import parseaddr
from mailman.config import config
+from mailman.core.runner import Runner
+from mailman.core.switchboard import Switchboard
from mailman.message import Message
-from mailman.queue import Runner, Switchboard
log = logging.getLogger('mailman.error')
diff --git a/src/mailman/queue/news.py b/src/mailman/runners/news.py
index a3d915244..0c79cea52 100644
--- a/src/mailman/queue/news.py
+++ b/src/mailman/runners/news.py
@@ -27,8 +27,8 @@ from cStringIO import StringIO
from lazr.config import as_host_port
from mailman.config import config
+from mailman.core.runner import Runner
from mailman.interfaces.nntp import NewsModeration
-from mailman.queue import Runner
COMMASPACE = ', '
log = logging.getLogger('mailman.error')
diff --git a/src/mailman/queue/outgoing.py b/src/mailman/runners/outgoing.py
index ed27f014c..b1c71176b 100644
--- a/src/mailman/queue/outgoing.py
+++ b/src/mailman/runners/outgoing.py
@@ -25,12 +25,12 @@ from lazr.config import as_boolean, as_timedelta
from zope.component import getUtility
from mailman.config import config
+from mailman.core.runner import Runner
from mailman.interfaces.bounce import BounceContext, IBounceProcessor
from mailman.interfaces.mailinglist import Personalization
from mailman.interfaces.membership import ISubscriptionService
from mailman.interfaces.mta import SomeRecipientsFailed
from mailman.interfaces.pending import IPendings
-from mailman.queue import Runner
from mailman.utilities.datetime import now
from mailman.utilities.modules import find_name
diff --git a/src/mailman/queue/pipeline.py b/src/mailman/runners/pipeline.py
index 099ebd032..6b9ad0a88 100644
--- a/src/mailman/queue/pipeline.py
+++ b/src/mailman/runners/pipeline.py
@@ -23,7 +23,7 @@ headers, calculates message recipients, and more.
"""
from mailman.core.pipelines import process
-from mailman.queue import Runner
+from mailman.core.runner import Runner
diff --git a/src/mailman/queue/rest.py b/src/mailman/runners/rest.py
index 31e840a51..1e631c6a3 100644
--- a/src/mailman/queue/rest.py
+++ b/src/mailman/runners/rest.py
@@ -31,7 +31,7 @@ import select
import signal
import logging
-from mailman.queue import Runner
+from mailman.core.runner import Runner
from mailman.rest.wsgiapp import make_server
diff --git a/src/mailman/queue/retry.py b/src/mailman/runners/retry.py
index 24aa7f82b..8b4cfaada 100644
--- a/src/mailman/queue/retry.py
+++ b/src/mailman/runners/retry.py
@@ -28,7 +28,7 @@ __all__ = [
import time
from mailman.config import config
-from mailman.queue import Runner
+from mailman.core.runner import Runner
diff --git a/src/mailman/runners/tests/__init__.py b/src/mailman/runners/tests/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/src/mailman/runners/tests/__init__.py
diff --git a/src/mailman/queue/tests/test_bounce.py b/src/mailman/runners/tests/test_bounce.py
index 1946df50c..97123d1a4 100644
--- a/src/mailman/queue/tests/test_bounce.py
+++ b/src/mailman/runners/tests/test_bounce.py
@@ -36,7 +36,7 @@ from mailman.interfaces.bounce import (
BounceContext, IBounceProcessor, UnrecognizedBounceDisposition)
from mailman.interfaces.member import MemberRole
from mailman.interfaces.usermanager import IUserManager
-from mailman.queue.bounce import BounceRunner
+from mailman.runners.bounce import BounceRunner
from mailman.testing.helpers import (
LogFileMark,
get_queue_messages,
diff --git a/src/mailman/queue/tests/test_outgoing.py b/src/mailman/runners/tests/test_outgoing.py
index a0fe407c8..0036b4bd4 100644
--- a/src/mailman/queue/tests/test_outgoing.py
+++ b/src/mailman/runners/tests/test_outgoing.py
@@ -44,7 +44,7 @@ from mailman.interfaces.member import MemberRole
from mailman.interfaces.mta import SomeRecipientsFailed
from mailman.interfaces.pending import IPendings
from mailman.interfaces.usermanager import IUserManager
-from mailman.queue.outgoing import OutgoingRunner
+from mailman.runners.outgoing import OutgoingRunner
from mailman.testing.helpers import (
LogFileMark,
get_queue_messages,
@@ -129,7 +129,7 @@ class TestVERPSettings(unittest.TestCase):
# setting of the 'verp' key in the metadata.
config.push('fake outgoing', """
[mta]
- outgoing: mailman.queue.tests.test_outgoing.capture
+ outgoing: mailman.runners.tests.test_outgoing.capture
""")
# Reset the captured data.
captured_mlist = None
@@ -265,7 +265,7 @@ class TestSocketError(unittest.TestCase):
# setting of the 'verp' key in the metadata.
config.push('fake outgoing', """
[mta]
- outgoing: mailman.queue.tests.test_outgoing.raise_socket_error
+ outgoing: mailman.runners.tests.test_outgoing.raise_socket_error
""")
self._mlist = create_list('test@example.com')
self._outq = config.switchboards['out']
@@ -346,8 +346,8 @@ class TestSomeRecipientsFailed(unittest.TestCase):
# We generally don't care what this does, since we're just testing the
# setting of the 'verp' key in the metadata.
config.push('fake outgoing', """
- [mta]
- outgoing: mailman.queue.tests.test_outgoing.raise_SomeRecipientsFailed
+ [mta]
+ outgoing: mailman.runners.tests.test_outgoing.raise_SomeRecipientsFailed
""")
self._mlist = create_list('test@example.com')
self._outq = config.switchboards['out']
diff --git a/src/mailman/queue/virgin.py b/src/mailman/runners/virgin.py
index 2dcdca910..de5788a01 100644
--- a/src/mailman/queue/virgin.py
+++ b/src/mailman/runners/virgin.py
@@ -24,7 +24,7 @@ recipient.
"""
from mailman.core.pipelines import process
-from mailman.queue import Runner
+from mailman.core.runner import Runner