| author | Barry Warsaw | 2008-02-27 22:22:09 -0500 |
|---|---|---|
| committer | Barry Warsaw | 2008-02-27 22:22:09 -0500 |
| commit | 3f31f8cce369529d177cfb5a7c66346ec1e12130 | |
| tree | 15f9c0a2cde40ea4aa03e18e1cfd1852b0c72916 | |
| parent | f0c044111dfdf6ffe3531df18ccf268a4056874b | |
| Mode | File | Lines changed |
|---|---|---|
| -rw-r--r-- | Mailman/bin/docs/mailmanctl.txt | 27 |
| -rw-r--r-- | Mailman/bin/docs/master.txt | 47 |
| -rw-r--r-- | Mailman/bin/master.py | 213 |
| -rw-r--r-- | Mailman/tests/helpers.py | 98 |
4 files changed, 193 insertions, 192 deletions
```diff
diff --git a/Mailman/bin/docs/mailmanctl.txt b/Mailman/bin/docs/mailmanctl.txt
deleted file mode 100644
index bd9023371..000000000
--- a/Mailman/bin/docs/mailmanctl.txt
+++ /dev/null
@@ -1,27 +0,0 @@
-Mailman queue runner control
-============================
-
-Mailman has a number of queue runners which process messages in its queue file
-directories.  In normal operation, a command line script called 'mailmanctl'
-is used to start, stop and manage the queue runners.  mailmanctl actually is
-just a wrapper around the real queue runner watcher script called master.py.
-
-Because master.py runs in the foreground, we can't start it directly, so we'll
-start it via mailmanctl.
-
-    >>> from Mailman.tests.helpers import Watcher
-    >>> watcher = Watcher()
-    >>> watcher.start()
-
-    >>> import os
-
-    # This will raise an exception if the process doesn't exist.
-    >>> os.kill(watcher.pid, 0)
-
-It's also easy to stop the queue runners via the mailmanctl program.
-
-    >>> watcher.stop()
-    >>> os.kill(watcher.pid, 0)
-    Traceback (most recent call last):
-    ...
-    OSError: [Errno ...] No such process
diff --git a/Mailman/bin/docs/master.txt b/Mailman/bin/docs/master.txt
new file mode 100644
index 000000000..d1df43d86
--- /dev/null
+++ b/Mailman/bin/docs/master.txt
@@ -0,0 +1,47 @@
+Mailman queue runner control
+============================
+
+Mailman has a number of queue runners which process messages in its queue file
+directories.  In normal operation, a command line script called 'mailmanctl'
+is used to start, stop and manage the queue runners.  mailmanctl actually is
+just a wrapper around the real queue runner watcher script called master.py.
+
+    >>> from Mailman.tests.helpers import TestableMaster
+
+Start the master in a subthread.
+
+    >>> import threading
+    >>> event = threading.Event()
+    >>> event.clear()
+    >>> master = TestableMaster(event)
+    >>> master.start_qrunners()
+    >>> threading.Thread(target=master.loop).start()
+    >>> event.wait(5.0)
+
+Now verify that all the qrunners are running.
+
+    >>> import os
+
+    # This should produce no output.
+    >>> for pid in master.qrunner_pids:
+    ...     os.kill(pid, 0)
+
+Send a SIGTERM to all the child processes, which should exit them.  cleanup()
+waits until all the child processes have exited.
+
+    >>> import signal
+    >>> for pid in master.qrunner_pids:
+    ...     os.kill(pid, signal.SIGTERM)
+    >>> master.cleanup()
+
+    >>> import errno
+    >>> for pid in master.qrunner_pids:
+    ...     try:
+    ...         os.kill(pid, 0)
+    ...         print 'Process did not exit:', pid
+    ...     except OSError, error:
+    ...         if error.errno == errno.ESRCH:
+    ...             # The child process exited.
+    ...             pass
+    ...         else:
+    ...             raise
diff --git a/Mailman/bin/master.py b/Mailman/bin/master.py
index e4de11acc..83ec4508d 100644
--- a/Mailman/bin/master.py
+++ b/Mailman/bin/master.py
@@ -17,6 +17,13 @@
 from __future__ import with_statement
 
+__metaclass__ = type
+__all__ = [
+    'Loop',
+    'get_lock_data',
+    ]
+
+
 import os
 import sys
 import errno
@@ -40,7 +47,6 @@ from Mailman.initialize import initialize
 
 DOT = '.'
 LOCK_LIFETIME = Defaults.days(1) + Defaults.hours(6)
-log = None
 parser = None
@@ -205,91 +211,100 @@ Exiting.""")
 
-def start_runner(qrname, slice, count):
-    """Start a queue runner.
+class Loop:
+    """Main control loop class."""
 
-    All arguments are passed to the qrunner process.
+    def __init__(self, lock=None, restartable=None, config_file=None):
+        self._lock = lock
+        self._restartable = restartable
+        self._config_file = config_file
+        self._kids = {}
 
-    :param qrname: The name of the queue runner.
-    :param slice: The slice number.
-    :param count: The total number of slices.
-    :return: The process id of the child queue runner.
-    """
-    pid = os.fork()
-    if pid:
-        # Parent.
-        return pid
-    # Child.
-    #
-    # Craft the command line arguments for the exec() call.
-    rswitch = '--runner=%s:%d:%d' % (qrname, slice, count)
-    # Wherever mailmanctl lives, so too must live the qrunner script.
-    exe = os.path.join(config.BIN_DIR, 'qrunner')
-    # config.PYTHON, which is the absolute path to the Python interpreter,
-    # must be given as argv[0] due to Python's library search algorithm.
-    args = [sys.executable, sys.executable, exe, rswitch, '-s']
-    if parser.options.config:
-        args.extend(['-C', parser.options.config])
-    log.debug('starting: %s', args)
-    os.execl(*args)
-    # We should never get here.
-    raise RuntimeError('os.execl() failed')
+    def install_signal_handlers(self):
+        """Install various signals handlers for control from mailmanctl."""
+        log = logging.getLogger('mailman.qrunner')
+        # Set up our signal handlers.  Also set up a SIGALRM handler to
+        # refresh the lock once per day.  The lock lifetime is 1 day + 6 hours
+        # so this should be plenty.
+        def sigalrm_handler(signum, frame):
+            self._lock.refresh()
+            signal.alarm(int(Defaults.days(1)))
+        signal.signal(signal.SIGALRM, sigalrm_handler)
+        signal.alarm(int(Defaults.days(1)))
+        # SIGHUP tells the qrunners to close and reopen their log files.
+        def sighup_handler(signum, frame):
+            loginit.reopen()
+            for pid in self._kids:
+                os.kill(pid, signal.SIGHUP)
+            log.info('Master watcher caught SIGHUP.  Re-opening log files.')
+        signal.signal(signal.SIGHUP, sighup_handler)
+        # SIGUSR1 is used by 'mailman restart'.
+        def sigusr1_handler(signum, frame):
+            for pid in self._kids:
+                os.kill(pid, signal.SIGUSR1)
+            log.info('Master watcher caught SIGUSR1.  Exiting.')
+        signal.signal(signal.SIGUSR1, sigusr1_handler)
+        # SIGTERM is what init will kill this process with when changing run
+        # levels.  It's also the signal 'mailmanctl stop' uses.
+        def sigterm_handler(signum, frame):
+            for pid in self._kids:
+                os.kill(pid, signal.SIGTERM)
+            log.info('Master watcher caught SIGTERM.  Exiting.')
+        signal.signal(signal.SIGTERM, sigterm_handler)
+        # SIGINT is what control-C gives.
+        def sigint_handler(signum, frame):
+            for pid in self._kids:
+                os.kill(pid, signal.SIGINT)
+            log.info('Master watcher caught SIGINT.  Restarting.')
+        signal.signal(signal.SIGINT, sigint_handler)
+
+    def _start_runner(self, qrname, slice, count):
+        """Start a queue runner.
 
-
-def control_loop(lock):
-    """The main control loop.
+        All arguments are passed to the qrunner process.
 
-    This starts up the queue runners, watching for their exit and restarting
-    them if need be.
-    """
-    restartable = parser.options.restartable
-    # Start all the qrunners.  Keep a dictionary mapping process ids to
-    # information about the child processes.
-    kids = {}
-    # Set up our signal handlers.  Also set up a SIGALRM handler to refresh
-    # the lock once per day.  The lock lifetime is 1 day + 6 hours so this
-    # should be plenty.
-    def sigalrm_handler(signum, frame):
-        lock.refresh()
-        signal.alarm(int(Defaults.days(1)))
-    signal.signal(signal.SIGALRM, sigalrm_handler)
-    signal.alarm(int(Defaults.days(1)))
-    # SIGHUP tells the qrunners to close and reopen their log files.
-    def sighup_handler(signum, frame):
-        loginit.reopen()
-        for pid in kids:
-            os.kill(pid, signal.SIGHUP)
-        log.info('Master watcher caught SIGHUP.  Re-opening log files.')
-    signal.signal(signal.SIGHUP, sighup_handler)
-    # SIGUSR1 is used by 'mailman restart'.
-    def sigusr1_handler(signum, frame):
-        for pid in kids:
-            os.kill(pid, signal.SIGUSR1)
-        log.info('Master watcher caught SIGUSR1.  Exiting.')
-    signal.signal(signal.SIGUSR1, sigusr1_handler)
-    # SIGTERM is what init will kill this process with when changing run
-    # levels.  It's also the signal 'mailmanctl stop' uses.
-    def sigterm_handler(signum, frame):
-        for pid in kids:
-            os.kill(pid, signal.SIGTERM)
-        log.info('Master watcher caught SIGTERM.  Exiting.')
-    signal.signal(signal.SIGTERM, sigterm_handler)
-    # SIGINT is what control-C gives.
-    def sigint_handler(signum, frame):
-        for pid in kids:
-            os.kill(pid, signal.SIGINT)
-        log.info('Master watcher caught SIGINT.  Restarting.')
-    signal.signal(signal.SIGINT, sigint_handler)
-    # Start all the child qrunners.
-    for qrname, count in config.qrunners.items():
-        for slice_number in range(count):
-            # queue runner name, slice number, number of slices, restart count
-            info = (qrname, slice_number, count, 0)
-            pid = start_runner(qrname, slice_number, count)
-            kids[pid] = info
-    # Enter the main wait loop.
-    try:
+        :param qrname: The name of the queue runner.
+        :param slice: The slice number.
+        :param count: The total number of slices.
+        :return: The process id of the child queue runner.
+        """
+        pid = os.fork()
+        if pid:
+            # Parent.
+            return pid
+        # Child.
+        #
+        # Craft the command line arguments for the exec() call.
+        rswitch = '--runner=%s:%d:%d' % (qrname, slice, count)
+        # Wherever mailmanctl lives, so too must live the qrunner script.
+        exe = os.path.join(config.BIN_DIR, 'qrunner')
+        # config.PYTHON, which is the absolute path to the Python interpreter,
+        # must be given as argv[0] due to Python's library search algorithm.
+        args = [sys.executable, sys.executable, exe, rswitch, '-s']
+        if self._config_file is not None:
+            args.extend(['-C', self._config_file])
+        log = logging.getLogger('mailman.qrunner')
+        log.debug('starting: %s', args)
+        os.execl(*args)
+        # We should never get here.
+        raise RuntimeError('os.execl() failed')
+
+    def start_qrunners(self):
+        """Start all the configured qrunners."""
+        for qrname, count in config.qrunners.items():
+            for slice_number in range(count):
+                # qrunner name, slice #, # of slices, restart count
+                info = (qrname, slice_number, count, 0)
+                pid = self._start_runner(qrname, slice_number, count)
+                self._kids[pid] = info
+
+    def loop(self):
+        """Main loop.
+
+        Wait until all the qrunners have exited, restarting them if necessary
+        and configured to do so.
+        """
+        log = logging.getLogger('mailman.qrunner')
         while True:
             try:
                 pid, status = os.wait()
@@ -314,9 +329,9 @@ def control_loop(lock):
             # because of a failure (i.e. no exit signal), and the no-restart
             # command line switch was not given.  This lets us better handle
             # runaway restarts (e.g. if the subprocess had a syntax error!)
-            qrname, slice, count, restarts = kids.pop(pid)
+            qrname, slice, count, restarts = self._kids.pop(pid)
             restart = False
-            if why == signal.SIGUSR1 and restartable:
+            if why == signal.SIGUSR1 and self._restartable:
                 restart = True
             # Have we hit the maximum number of restarts?
             restarts += 1
@@ -337,12 +352,14 @@ qrunner %s reached maximum restart limit of %d, not restarting.""",
             # SIGTERM or we aren't restarting.
             if restart:
                 newpid = start_runner(qrname, slice, count)
-                kids[newpid] = (qrname, slice, count, restarts)
-    finally:
-        # Should we leave the main loop for any reason, we want to be sure
-        # all of our children are exited cleanly.  Send SIGTERMs to all
-        # the child processes and wait for them all to exit.
-        for pid in kids:
+                self._kids[newpid] = (qrname, slice, count, restarts)
+
+    def cleanup(self):
+        """Ensure that all children have exited."""
+        log = logging.getLogger('mailman.qrunner')
+        # Send SIGTERMs to all the child processes and wait for them all to
+        # exit.
+        for pid in self._kids:
             try:
                 os.kill(pid, signal.SIGTERM)
             except OSError, error:
@@ -350,10 +367,10 @@ qrunner %s reached maximum restart limit of %d, not restarting.""",
                 # The child has already exited.
                 log.info('ESRCH on pid: %d', pid)
         # Wait for all the children to go away.
-        while kids:
+        while self._kids:
             try:
                 pid, status = os.wait()
-                del kids[pid]
+                del self._kids[pid]
             except OSError, e:
                 if e.errno == errno.ECHILD:
                     break
@@ -370,9 +387,6 @@ def main():
     parser = parseargs()
     initialize(parser.options.config)
 
-    # We can't grab the logger until after everything's been initialized.
-    log = logging.getLogger('mailman.qrunner')
-
     # Acquire the master lock, exiting if we can't acquire it.  We'll let the
     # caller handle any clean up or lock breaking.  No with statement here
     # because Lock's constructor doesn't support a timeout.
@@ -380,9 +394,13 @@ def main():
     try:
         with open(config.PIDFILE, 'w') as fp:
             print >> fp, os.getpid()
+        loop = Loop(lock, parser.options.restartable, parser.options.config)
+        loop.install_signal_handlers()
         try:
-            control_loop(lock)
+            loop.start_qrunners()
+            loop.loop()
         finally:
+            loop.cleanup()
             os.remove(config.PIDFILE)
     finally:
         lock.unlock()
@@ -390,7 +408,4 @@ def main():
 
 
 if __name__ == '__main__':
-    try:
-        main()
-    except KeyboardInterrupt:
-        pass
+    main()
diff --git a/Mailman/tests/helpers.py b/Mailman/tests/helpers.py
index 564c133d9..cb97fc5e6 100644
--- a/Mailman/tests/helpers.py
+++ b/Mailman/tests/helpers.py
@@ -21,7 +21,7 @@ from __future__ import with_statement
 
 __metaclass__ = type
 __all__ = [
-    'Watcher',
+    'TestableMaster',
     'digest_mbox',
     'get_queue_messages',
     'make_testable_runner',
@@ -36,6 +36,7 @@ import subprocess
 
 from datetime import datetime, timedelta
 
+from Mailman.bin.master import Loop as Master
 from Mailman.configuration import config
 from Mailman.queue import Switchboard
@@ -97,72 +98,37 @@ def digest_mbox(mlist):
 
 
-class Watcher:
-    """A doctest stand-in for the queue file watcher."""
+class TestableMaster(Master):
+    """A testable master loop watcher."""
 
-    def __init__(self):
-        self.exe = os.path.join(config.BIN_DIR, 'mailmanctl')
-        self.returncode = None
-        self.stdout = None
-        self.stderr = None
-        self.pid = None
+    def __init__(self, event):
+        super(TestableMaster, self).__init__(
+            restartable=False, config_file=config.filename)
+        self._event = event
+        self._started_kids = None
 
-    def start(self):
-        """Start the watcher and wait until it actually starts."""
-        process = subprocess.Popen(
-            (self.exe, '-C', config.filename, '-q', 'start'))
-        stdout, stderr = process.communicate()
-        # Wait until the pid file exists.
-        until = datetime.now() + WAIT_INTERVAL
-        while datetime.now() < until:
-            try:
-                with open(config.PIDFILE) as f:
-                    pid = int(f.read().strip())
-                break
-            except IOError, error:
-                if error.errno == errno.ENOENT:
-                    time.sleep(0.1)
-                else:
+    def loop(self):
+        """Wait until all the qrunners are actually running before looping."""
+        starting_kids = set(self._kids)
+        while starting_kids:
+            for pid in self._kids:
+                try:
+                    os.kill(pid, 0)
+                    starting_kids.remove(pid)
+                except OSError, error:
+                    if error.errno == errno.ESRCH:
+                        # The child has not yet started.
+                        pass
                     raise
-        else:
-            # This will usually cause the doctest to fail.
-            return 'Time out'
-        # Now wait until the process actually exists.
-        until = datetime.now() + WAIT_INTERVAL
-        while datetime.now() < until:
-            try:
-                os.kill(pid, 0)
-                break
-            except OSError, error:
-                if error.errno == errno.ESRCH:
-                    time.sleep(0.1)
-                else:
-                    raise
-        else:
-            return 'Time out'
-        self.returncode = process.returncode
-        self.stdout = stdout
-        self.stderr = stderr
-        self.pid = pid
+        # Keeping a copy of all the started child processes for use by the
+        # testing environment, even after all have exited.
+        self._started_kids = set(self._kids)
+        # Let the blocking thread know everything's running.
+        self._event.set()
+        super(TestableMaster, self).loop()
 
-    def stop(self):
-        """Stop the watcher and wait until it actually stops."""
-        process = subprocess.Popen(
-            (self.exe, '-C', config.filename, '-q', 'stop'))
-        stdout, stderr = process.communicate()
-        # Now wait until the process stops.
-        until = datetime.now() + WAIT_INTERVAL
-        while datetime.now() < until:
-            try:
-                os.kill(self.pid, 0)
-                time.sleep(0.1)
-            except OSError, error:
-                if error.errno == errno.ESRCH:
-                    break
-                else:
-                    raise
-        else:
-            return 'Time out'
-        self.returncode = process.returncode
-        self.stdout = stdout
-        self.stderr = stderr
+    @property
+    def qrunner_pids(self):
+        """The pids of all the child qrunner processes."""
+        for pid in self._started_kids:
+            yield pid
```
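The refactoring above turns the module-level `start_runner()`/`control_loop()` pair into a `Loop` class precisely so that tests can drive the start, loop, and cleanup phases separately, which is what `master.txt` and `TestableMaster` do. For readers unfamiliar with the underlying POSIX pattern, here is a standalone sketch of the fork, terminate, and reap cycle that `Loop.start_qrunners()` and `Loop.cleanup()` implement. It is a minimal illustration, not Mailman code: `spawn_child` and `terminate_and_reap` are hypothetical names, the children merely sleep instead of running queues, and it only works on systems where `os.fork` exists.

```python
import os
import sys
import time
import errno
import signal

def spawn_child():
    """Fork a child that stands in for a queue runner; return its pid."""
    pid = os.fork()
    if pid:
        # Parent: just remember the child.
        return pid
    # Child: exit cleanly on SIGTERM, otherwise idle the way a
    # qrunner's event loop would.
    signal.signal(signal.SIGTERM, lambda signum, frame: sys.exit(0))
    while True:
        time.sleep(0.1)

def terminate_and_reap(kids):
    """SIGTERM every child, then wait for them all, as cleanup() does."""
    for pid in list(kids):
        try:
            os.kill(pid, signal.SIGTERM)
        except OSError as error:
            # ESRCH means the child already exited; anything else is real.
            if error.errno != errno.ESRCH:
                raise
    while kids:
        pid, status = os.wait()     # Blocks until some child exits.
        del kids[pid]

if __name__ == '__main__':
    kids = {}
    for tag in ('in', 'out', 'virgin'):
        kids[spawn_child()] = tag
    terminate_and_reap(kids)
    print('all children exited')
```

The ordering matters: signal first, then `os.wait()` in a loop, because SIGTERM only requests an exit; the `wait()` is what actually collects each child's exit status and keeps zombies from accumulating.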
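Both the doctests and `TestableMaster.loop()` lean on the signal-0 probe: `os.kill(pid, 0)` delivers no signal at all, but still performs the kernel's existence and permission checks, so it raises `OSError` with `ESRCH` once a process is gone. A minimal sketch of the idiom follows; `process_exists` is a hypothetical helper for illustration, not part of Mailman.

```python
import os
import errno

def process_exists(pid):
    """Probe for a live process without signaling it."""
    try:
        os.kill(pid, 0)       # Signal 0: error checking only, nothing sent.
    except OSError as error:
        if error.errno == errno.ESRCH:
            return False      # No such process.
        if error.errno == errno.EPERM:
            return True       # Exists, but owned by another user.
        raise
    return True

print(process_exists(os.getpid()))    # True: we are certainly running.
```

The EPERM branch is worth the special case: it means the pid exists but belongs to a process you may not signal, which still counts as running.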