diff options
Diffstat (limited to 'mailman/queue/__init__.py')
| -rw-r--r-- | mailman/queue/__init__.py | 115 |
1 files changed, 67 insertions, 48 deletions
diff --git a/mailman/queue/__init__.py b/mailman/queue/__init__.py index 27ea83320..5aa72bb14 100644 --- a/mailman/queue/__init__.py +++ b/mailman/queue/__init__.py @@ -32,6 +32,7 @@ __all__ = [ import os +import sys import time import email import errno @@ -43,13 +44,16 @@ import marshal import traceback from cStringIO import StringIO +from lazr.config import as_boolean, as_timedelta +from string import Template from zope.interface import implements -from mailman import i18n from mailman import Message from mailman import Utils +from mailman import i18n from mailman.config import config -from mailman.interfaces import IRunner, ISwitchboard +from mailman.interfaces.runner import IRunner +from mailman.interfaces.switchboard import ISwitchboard # 20 bytes of all bits set, maximum hashlib.sha.digest() value shamax = 0xffffffffffffffffffffffffffffffffffffffffL @@ -57,6 +61,7 @@ shamax = 0xffffffffffffffffffffffffffffffffffffffffL # Small increment to add to time in case two entries have the same time. This # prevents skipping one of two entries with the same time until the next pass. DELTA = .0001 +DOT = '.' elog = logging.getLogger('mailman.error') dlog = logging.getLogger('mailman.debug') @@ -66,11 +71,24 @@ dlog = logging.getLogger('mailman.debug') class Switchboard: implements(ISwitchboard) - def __init__(self, whichq, slice=None, numslices=1, recover=False): + @staticmethod + def initialize(): + """Initialize the global switchboards for input/output.""" + for conf in config.qrunner_configs: + name = conf.name.split('.')[-1] + assert name not in config.switchboards, ( + 'Duplicate qrunner name: %s' % name) + substitutions = config.paths + substitutions['name'] = name + path = Template(conf.path).safe_substitute(substitutions) + config.switchboards[name] = Switchboard(path) + + def __init__(self, queue_directory, + slice=None, numslices=1, recover=False): """Create a switchboard object. - :param whichq: The queue directory. - :type whichq: str + :param queue_directory: The queue directory. + :type queue_directory: str :param slice: The slice number for this switchboard, or None. If not None, it must be [0..`numslices`). :type slice: int or None @@ -80,9 +98,11 @@ class Switchboard: :param recover: True if backup files should be recovered. :type recover: bool """ - self._whichq = whichq + assert (numslices & (numslices - 1)) == 0, ( + 'Not a power of 2: %s' % numslices) + self.queue_directory = queue_directory # Create the directory if it doesn't yet exist. - Utils.makedirs(self._whichq, 0770) + Utils.makedirs(self.queue_directory, 0770) # Fast track for no slices self._lower = None self._upper = None @@ -93,11 +113,6 @@ class Switchboard: if recover: self.recover_backup_files() - @property - def queue_directory(self): - """See `ISwitchboard`.""" - return self._whichq - def enqueue(self, _msg, _metadata=None, **_kws): """See `ISwitchboard`.""" if _metadata is None: @@ -125,7 +140,7 @@ class Switchboard: # and the sha hex digest. rcvtime = data.setdefault('received_time', now) filebase = repr(rcvtime) + '+' + hashlib.sha1(hashfood).hexdigest() - filename = os.path.join(self._whichq, filebase + '.pck') + filename = os.path.join(self.queue_directory, filebase + '.pck') tmpfile = filename + '.tmp' # Always add the metadata schema version number data['version'] = config.QFILE_SCHEMA_VERSION @@ -148,8 +163,8 @@ class Switchboard: def dequeue(self, filebase): """See `ISwitchboard`.""" # Calculate the filename from the given filebase. - filename = os.path.join(self._whichq, filebase + '.pck') - backfile = os.path.join(self._whichq, filebase + '.bak') + filename = os.path.join(self.queue_directory, filebase + '.pck') + backfile = os.path.join(self.queue_directory, filebase + '.bak') # Read the message object and metadata. with open(filename) as fp: # Move the file to the backup file name for processing. If this @@ -172,13 +187,13 @@ class Switchboard: return msg, data def finish(self, filebase, preserve=False): - bakfile = os.path.join(self._whichq, filebase + '.bak') + bakfile = os.path.join(self.queue_directory, filebase + '.bak') try: if preserve: - psvfile = os.path.join(config.SHUNTQUEUE_DIR, - filebase + '.psv') + shunt_dir = config.switchboards['shunt'].queue_directory + psvfile = os.path.join(shunt_dir, filebase + '.psv') # Create the directory if it doesn't yet exist. - Utils.makedirs(config.SHUNTQUEUE_DIR, 0770) + Utils.makedirs(shunt_dir, 0770) os.rename(bakfile, psvfile) else: os.unlink(bakfile) @@ -196,7 +211,7 @@ class Switchboard: times = {} lower = self._lower upper = self._upper - for f in os.listdir(self._whichq): + for f in os.listdir(self.queue_directory): # By ignoring anything that doesn't end in .pck, we ignore # tempfiles and avoid a race condition. filebase, ext = os.path.splitext(f) @@ -220,8 +235,8 @@ class Switchboard: # to exist at the same time, so the move is enough to ensure that our # normal dequeuing process will handle them. for filebase in self.get_files('.bak'): - src = os.path.join(self._whichq, filebase + '.bak') - dst = os.path.join(self._whichq, filebase + '.pck') + src = os.path.join(self.queue_directory, filebase + '.bak') + dst = os.path.join(self.queue_directory, filebase + '.pck') os.rename(src, dst) @@ -229,27 +244,31 @@ class Switchboard: class Runner: implements(IRunner) - QDIR = None - SLEEPTIME = None - - def __init__(self, slice=None, numslices=1): + def __init__(self, name, slice=None): """Create a queue runner. :param slice: The slice number for this queue runner. This is passed directly to the underlying `ISwitchboard` object. :type slice: int or None - :param numslices: The number of slices for this queue. Must be a - power of 2. - :type numslices: int """ - # Create our own switchboard. Don't use the switchboard cache because - # we want to provide slice and numslice arguments. - self._switchboard = Switchboard(self.QDIR, slice, numslices, True) - # Create the shunt switchboard - self._shunt = Switchboard(config.SHUNTQUEUE_DIR) + # Grab the configuration section. + self.name = name + section = getattr(config, 'qrunner.' + name) + substitutions = config.paths + substitutions['name'] = name + self.queue_directory = Template(section.path).safe_substitute( + substitutions) + numslices = int(section.instances) + self.switchboard = Switchboard( + self.queue_directory, slice, numslices, True) + self.sleep_time = as_timedelta(section.sleep_time) + # sleep_time is a timedelta; turn it into a float for time.sleep(). + self.sleep_float = (86400 * self.sleep_time.days + + self.sleep_time.seconds + + self.sleep_time.microseconds / 1000000.0) + self.max_restarts = int(section.max_restarts) + self.start = as_boolean(section.start) self._stop = False - if self.SLEEPTIME is None: - self.SLEEPTIME = config.QRUNNER_SLEEP_TIME def __repr__(self): return '<%s at %s>' % (self.__class__.__name__, id(self)) @@ -286,13 +305,13 @@ class Runner: dlog.debug('[%s] starting oneloop', me) # List all the files in our queue directory. The switchboard is # guaranteed to hand us the files in FIFO order. - files = self._switchboard.files + files = self.switchboard.files for filebase in files: dlog.debug('[%s] processing filebase: %s', me, filebase) try: # Ask the switchboard for the message and metadata objects # associated with this queue file. - msg, msgdata = self._switchboard.dequeue(filebase) + msg, msgdata = self.switchboard.dequeue(filebase) except Exception, e: # This used to just catch email.Errors.MessageParseError, but # other problems can occur in message parsing, e.g. @@ -302,14 +321,14 @@ class Runner: self._log(e) elog.error('Skipping and preserving unparseable message: %s', filebase) - self._switchboard.finish(filebase, preserve=True) + self.switchboard.finish(filebase, preserve=True) config.db.abort() continue try: dlog.debug('[%s] processing onefile', me) self._process_one_file(msg, msgdata) dlog.debug('[%s] finishing filebase: %s', me, filebase) - self._switchboard.finish(filebase) + self.switchboard.finish(filebase) except Exception, e: # All runners that implement _dispose() must guarantee that # exceptions are caught and dealt with properly. Still, there @@ -319,14 +338,14 @@ class Runner: # intervention. self._log(e) # Put a marker in the metadata for unshunting. - msgdata['whichq'] = self._switchboard.queue_directory + msgdata['whichq'] = self.switchboard.queue_directory # It is possible that shunting can throw an exception, e.g. a # permissions problem or a MemoryError due to a really large # message. Try to be graceful. try: new_filebase = self._shunt.enqueue(msg, msgdata) elog.error('SHUNTING: %s', new_filebase) - self._switchboard.finish(filebase) + self.switchboard.finish(filebase) except Exception, e: # The message wasn't successfully shunted. Log the # exception and try to preserve the original queue entry @@ -335,13 +354,13 @@ class Runner: elog.error( 'SHUNTING FAILED, preserving original entry: %s', filebase) - self._switchboard.finish(filebase, preserve=True) + self.switchboard.finish(filebase, preserve=True) config.db.abort() # Other work we want to do each time through the loop. dlog.debug('[%s] doing periodic', me) self._do_periodic() dlog.debug('[%s] checking short circuit', me) - if self._short_curcuit(): + if self._short_circuit(): dlog.debug('[%s] short circuiting', me) break dlog.debug('[%s] commiting', me) @@ -380,7 +399,7 @@ class Runner: msgdata['lang'] = language keepqueued = self._dispose(mlist, msg, msgdata) if keepqueued: - self._switchboard.enqueue(msg, msgdata) + self.switchboard.enqueue(msg, msgdata) def _log(self, exc): elog.error('Uncaught runner exception: %s', exc) @@ -401,10 +420,10 @@ class Runner: def _snooze(self, filecnt): """See `IRunner`.""" - if filecnt or float(self.SLEEPTIME) <= 0: + if filecnt or self.sleep_float <= 0: return - time.sleep(float(self.SLEEPTIME)) + time.sleep(self.sleep_float) - def _short_curcuit(self): + def _short_circuit(self): """See `IRunner`.""" return self._stop |
