author     Barry Warsaw  2009-01-25 13:01:41 -0500
committer  Barry Warsaw  2009-01-25 13:01:41 -0500
commit     eefd06f1b88b8ecbb23a9013cd223b72ca85c20d (patch)
tree       72c947fe16fce0e07e996ee74020b26585d7e846 /mailman/queue/__init__.py
parent     07871212f74498abd56bef3919bf3e029eb8b930 (diff)
Push the source directory into a 'src' subdirectory so that zc.buildout works
correctly regardless of how it's used.
Diffstat (limited to 'mailman/queue/__init__.py')
-rw-r--r--  mailman/queue/__init__.py  466
1 file changed, 0 insertions, 466 deletions
diff --git a/mailman/queue/__init__.py b/mailman/queue/__init__.py
deleted file mode 100644
index 6094bda9e..000000000
--- a/mailman/queue/__init__.py
+++ /dev/null
@@ -1,466 +0,0 @@
-# Copyright (C) 2001-2009 by the Free Software Foundation, Inc.
-#
-# This file is part of GNU Mailman.
-#
-# GNU Mailman is free software: you can redistribute it and/or modify it under
-# the terms of the GNU General Public License as published by the Free
-# Software Foundation, either version 3 of the License, or (at your option)
-# any later version.
-#
-# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT
-# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
-# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
-# more details.
-#
-# You should have received a copy of the GNU General Public License along with
-# GNU Mailman. If not, see <http://www.gnu.org/licenses/>.
-
-"""Queing and dequeuing message/metadata pickle files.
-
-Messages are represented as email.message.Message objects (or an instance of a
-subclass). Metadata is represented as a Python dictionary. For every
-message/metadata pair in a queue, a single file containing two pickles is
-written. First, the message is written to the pickle, then the metadata
-dictionary is written.
-"""
-
-from __future__ import absolute_import, unicode_literals
-
-__metaclass__ = type
-__all__ = [
- 'Runner',
- 'Switchboard',
- ]
-
-
-import os
-import sys
-import time
-import email
-import errno
-import pickle
-import cPickle
-import hashlib
-import logging
-import marshal
-import traceback
-
-from cStringIO import StringIO
-from lazr.config import as_boolean, as_timedelta
-from zope.interface import implements
-
-from mailman import Message
-from mailman import i18n
-from mailman.config import config
-from mailman.interfaces.runner import IRunner
-from mailman.interfaces.switchboard import ISwitchboard
-from mailman.utilities.filesystem import makedirs
-from mailman.utilities.string import expand
-
-
-# 20 bytes of all bits set, the maximum hashlib.sha1().digest() value
-shamax = 0xffffffffffffffffffffffffffffffffffffffffL
-
-# Small increment to add to time in case two entries have the same time. This
-# prevents skipping one of two entries with the same time until the next pass.
-DELTA = .0001
-DOT = '.'
-# We count the number of times a file has been moved to .bak and recovered.
-# In order to prevent loops and a message flood, when the count reaches this
-# value, we move the file to the bad queue as a .psv.
-MAX_BAK_COUNT = 3
-
-elog = logging.getLogger('mailman.error')
-dlog = logging.getLogger('mailman.debug')
-
-
-
-class Switchboard:
- implements(ISwitchboard)
-
- @staticmethod
- def initialize():
- """Initialize the global switchboards for input/output."""
- for conf in config.qrunner_configs:
- name = conf.name.split('.')[-1]
- assert name not in config.switchboards, (
- 'Duplicate qrunner name: {0}'.format(name))
- substitutions = config.paths
- substitutions['name'] = name
- path = expand(conf.path, substitutions)
- config.switchboards[name] = Switchboard(path)
-
- def __init__(self, queue_directory,
- slice=None, numslices=1, recover=False):
- """Create a switchboard object.
-
- :param queue_directory: The queue directory.
- :type queue_directory: str
- :param slice: The slice number for this switchboard, or None. If not
- None, it must be in the range [0..`numslices`).
- :type slice: int or None
- :param numslices: The total number of slices to split this queue
- directory into. It must be a power of 2.
- :type numslices: int
- :param recover: True if backup files should be recovered.
- :type recover: bool
- """
- assert (numslices & (numslices - 1)) == 0, (
- 'Not a power of 2: {0}'.format(numslices))
- self.queue_directory = queue_directory
- # Create the directory if it doesn't yet exist.
- makedirs(self.queue_directory, 0770)
- # Fast track for no slices
- self._lower = None
- self._upper = None
- # BAW: test performance and end-cases of this algorithm
- if numslices != 1:
- self._lower = ((shamax + 1) * slice) / numslices
- self._upper = (((shamax + 1) * (slice + 1)) / numslices) - 1
- if recover:
- self.recover_backup_files()
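
For intuition, the bounds computed above carve the 160-bit SHA-1 digest space into numslices equal, contiguous ranges. A standalone sketch of the same arithmetic (slice_bounds is a hypothetical helper; // is written for clarity and matches the integer division the code performs on longs):

    # 2**160 - 1, as above.
    shamax = 0xffffffffffffffffffffffffffffffffffffffff

    def slice_bounds(slice, numslices):
        lower = (shamax + 1) * slice // numslices
        upper = (shamax + 1) * (slice + 1) // numslices - 1
        return lower, upper

    # With numslices=2 the split is on the digest's top bit: slice 0 takes
    # 0x00..0 through 0x7f..f, slice 1 the rest, with no gap or overlap.
    assert slice_bounds(0, 2)[1] + 1 == slice_bounds(1, 2)[0]
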
-
- def enqueue(self, _msg, _metadata=None, **_kws):
- """See `ISwitchboard`."""
- if _metadata is None:
- _metadata = {}
- # Calculate the SHA hexdigest of the message to get a unique base
- # filename. We're also going to use the digest as a hash into the set
- # of parallel qrunner processes.
- data = _metadata.copy()
- data.update(_kws)
- listname = data.get('listname', '--nolist--')
- # Get some data for the input to the sha hash.
- now = time.time()
- if data.get('_plaintext'):
- protocol = 0
- msgsave = cPickle.dumps(str(_msg), protocol)
- else:
- protocol = pickle.HIGHEST_PROTOCOL
- msgsave = cPickle.dumps(_msg, protocol)
- # listname is unicode but the input to the hash function must be an
- # 8-bit string (eventually, a bytes object).
- hashfood = msgsave + listname.encode('utf-8') + repr(now)
- # Encode the current time into the file name for FIFO sorting. The
- # file name consists of two parts separated by a '+': the received
- # time for this message (i.e. when it first showed up on this system)
- # and the sha hex digest.
- rcvtime = data.setdefault('received_time', now)
- filebase = repr(rcvtime) + '+' + hashlib.sha1(hashfood).hexdigest()
- filename = os.path.join(self.queue_directory, filebase + '.pck')
- tmpfile = filename + '.tmp'
- # Always add the metadata schema version number
- data['version'] = config.QFILE_SCHEMA_VERSION
- # Filter out volatile entries. Use .keys() so that we can mutate the
- # dictionary during the iteration.
- for k in data.keys():
- if k.startswith('_'):
- del data[k]
- # We have to tell the dequeue() method whether to parse the message
- # object or not.
- data['_parsemsg'] = (protocol == 0)
- # Write to the pickle file the message object and metadata.
- with open(tmpfile, 'wb') as fp:
- fp.write(msgsave)
- cPickle.dump(data, fp, protocol)
- fp.flush()
- os.fsync(fp.fileno())
- os.rename(tmpfile, filename)
- return filebase
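
The resulting queue file is therefore named '<received-time>+<sha1-hexdigest>.pck', which is what makes the FIFO sort in get_files() possible. A sketch of the naming scheme with illustrative values (the encode() calls are ours so the fragment also runs under Python 3):

    import hashlib
    import time

    now = time.time()
    msgsave = b'...pickled message bytes...'    # stand-in for the real pickle
    listname = u'test@example.com'
    hashfood = msgsave + listname.encode('utf-8') + repr(now).encode('ascii')
    filebase = repr(now) + '+' + hashlib.sha1(hashfood).hexdigest()
    # e.g. '1232907701.86+' followed by the 40-hex-digit SHA-1 digest
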
-
- def dequeue(self, filebase):
- """See `ISwitchboard`."""
- # Calculate the filename from the given filebase.
- filename = os.path.join(self.queue_directory, filebase + '.pck')
- backfile = os.path.join(self.queue_directory, filebase + '.bak')
- # Read the message object and metadata.
- with open(filename, 'rb') as fp:
- # Move the file to the backup file name for processing. If this
- # process crashes uncleanly the .bak file will be used to
- # re-instate the .pck file in order to try again.
- os.rename(filename, backfile)
- msg = cPickle.load(fp)
- data = cPickle.load(fp)
- if data.get('_parsemsg'):
- # Calculate the original size of the text now so that we won't
- # have to generate the message later when we do size restriction
- # checking.
- original_size = len(msg)
- msg = email.message_from_string(msg, Message.Message)
- msg.original_size = original_size
- data['original_size'] = original_size
- return msg, data
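
The open-then-rename step is what makes dequeuing crash-safe: the already-open file object keeps reading the same inode after the rename, while on an unclean crash the .bak name survives for recover_backup_files() to pick up. The pattern in isolation (claim is a hypothetical helper; this relies on POSIX rename semantics):

    import os

    def claim(filename, backfile):
        # Open first, then rename: reads continue against the same inode,
        # and a crash from here on leaves a recoverable .bak behind.
        fp = open(filename, 'rb')
        os.rename(filename, backfile)
        return fp
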
-
- def finish(self, filebase, preserve=False):
- """See `ISwitchboard`."""
- bakfile = os.path.join(self.queue_directory, filebase + '.bak')
- try:
- if preserve:
- bad_dir = config.switchboards['bad'].queue_directory
- psvfile = os.path.join(bad_dir, filebase + '.psv')
- os.rename(bakfile, psvfile)
- else:
- os.unlink(bakfile)
- except EnvironmentError:
- elog.exception(
- 'Failed to unlink/preserve backup file: %s', bakfile)
-
- @property
- def files(self):
- """See `ISwitchboard`."""
- return self.get_files()
-
- def get_files(self, extension='.pck'):
- """See `ISwitchboard`."""
- times = {}
- lower = self._lower
- upper = self._upper
- for f in os.listdir(self.queue_directory):
- # By ignoring anything that doesn't end in .pck, we ignore
- # tempfiles and avoid a race condition.
- filebase, ext = os.path.splitext(f)
- if ext != extension:
- continue
- when, digest = filebase.split('+', 1)
- # Throw out any files which don't match our bitrange. BAW: test
- # performance and end-cases of this algorithm. MAS: both
- # comparisons need to be <= to get complete range.
- if lower is None or (lower <= long(digest, 16) <= upper):
- key = float(when)
- while key in times:
- key += DELTA
- times[key] = filebase
- # FIFO sort
- return [times[key] for key in sorted(times)]
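
The DELTA nudge above ensures that two entries stamped with exactly the same time both survive the dictionary-based FIFO sort, instead of one silently overwriting the other. A self-contained illustration:

    DELTA = .0001
    times = {}
    for when, filebase in [(100.0, 'first'), (100.0, 'second')]:
        key = when
        while key in times:
            key += DELTA      # nudge duplicate keys so no entry is lost
        times[key] = filebase
    assert [times[key] for key in sorted(times)] == ['first', 'second']
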
-
- def recover_backup_files(self):
- """See `ISwitchboard`."""
- # Move all .bak files in our slice to .pck. It's impossible for both
- # to exist at the same time, so the move is enough to ensure that our
- # normal dequeuing process will handle them. We keep count in
- # _bak_count in the metadata of the number of times we recover this
- # file. When the count reaches MAX_BAK_COUNT, we move the .bak file
- # to a .psv file in the bad queue.
- for filebase in self.get_files('.bak'):
- src = os.path.join(self.queue_directory, filebase + '.bak')
- dst = os.path.join(self.queue_directory, filebase + '.pck')
- fp = open(src, 'rb+')
- try:
- try:
- msg = cPickle.load(fp)
- data_pos = fp.tell()
- data = cPickle.load(fp)
- except Exception as error:
- # If unpickling throws any exception, just log and
- # preserve this entry
- elog.error('Unpickling .bak exception: %s\n'
- 'Preserving file: %s', error, filebase)
- self.finish(filebase, preserve=True)
- else:
- data['_bak_count'] = data.get('_bak_count', 0) + 1
- fp.seek(data_pos)
- if data.get('_parsemsg'):
- protocol = 0
- else:
- protocol = 1
- cPickle.dump(data, fp, protocol)
- fp.truncate()
- fp.flush()
- os.fsync(fp.fileno())
- if data['_bak_count'] >= MAX_BAK_COUNT:
- elog.error('.bak file max count, preserving file: %s',
- filebase)
- self.finish(filebase, preserve=True)
- else:
- os.rename(src, dst)
- finally:
- fp.close()
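
Stripped of the file I/O, the recovery policy is a bounded retry counter. A sketch of the per-file decision (recovery_action is a hypothetical helper mirroring the bookkeeping above):

    MAX_BAK_COUNT = 3

    def recovery_action(data):
        # Bump the counter kept in the metadata, then either preserve the
        # entry in the bad queue or requeue it for another attempt.
        data['_bak_count'] = data.get('_bak_count', 0) + 1
        if data['_bak_count'] >= MAX_BAK_COUNT:
            return 'preserve'    # becomes a .psv in the bad queue
        return 'requeue'         # the .bak is renamed back to .pck
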
-
-
-
-class Runner:
- implements(IRunner)
-
- def __init__(self, name, slice=None):
- """Create a queue runner.
-
- :param name: The name of this queue runner; used to look up its
- configuration section and queue directory.
- :type name: str
- :param slice: The slice number for this queue runner. This is passed
- directly to the underlying `ISwitchboard` object.
- :type slice: int or None
- """
- # Grab the configuration section.
- self.name = name
- section = getattr(config, 'qrunner.' + name)
- substitutions = config.paths
- substitutions['name'] = name
- self.queue_directory = expand(section.path, substitutions)
- numslices = int(section.instances)
- self.switchboard = Switchboard(
- self.queue_directory, slice, numslices, True)
- self.sleep_time = as_timedelta(section.sleep_time)
- # sleep_time is a timedelta; turn it into a float for time.sleep().
- self.sleep_float = (86400 * self.sleep_time.days +
- self.sleep_time.seconds +
- self.sleep_time.microseconds / 1.0e6)
- self.max_restarts = int(section.max_restarts)
- self.start = as_boolean(section.start)
- self._stop = False
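
As an aside, the manual days/seconds/microseconds arithmetic above is equivalent to timedelta.total_seconds(), which only appeared in Python 2.7, after this code was written. A quick check:

    from datetime import timedelta

    st = timedelta(minutes=1, microseconds=500000)
    manual = 86400 * st.days + st.seconds + st.microseconds / 1e6
    assert manual == st.total_seconds()   # requires Python >= 2.7
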
-
- def __repr__(self):
- return '<{0} at {1:#x}>'.format(self.__class__.__name__, id(self))
-
- def stop(self):
- """See `IRunner`."""
- self._stop = True
-
- def run(self):
- """See `IRunner`."""
- # Start the main loop for this queue runner.
- try:
- while True:
- # Once through the loop that processes all the files in the
- # queue directory.
- filecnt = self._one_iteration()
- # Do the periodic work for the subclass.
- self._do_periodic()
- # If the stop flag is set, we're done.
- if self._stop:
- break
- # Give the runner an opportunity to snooze for a while, but
- # pass it the file count so it can decide whether to do more
- # work now or not.
- self._snooze(filecnt)
- except KeyboardInterrupt:
- pass
- finally:
- self._clean_up()
-
- def _one_iteration(self):
- """See `IRunner`."""
- me = self.__class__.__name__
- dlog.debug('[%s] starting oneloop', me)
- # List all the files in our queue directory. The switchboard is
- # guaranteed to hand us the files in FIFO order.
- files = self.switchboard.files
- for filebase in files:
- dlog.debug('[%s] processing filebase: %s', me, filebase)
- try:
- # Ask the switchboard for the message and metadata objects
- # associated with this queue file.
- msg, msgdata = self.switchboard.dequeue(filebase)
- except Exception as error:
- # This used to just catch email.Errors.MessageParseError, but
- # other problems can occur in message parsing, e.g.
- # ValueError, and exceptions can occur in unpickling too. We
- # don't want the runner to die, so we just log and skip this
- # entry, but preserve it for analysis.
- self._log(error)
- elog.error('Skipping and preserving unparseable message: %s',
- filebase)
- self.switchboard.finish(filebase, preserve=True)
- config.db.abort()
- continue
- try:
- dlog.debug('[%s] processing onefile', me)
- self._process_one_file(msg, msgdata)
- dlog.debug('[%s] finishing filebase: %s', me, filebase)
- self.switchboard.finish(filebase)
- except Exception as error:
- # All runners that implement _dispose() must guarantee that
- # exceptions are caught and dealt with properly. Still, there
- # may be a bug in the infrastructure, and we do not want those
- # to cause messages to be lost. Any uncaught exceptions will
- # cause the message to be stored in the shunt queue for human
- # intervention.
- self._log(error)
- # Put a marker in the metadata for unshunting.
- msgdata['whichq'] = self.switchboard.queue_directory
- # It is possible that shunting can throw an exception, e.g. a
- # permissions problem or a MemoryError due to a really large
- # message. Try to be graceful.
- try:
- shunt = config.switchboards['shunt']
- new_filebase = shunt.enqueue(msg, msgdata)
- elog.error('SHUNTING: %s', new_filebase)
- self.switchboard.finish(filebase)
- except Exception as error:
- # The message wasn't successfully shunted. Log the
- # exception and try to preserve the original queue entry
- # for possible analysis.
- self._log(error)
- elog.error(
- 'SHUNTING FAILED, preserving original entry: %s',
- filebase)
- self.switchboard.finish(filebase, preserve=True)
- config.db.abort()
- # Other work we want to do each time through the loop.
- dlog.debug('[%s] doing periodic', me)
- self._do_periodic()
- dlog.debug('[%s] checking short circuit', me)
- if self._short_circuit():
- dlog.debug('[%s] short circuiting', me)
- break
- dlog.debug('[%s] committing', me)
- config.db.commit()
- dlog.debug('[%s] ending oneloop: %s', me, len(files))
- return len(files)
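
The error handling above boils down to a three-level fallback per file: process it, failing that shunt it, failing that preserve it. A condensed sketch of that control flow (handle_one and process are hypothetical stand-ins; logging and the database calls are elided):

    def handle_one(switchboard, shunt, filebase, process):
        try:
            msg, msgdata = switchboard.dequeue(filebase)
        except Exception:
            switchboard.finish(filebase, preserve=True)   # keep for analysis
            return
        try:
            process(msg, msgdata)
            switchboard.finish(filebase)
        except Exception:
            msgdata['whichq'] = switchboard.queue_directory
            try:
                shunt.enqueue(msg, msgdata)               # park for a human
                switchboard.finish(filebase)
            except Exception:
                switchboard.finish(filebase, preserve=True)
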
-
- def _process_one_file(self, msg, msgdata):
- """See `IRunner`."""
- # Do some common sanity checking on the message metadata. It's got to
- # be destined for a particular mailing list. This switchboard is used
- # to shunt off badly formatted messages. We don't want to just trash
- # them because they may be fixable with human intervention. Just get
- # them out of our sight.
- #
- # Find out which mailing list this message is destined for.
- listname = unicode(msgdata.get('listname'))
- mlist = config.db.list_manager.get(listname)
- if mlist is None:
- elog.error('Dequeuing message destined for missing list: %s',
- listname)
- config.switchboards['shunt'].enqueue(msg, msgdata)
- return
- # Now process this message. We also want to set up the language
- # context for this message. The context will be the preferred
- # language for the user if the sender is a member of the list, or it
- # will be the list's preferred language. However, we must take
- # special care to reset the defaults, otherwise subsequent messages
- # may be translated incorrectly.
- sender = msg.get_sender()
- member = mlist.members.get_member(sender)
- language = (member.preferred_language
- if member is not None
- else mlist.preferred_language)
- with i18n.using_language(language):
- msgdata['lang'] = language
- keepqueued = self._dispose(mlist, msg, msgdata)
- if keepqueued:
- self.switchboard.enqueue(msg, msgdata)
-
- def _log(self, exc):
- elog.error('Uncaught runner exception: %s', exc)
- s = StringIO()
- traceback.print_exc(file=s)
- elog.error('%s', s.getvalue())
-
- def _clean_up(self):
- """See `IRunner`."""
-
- def _dispose(self, mlist, msg, msgdata):
- """See `IRunner`."""
- raise NotImplementedError
-
- def _do_periodic(self):
- """See `IRunner`."""
- pass
-
- def _snooze(self, filecnt):
- """See `IRunner`."""
- if filecnt or self.sleep_float <= 0:
- return
- time.sleep(self.sleep_float)
-
- def _short_circuit(self):
- """See `IRunner`."""
- return self._stop