author      Barry Warsaw    2008-02-27 22:22:09 -0500
committer   Barry Warsaw    2008-02-27 22:22:09 -0500
commit      3f31f8cce369529d177cfb5a7c66346ec1e12130 (patch)
tree        15f9c0a2cde40ea4aa03e18e1cfd1852b0c72916
parent      f0c044111dfdf6ffe3531df18ccf268a4056874b (diff)
download    mailman-3f31f8cce369529d177cfb5a7c66346ec1e12130.tar.gz
            mailman-3f31f8cce369529d177cfb5a7c66346ec1e12130.tar.zst
            mailman-3f31f8cce369529d177cfb5a7c66346ec1e12130.zip

-rw-r--r--  Mailman/bin/docs/mailmanctl.txt   27
-rw-r--r--  Mailman/bin/docs/master.txt       47
-rw-r--r--  Mailman/bin/master.py            213
-rw-r--r--  Mailman/tests/helpers.py          98
4 files changed, 193 insertions, 192 deletions
diff --git a/Mailman/bin/docs/mailmanctl.txt b/Mailman/bin/docs/mailmanctl.txt
deleted file mode 100644
index bd9023371..000000000
--- a/Mailman/bin/docs/mailmanctl.txt
+++ /dev/null
@@ -1,27 +0,0 @@
-Mailman queue runner control
-============================
-
-Mailman has a number of queue runners which process messages in its queue file
-directories. In normal operation, a command line script called 'mailmanctl'
-is used to start, stop and manage the queue runners. mailmanctl actually is
-just a wrapper around the real queue runner watcher script called master.py.
-
-Because master.py runs in the foreground, we can't start it directly, so we'll
-start it via mailmanctl.
-
- >>> from Mailman.tests.helpers import Watcher
- >>> watcher = Watcher()
- >>> watcher.start()
-
- >>> import os
-
- # This will raise an exception if the process doesn't exist.
- >>> os.kill(watcher.pid, 0)
-
-It's also easy to stop the queue runners via the mailmanctl program.
-
- >>> watcher.stop()
- >>> os.kill(watcher.pid, 0)
- Traceback (most recent call last):
- ...
- OSError: [Errno ...] No such process
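
The os.kill(pid, 0) idiom used in both the old and the new doctest sends no
signal at all; it only asks the kernel whether the process exists, raising
OSError with ESRCH when it does not.  A minimal stand-alone sketch of that
check, written in the Python 2 style of the surrounding code (the helper name
process_exists is illustrative and not part of this commit):

    import os
    import errno

    def process_exists(pid):
        """Return True if a process with the given pid exists."""
        try:
            # Signal 0 performs error checking only; nothing is delivered.
            os.kill(pid, 0)
            return True
        except OSError, error:
            if error.errno == errno.ESRCH:
                # No such process.
                return False
            # Any other error (e.g. EPERM) is unexpected here; re-raise it.
            raise
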
diff --git a/Mailman/bin/docs/master.txt b/Mailman/bin/docs/master.txt
new file mode 100644
index 000000000..d1df43d86
--- /dev/null
+++ b/Mailman/bin/docs/master.txt
@@ -0,0 +1,47 @@
+Mailman queue runner control
+============================
+
+Mailman has a number of queue runners which process messages in its queue file
+directories. In normal operation, a command line script called 'mailmanctl'
+is used to start, stop, and manage the queue runners.  mailmanctl is actually
+just a wrapper around the real queue runner watcher script, master.py.
+
+ >>> from Mailman.tests.helpers import TestableMaster
+
+Start the master in a subthread.
+
+ >>> import threading
+ >>> event = threading.Event()
+ >>> event.clear()
+ >>> master = TestableMaster(event)
+ >>> master.start_qrunners()
+ >>> threading.Thread(target=master.loop).start()
+ >>> event.wait(5.0)
+
+Now verify that all the qrunners are running.
+
+ >>> import os
+
+ # This should produce no output.
+ >>> for pid in master.qrunner_pids:
+ ... os.kill(pid, 0)
+
+Send a SIGTERM to all the child processes, which should cause them to exit.
+cleanup() waits until all the child processes have exited.
+
+ >>> import signal
+ >>> for pid in master.qrunner_pids:
+ ... os.kill(pid, signal.SIGTERM)
+ >>> master.cleanup()
+
+ >>> import errno
+ >>> for pid in master.qrunner_pids:
+ ... try:
+ ... os.kill(pid, 0)
+ ... print 'Process did not exit:', pid
+ ... except OSError, error:
+ ... if error.errno == errno.ESRCH:
+ ... # The child process exited.
+ ... pass
+ ... else:
+ ... raise
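
The new master.txt doctest above boils down to a start / verify / terminate
cycle.  The same flow can be driven from an ordinary test function; this is
only a sketch built on the TestableMaster API added by this commit (the
function name check_master_lifecycle is hypothetical, and it assumes the same
initialized test environment the doctest runs under):

    import os
    import errno
    import signal
    import threading

    from Mailman.tests.helpers import TestableMaster

    def check_master_lifecycle():
        # Start the master loop in a subthread and wait until every qrunner
        # child is known to be running.
        event = threading.Event()
        master = TestableMaster(event)
        master.start_qrunners()
        threading.Thread(target=master.loop).start()
        event.wait(5.0)
        # Each child must exist; os.kill(pid, 0) raises if it does not.
        for pid in master.qrunner_pids:
            os.kill(pid, 0)
        # Terminate the children and wait for all of them to exit.
        for pid in master.qrunner_pids:
            os.kill(pid, signal.SIGTERM)
        master.cleanup()
        # After cleanup(), every pid should be gone.
        for pid in master.qrunner_pids:
            try:
                os.kill(pid, 0)
                print 'Process did not exit:', pid
            except OSError, error:
                if error.errno != errno.ESRCH:
                    raise
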
diff --git a/Mailman/bin/master.py b/Mailman/bin/master.py
index e4de11acc..83ec4508d 100644
--- a/Mailman/bin/master.py
+++ b/Mailman/bin/master.py
@@ -17,6 +17,13 @@
from __future__ import with_statement
+__metaclass__ = type
+__all__ = [
+ 'Loop',
+ 'get_lock_data',
+ ]
+
+
import os
import sys
import errno
@@ -40,7 +47,6 @@ from Mailman.initialize import initialize
DOT = '.'
LOCK_LIFETIME = Defaults.days(1) + Defaults.hours(6)
-log = None
parser = None
@@ -205,91 +211,100 @@ Exiting.""")
-def start_runner(qrname, slice, count):
- """Start a queue runner.
+class Loop:
+ """Main control loop class."""
- All arguments are passed to the qrunner process.
+ def __init__(self, lock=None, restartable=None, config_file=None):
+ self._lock = lock
+ self._restartable = restartable
+ self._config_file = config_file
+ self._kids = {}
- :param qrname: The name of the queue runner.
- :param slice: The slice number.
- :param count: The total number of slices.
- :return: The process id of the child queue runner.
- """
- pid = os.fork()
- if pid:
- # Parent.
- return pid
- # Child.
- #
- # Craft the command line arguments for the exec() call.
- rswitch = '--runner=%s:%d:%d' % (qrname, slice, count)
- # Wherever mailmanctl lives, so too must live the qrunner script.
- exe = os.path.join(config.BIN_DIR, 'qrunner')
- # config.PYTHON, which is the absolute path to the Python interpreter,
- # must be given as argv[0] due to Python's library search algorithm.
- args = [sys.executable, sys.executable, exe, rswitch, '-s']
- if parser.options.config:
- args.extend(['-C', parser.options.config])
- log.debug('starting: %s', args)
- os.execl(*args)
- # We should never get here.
- raise RuntimeError('os.execl() failed')
+ def install_signal_handlers(self):
+        """Install various signal handlers for control from mailmanctl."""
+ log = logging.getLogger('mailman.qrunner')
+ # Set up our signal handlers. Also set up a SIGALRM handler to
+ # refresh the lock once per day. The lock lifetime is 1 day + 6 hours
+ # so this should be plenty.
+ def sigalrm_handler(signum, frame):
+ self._lock.refresh()
+ signal.alarm(int(Defaults.days(1)))
+ signal.signal(signal.SIGALRM, sigalrm_handler)
+ signal.alarm(int(Defaults.days(1)))
+ # SIGHUP tells the qrunners to close and reopen their log files.
+ def sighup_handler(signum, frame):
+ loginit.reopen()
+ for pid in self._kids:
+ os.kill(pid, signal.SIGHUP)
+ log.info('Master watcher caught SIGHUP. Re-opening log files.')
+ signal.signal(signal.SIGHUP, sighup_handler)
+ # SIGUSR1 is used by 'mailman restart'.
+ def sigusr1_handler(signum, frame):
+ for pid in self._kids:
+ os.kill(pid, signal.SIGUSR1)
+ log.info('Master watcher caught SIGUSR1. Exiting.')
+ signal.signal(signal.SIGUSR1, sigusr1_handler)
+ # SIGTERM is what init will kill this process with when changing run
+ # levels. It's also the signal 'mailmanctl stop' uses.
+ def sigterm_handler(signum, frame):
+ for pid in self._kids:
+ os.kill(pid, signal.SIGTERM)
+ log.info('Master watcher caught SIGTERM. Exiting.')
+ signal.signal(signal.SIGTERM, sigterm_handler)
+ # SIGINT is what control-C gives.
+ def sigint_handler(signum, frame):
+ for pid in self._kids:
+ os.kill(pid, signal.SIGINT)
+ log.info('Master watcher caught SIGINT. Restarting.')
+ signal.signal(signal.SIGINT, sigint_handler)
+ def _start_runner(self, qrname, slice, count):
+ """Start a queue runner.
-
-def control_loop(lock):
- """The main control loop.
+ All arguments are passed to the qrunner process.
- This starts up the queue runners, watching for their exit and restarting
- them if need be.
- """
- restartable = parser.options.restartable
- # Start all the qrunners. Keep a dictionary mapping process ids to
- # information about the child processes.
- kids = {}
- # Set up our signal handlers. Also set up a SIGALRM handler to refresh
- # the lock once per day. The lock lifetime is 1 day + 6 hours so this
- # should be plenty.
- def sigalrm_handler(signum, frame):
- lock.refresh()
- signal.alarm(int(Defaults.days(1)))
- signal.signal(signal.SIGALRM, sigalrm_handler)
- signal.alarm(int(Defaults.days(1)))
- # SIGHUP tells the qrunners to close and reopen their log files.
- def sighup_handler(signum, frame):
- loginit.reopen()
- for pid in kids:
- os.kill(pid, signal.SIGHUP)
- log.info('Master watcher caught SIGHUP. Re-opening log files.')
- signal.signal(signal.SIGHUP, sighup_handler)
- # SIGUSR1 is used by 'mailman restart'.
- def sigusr1_handler(signum, frame):
- for pid in kids:
- os.kill(pid, signal.SIGUSR1)
- log.info('Master watcher caught SIGUSR1. Exiting.')
- signal.signal(signal.SIGUSR1, sigusr1_handler)
- # SIGTERM is what init will kill this process with when changing run
- # levels. It's also the signal 'mailmanctl stop' uses.
- def sigterm_handler(signum, frame):
- for pid in kids:
- os.kill(pid, signal.SIGTERM)
- log.info('Master watcher caught SIGTERM. Exiting.')
- signal.signal(signal.SIGTERM, sigterm_handler)
- # SIGINT is what control-C gives.
- def sigint_handler(signum, frame):
- for pid in kids:
- os.kill(pid, signal.SIGINT)
- log.info('Master watcher caught SIGINT. Restarting.')
- signal.signal(signal.SIGINT, sigint_handler)
- # Start all the child qrunners.
- for qrname, count in config.qrunners.items():
- for slice_number in range(count):
- # queue runner name, slice number, number of slices, restart count
- info = (qrname, slice_number, count, 0)
- pid = start_runner(qrname, slice_number, count)
- kids[pid] = info
- # Enter the main wait loop.
- try:
+ :param qrname: The name of the queue runner.
+ :param slice: The slice number.
+ :param count: The total number of slices.
+ :return: The process id of the child queue runner.
+ """
+ pid = os.fork()
+ if pid:
+ # Parent.
+ return pid
+ # Child.
+ #
+ # Craft the command line arguments for the exec() call.
+ rswitch = '--runner=%s:%d:%d' % (qrname, slice, count)
+ # Wherever mailmanctl lives, so too must live the qrunner script.
+ exe = os.path.join(config.BIN_DIR, 'qrunner')
+ # config.PYTHON, which is the absolute path to the Python interpreter,
+ # must be given as argv[0] due to Python's library search algorithm.
+ args = [sys.executable, sys.executable, exe, rswitch, '-s']
+ if self._config_file is not None:
+ args.extend(['-C', self._config_file])
+ log = logging.getLogger('mailman.qrunner')
+ log.debug('starting: %s', args)
+ os.execl(*args)
+ # We should never get here.
+ raise RuntimeError('os.execl() failed')
+
+ def start_qrunners(self):
+ """Start all the configured qrunners."""
+ for qrname, count in config.qrunners.items():
+ for slice_number in range(count):
+ # qrunner name, slice #, # of slices, restart count
+ info = (qrname, slice_number, count, 0)
+ pid = self._start_runner(qrname, slice_number, count)
+ self._kids[pid] = info
+
+ def loop(self):
+ """Main loop.
+
+ Wait until all the qrunners have exited, restarting them if necessary
+ and configured to do so.
+ """
+ log = logging.getLogger('mailman.qrunner')
while True:
try:
pid, status = os.wait()
@@ -314,9 +329,9 @@ def control_loop(lock):
# because of a failure (i.e. no exit signal), and the no-restart
# command line switch was not given. This lets us better handle
# runaway restarts (e.g. if the subprocess had a syntax error!)
- qrname, slice, count, restarts = kids.pop(pid)
+ qrname, slice, count, restarts = self._kids.pop(pid)
restart = False
- if why == signal.SIGUSR1 and restartable:
+ if why == signal.SIGUSR1 and self._restartable:
restart = True
# Have we hit the maximum number of restarts?
restarts += 1
@@ -337,12 +352,14 @@ qrunner %s reached maximum restart limit of %d, not restarting.""",
# SIGTERM or we aren't restarting.
if restart:
newpid = start_runner(qrname, slice, count)
- kids[newpid] = (qrname, slice, count, restarts)
- finally:
- # Should we leave the main loop for any reason, we want to be sure
- # all of our children are exited cleanly. Send SIGTERMs to all
- # the child processes and wait for them all to exit.
- for pid in kids:
+ self._kids[newpid] = (qrname, slice, count, restarts)
+
+ def cleanup(self):
+ """Ensure that all children have exited."""
+ log = logging.getLogger('mailman.qrunner')
+ # Send SIGTERMs to all the child processes and wait for them all to
+ # exit.
+ for pid in self._kids:
try:
os.kill(pid, signal.SIGTERM)
except OSError, error:
@@ -350,10 +367,10 @@ qrunner %s reached maximum restart limit of %d, not restarting.""",
# The child has already exited.
log.info('ESRCH on pid: %d', pid)
# Wait for all the children to go away.
- while kids:
+ while self._kids:
try:
pid, status = os.wait()
- del kids[pid]
+ del self._kids[pid]
except OSError, e:
if e.errno == errno.ECHILD:
break
@@ -370,9 +387,6 @@ def main():
parser = parseargs()
initialize(parser.options.config)
- # We can't grab the logger until after everything's been initialized.
- log = logging.getLogger('mailman.qrunner')
-
# Acquire the master lock, exiting if we can't acquire it. We'll let the
# caller handle any clean up or lock breaking. No with statement here
# because Lock's constructor doesn't support a timeout.
@@ -380,9 +394,13 @@ def main():
try:
with open(config.PIDFILE, 'w') as fp:
print >> fp, os.getpid()
+ loop = Loop(lock, parser.options.restartable, parser.options.config)
+ loop.install_signal_handlers()
try:
- control_loop(lock)
+ loop.start_qrunners()
+ loop.loop()
finally:
+ loop.cleanup()
os.remove(config.PIDFILE)
finally:
lock.unlock()
@@ -390,7 +408,4 @@ def main():
if __name__ == '__main__':
- try:
- main()
- except KeyboardInterrupt:
- pass
+ main()
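
Taken together, the master.py changes replace the module-level control_loop()
function with a Loop class that owns the lock, the restart policy, and the
mapping of child pids to qrunner information.  The calling sequence that
main() now follows can be summarized as below; this is a condensed sketch of
the patched code, not an exact copy (run_master is a hypothetical wrapper,
and lock acquisition plus pid-file handling are elided):

    from Mailman.bin.master import Loop

    def run_master(lock, restartable, config_file):
        loop = Loop(lock, restartable, config_file)
        # SIGALRM refreshes the lock; SIGHUP, SIGUSR1, SIGTERM and SIGINT are
        # forwarded to the child qrunners.
        loop.install_signal_handlers()
        try:
            # Fork one child per configured qrunner slice ...
            loop.start_qrunners()
            # ... then block in os.wait(), restarting children as permitted.
            loop.loop()
        finally:
            # SIGTERM any remaining children and wait for them to exit.
            loop.cleanup()
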
diff --git a/Mailman/tests/helpers.py b/Mailman/tests/helpers.py
index 564c133d9..cb97fc5e6 100644
--- a/Mailman/tests/helpers.py
+++ b/Mailman/tests/helpers.py
@@ -21,7 +21,7 @@ from __future__ import with_statement
__metaclass__ = type
__all__ = [
- 'Watcher',
+ 'TestableMaster',
'digest_mbox',
'get_queue_messages',
'make_testable_runner',
@@ -36,6 +36,7 @@ import subprocess
from datetime import datetime, timedelta
+from Mailman.bin.master import Loop as Master
from Mailman.configuration import config
from Mailman.queue import Switchboard
@@ -97,72 +98,37 @@ def digest_mbox(mlist):
-class Watcher:
- """A doctest stand-in for the queue file watcher."""
+class TestableMaster(Master):
+ """A testable master loop watcher."""
- def __init__(self):
- self.exe = os.path.join(config.BIN_DIR, 'mailmanctl')
- self.returncode = None
- self.stdout = None
- self.stderr = None
- self.pid = None
+ def __init__(self, event):
+ super(TestableMaster, self).__init__(
+ restartable=False, config_file=config.filename)
+ self._event = event
+ self._started_kids = None
- def start(self):
- """Start the watcher and wait until it actually starts."""
- process = subprocess.Popen(
- (self.exe, '-C', config.filename, '-q', 'start'))
- stdout, stderr = process.communicate()
- # Wait until the pid file exists.
- until = datetime.now() + WAIT_INTERVAL
- while datetime.now() < until:
- try:
- with open(config.PIDFILE) as f:
- pid = int(f.read().strip())
- break
- except IOError, error:
- if error.errno == errno.ENOENT:
- time.sleep(0.1)
- else:
+ def loop(self):
+ """Wait until all the qrunners are actually running before looping."""
+ starting_kids = set(self._kids)
+ while starting_kids:
+ for pid in self._kids:
+ try:
+ os.kill(pid, 0)
+ starting_kids.remove(pid)
+ except OSError, error:
+ if error.errno == errno.ESRCH:
+ # The child has not yet started.
+ pass
raise
- else:
- # This will usually cause the doctest to fail.
- return 'Time out'
- # Now wait until the process actually exists.
- until = datetime.now() + WAIT_INTERVAL
- while datetime.now() < until:
- try:
- os.kill(pid, 0)
- break
- except OSError, error:
- if error.errno == errno.ESRCH:
- time.sleep(0.1)
- else:
- raise
- else:
- return 'Time out'
- self.returncode = process.returncode
- self.stdout = stdout
- self.stderr = stderr
- self.pid = pid
+ # Keeping a copy of all the started child processes for use by the
+ # testing environment, even after all have exited.
+ self._started_kids = set(self._kids)
+ # Let the blocking thread know everything's running.
+ self._event.set()
+ super(TestableMaster, self).loop()
- def stop(self):
- """Stop the watcher and wait until it actually stops."""
- process = subprocess.Popen(
- (self.exe, '-C', config.filename, '-q', 'stop'))
- stdout, stderr = process.communicate()
- # Now wait until the process stops.
- until = datetime.now() + WAIT_INTERVAL
- while datetime.now() < until:
- try:
- os.kill(self.pid, 0)
- time.sleep(0.1)
- except OSError, error:
- if error.errno == errno.ESRCH:
- break
- else:
- raise
- else:
- return 'Time out'
- self.returncode = process.returncode
- self.stdout = stdout
- self.stderr = stderr
+ @property
+ def qrunner_pids(self):
+ """The pids of all the child qrunner processes."""
+ for pid in self._started_kids:
+ yield pid
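
TestableMaster relies on a small handshake to keep the doctest deterministic:
the test thread blocks on a threading.Event while the thread running loop()
sets the event once every child has been observed alive.  A minimal,
Mailman-independent sketch of that pattern in Python 2 (the Worker class here
is purely illustrative):

    import time
    import threading

    class Worker:
        def __init__(self, event):
            self._event = event

        def loop(self):
            # Do whatever startup work is needed, then tell the waiting
            # thread that everything is up and running.
            time.sleep(0.1)
            self._event.set()
            # ... the real main loop would continue from here ...

    event = threading.Event()
    worker = Worker(event)
    threading.Thread(target=worker.loop).start()
    # Block, with a timeout, until the worker signals readiness.
    event.wait(5.0)
    assert event.isSet(), 'worker did not start in time'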