summaryrefslogtreecommitdiff
path: root/mailman/queue/__init__.py
diff options
context:
space:
mode:
Diffstat (limited to 'mailman/queue/__init__.py')
-rw-r--r--mailman/queue/__init__.py115
1 files changed, 67 insertions, 48 deletions
diff --git a/mailman/queue/__init__.py b/mailman/queue/__init__.py
index 27ea83320..5aa72bb14 100644
--- a/mailman/queue/__init__.py
+++ b/mailman/queue/__init__.py
@@ -32,6 +32,7 @@ __all__ = [
import os
+import sys
import time
import email
import errno
@@ -43,13 +44,16 @@ import marshal
import traceback
from cStringIO import StringIO
+from lazr.config import as_boolean, as_timedelta
+from string import Template
from zope.interface import implements
-from mailman import i18n
from mailman import Message
from mailman import Utils
+from mailman import i18n
from mailman.config import config
-from mailman.interfaces import IRunner, ISwitchboard
+from mailman.interfaces.runner import IRunner
+from mailman.interfaces.switchboard import ISwitchboard
# 20 bytes of all bits set, maximum hashlib.sha.digest() value
shamax = 0xffffffffffffffffffffffffffffffffffffffffL
@@ -57,6 +61,7 @@ shamax = 0xffffffffffffffffffffffffffffffffffffffffL
# Small increment to add to time in case two entries have the same time. This
# prevents skipping one of two entries with the same time until the next pass.
DELTA = .0001
+DOT = '.'
elog = logging.getLogger('mailman.error')
dlog = logging.getLogger('mailman.debug')
@@ -66,11 +71,24 @@ dlog = logging.getLogger('mailman.debug')
class Switchboard:
implements(ISwitchboard)
- def __init__(self, whichq, slice=None, numslices=1, recover=False):
+ @staticmethod
+ def initialize():
+ """Initialize the global switchboards for input/output."""
+ for conf in config.qrunner_configs:
+ name = conf.name.split('.')[-1]
+ assert name not in config.switchboards, (
+ 'Duplicate qrunner name: %s' % name)
+ substitutions = config.paths
+ substitutions['name'] = name
+ path = Template(conf.path).safe_substitute(substitutions)
+ config.switchboards[name] = Switchboard(path)
+
+ def __init__(self, queue_directory,
+ slice=None, numslices=1, recover=False):
"""Create a switchboard object.
- :param whichq: The queue directory.
- :type whichq: str
+ :param queue_directory: The queue directory.
+ :type queue_directory: str
:param slice: The slice number for this switchboard, or None. If not
None, it must be [0..`numslices`).
:type slice: int or None
@@ -80,9 +98,11 @@ class Switchboard:
:param recover: True if backup files should be recovered.
:type recover: bool
"""
- self._whichq = whichq
+ assert (numslices & (numslices - 1)) == 0, (
+ 'Not a power of 2: %s' % numslices)
+ self.queue_directory = queue_directory
# Create the directory if it doesn't yet exist.
- Utils.makedirs(self._whichq, 0770)
+ Utils.makedirs(self.queue_directory, 0770)
# Fast track for no slices
self._lower = None
self._upper = None
@@ -93,11 +113,6 @@ class Switchboard:
if recover:
self.recover_backup_files()
- @property
- def queue_directory(self):
- """See `ISwitchboard`."""
- return self._whichq
-
def enqueue(self, _msg, _metadata=None, **_kws):
"""See `ISwitchboard`."""
if _metadata is None:
@@ -125,7 +140,7 @@ class Switchboard:
# and the sha hex digest.
rcvtime = data.setdefault('received_time', now)
filebase = repr(rcvtime) + '+' + hashlib.sha1(hashfood).hexdigest()
- filename = os.path.join(self._whichq, filebase + '.pck')
+ filename = os.path.join(self.queue_directory, filebase + '.pck')
tmpfile = filename + '.tmp'
# Always add the metadata schema version number
data['version'] = config.QFILE_SCHEMA_VERSION
@@ -148,8 +163,8 @@ class Switchboard:
def dequeue(self, filebase):
"""See `ISwitchboard`."""
# Calculate the filename from the given filebase.
- filename = os.path.join(self._whichq, filebase + '.pck')
- backfile = os.path.join(self._whichq, filebase + '.bak')
+ filename = os.path.join(self.queue_directory, filebase + '.pck')
+ backfile = os.path.join(self.queue_directory, filebase + '.bak')
# Read the message object and metadata.
with open(filename) as fp:
# Move the file to the backup file name for processing. If this
@@ -172,13 +187,13 @@ class Switchboard:
return msg, data
def finish(self, filebase, preserve=False):
- bakfile = os.path.join(self._whichq, filebase + '.bak')
+ bakfile = os.path.join(self.queue_directory, filebase + '.bak')
try:
if preserve:
- psvfile = os.path.join(config.SHUNTQUEUE_DIR,
- filebase + '.psv')
+ shunt_dir = config.switchboards['shunt'].queue_directory
+ psvfile = os.path.join(shunt_dir, filebase + '.psv')
# Create the directory if it doesn't yet exist.
- Utils.makedirs(config.SHUNTQUEUE_DIR, 0770)
+ Utils.makedirs(shunt_dir, 0770)
os.rename(bakfile, psvfile)
else:
os.unlink(bakfile)
@@ -196,7 +211,7 @@ class Switchboard:
times = {}
lower = self._lower
upper = self._upper
- for f in os.listdir(self._whichq):
+ for f in os.listdir(self.queue_directory):
# By ignoring anything that doesn't end in .pck, we ignore
# tempfiles and avoid a race condition.
filebase, ext = os.path.splitext(f)
@@ -220,8 +235,8 @@ class Switchboard:
# to exist at the same time, so the move is enough to ensure that our
# normal dequeuing process will handle them.
for filebase in self.get_files('.bak'):
- src = os.path.join(self._whichq, filebase + '.bak')
- dst = os.path.join(self._whichq, filebase + '.pck')
+ src = os.path.join(self.queue_directory, filebase + '.bak')
+ dst = os.path.join(self.queue_directory, filebase + '.pck')
os.rename(src, dst)
@@ -229,27 +244,31 @@ class Switchboard:
class Runner:
implements(IRunner)
- QDIR = None
- SLEEPTIME = None
-
- def __init__(self, slice=None, numslices=1):
+ def __init__(self, name, slice=None):
"""Create a queue runner.
:param slice: The slice number for this queue runner. This is passed
directly to the underlying `ISwitchboard` object.
:type slice: int or None
- :param numslices: The number of slices for this queue. Must be a
- power of 2.
- :type numslices: int
"""
- # Create our own switchboard. Don't use the switchboard cache because
- # we want to provide slice and numslice arguments.
- self._switchboard = Switchboard(self.QDIR, slice, numslices, True)
- # Create the shunt switchboard
- self._shunt = Switchboard(config.SHUNTQUEUE_DIR)
+ # Grab the configuration section.
+ self.name = name
+ section = getattr(config, 'qrunner.' + name)
+ substitutions = config.paths
+ substitutions['name'] = name
+ self.queue_directory = Template(section.path).safe_substitute(
+ substitutions)
+ numslices = int(section.instances)
+ self.switchboard = Switchboard(
+ self.queue_directory, slice, numslices, True)
+ self.sleep_time = as_timedelta(section.sleep_time)
+ # sleep_time is a timedelta; turn it into a float for time.sleep().
+ self.sleep_float = (86400 * self.sleep_time.days +
+ self.sleep_time.seconds +
+ self.sleep_time.microseconds / 1000000.0)
+ self.max_restarts = int(section.max_restarts)
+ self.start = as_boolean(section.start)
self._stop = False
- if self.SLEEPTIME is None:
- self.SLEEPTIME = config.QRUNNER_SLEEP_TIME
def __repr__(self):
return '<%s at %s>' % (self.__class__.__name__, id(self))
@@ -286,13 +305,13 @@ class Runner:
dlog.debug('[%s] starting oneloop', me)
# List all the files in our queue directory. The switchboard is
# guaranteed to hand us the files in FIFO order.
- files = self._switchboard.files
+ files = self.switchboard.files
for filebase in files:
dlog.debug('[%s] processing filebase: %s', me, filebase)
try:
# Ask the switchboard for the message and metadata objects
# associated with this queue file.
- msg, msgdata = self._switchboard.dequeue(filebase)
+ msg, msgdata = self.switchboard.dequeue(filebase)
except Exception, e:
# This used to just catch email.Errors.MessageParseError, but
# other problems can occur in message parsing, e.g.
@@ -302,14 +321,14 @@ class Runner:
self._log(e)
elog.error('Skipping and preserving unparseable message: %s',
filebase)
- self._switchboard.finish(filebase, preserve=True)
+ self.switchboard.finish(filebase, preserve=True)
config.db.abort()
continue
try:
dlog.debug('[%s] processing onefile', me)
self._process_one_file(msg, msgdata)
dlog.debug('[%s] finishing filebase: %s', me, filebase)
- self._switchboard.finish(filebase)
+ self.switchboard.finish(filebase)
except Exception, e:
# All runners that implement _dispose() must guarantee that
# exceptions are caught and dealt with properly. Still, there
@@ -319,14 +338,14 @@ class Runner:
# intervention.
self._log(e)
# Put a marker in the metadata for unshunting.
- msgdata['whichq'] = self._switchboard.queue_directory
+ msgdata['whichq'] = self.switchboard.queue_directory
# It is possible that shunting can throw an exception, e.g. a
# permissions problem or a MemoryError due to a really large
# message. Try to be graceful.
try:
new_filebase = self._shunt.enqueue(msg, msgdata)
elog.error('SHUNTING: %s', new_filebase)
- self._switchboard.finish(filebase)
+ self.switchboard.finish(filebase)
except Exception, e:
# The message wasn't successfully shunted. Log the
# exception and try to preserve the original queue entry
@@ -335,13 +354,13 @@ class Runner:
elog.error(
'SHUNTING FAILED, preserving original entry: %s',
filebase)
- self._switchboard.finish(filebase, preserve=True)
+ self.switchboard.finish(filebase, preserve=True)
config.db.abort()
# Other work we want to do each time through the loop.
dlog.debug('[%s] doing periodic', me)
self._do_periodic()
dlog.debug('[%s] checking short circuit', me)
- if self._short_curcuit():
+ if self._short_circuit():
dlog.debug('[%s] short circuiting', me)
break
dlog.debug('[%s] commiting', me)
@@ -380,7 +399,7 @@ class Runner:
msgdata['lang'] = language
keepqueued = self._dispose(mlist, msg, msgdata)
if keepqueued:
- self._switchboard.enqueue(msg, msgdata)
+ self.switchboard.enqueue(msg, msgdata)
def _log(self, exc):
elog.error('Uncaught runner exception: %s', exc)
@@ -401,10 +420,10 @@ class Runner:
def _snooze(self, filecnt):
"""See `IRunner`."""
- if filecnt or float(self.SLEEPTIME) <= 0:
+ if filecnt or self.sleep_float <= 0:
return
- time.sleep(float(self.SLEEPTIME))
+ time.sleep(self.sleep_float)
- def _short_curcuit(self):
+ def _short_circuit(self):
"""See `IRunner`."""
return self._stop