diff options
| author | Barry Warsaw | 2011-10-16 15:31:28 -0400 |
|---|---|---|
| committer | Barry Warsaw | 2011-10-16 15:31:28 -0400 |
| commit | a8b8be8ad7510c095c3232c25f2c2e3e54d44352 (patch) | |
| tree | a44d5202941b6bc59177bfe86a33f3a216870bd6 /src/mailman/commands/tests | |
| parent | 24c3bafeb6551aa52a2df3c1b151b4bde07c3de0 (diff) | |
| download | mailman-a8b8be8ad7510c095c3232c25f2c2e3e54d44352.tar.gz mailman-a8b8be8ad7510c095c3232c25f2c2e3e54d44352.tar.zst mailman-a8b8be8ad7510c095c3232c25f2c2e3e54d44352.zip | |
Diffstat (limited to 'src/mailman/commands/tests')
| -rw-r--r-- | src/mailman/commands/tests/test_control.py | 171 | ||||
| -rw-r--r-- | src/mailman/commands/tests/test_create.py | 17 |
2 files changed, 185 insertions, 3 deletions
diff --git a/src/mailman/commands/tests/test_control.py b/src/mailman/commands/tests/test_control.py new file mode 100644 index 000000000..105ba95d5 --- /dev/null +++ b/src/mailman/commands/tests/test_control.py @@ -0,0 +1,171 @@ +# Copyright (C) 2011 by the Free Software Foundation, Inc. +# +# This file is part of GNU Mailman. +# +# GNU Mailman is free software: you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) +# any later version. +# +# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. +# +# You should have received a copy of the GNU General Public License along with +# GNU Mailman. If not, see <http://www.gnu.org/licenses/>. + +"""Test some additional corner cases for starting/stopping.""" + +from __future__ import absolute_import, unicode_literals + +__metaclass__ = type +__all__ = [ + 'find_master', + 'make_config', + 'test_suite', + ] + + +import os +import sys +import time +import errno +import signal +import shutil +import socket +import unittest + +from datetime import timedelta, datetime + +from mailman.commands.cli_control import Start, kill_watcher +from mailman.config import config +from mailman.testing.layers import ConfigLayer + +SEP = '|' + + + +def make_config(): + # All we care about is the master process; normally it starts a bunch of + # runners, but we don't care about any of them, so write a test + # configuration file for the master that disables all the runners. + new_config = 'no-runners.cfg' + config_file = os.path.join(os.path.dirname(config.filename), new_config) + shutil.copyfile(config.filename, config_file) + with open(config_file, 'a') as fp: + for runner_config in config.runner_configs: + print >> fp, '[{0}]\nstart:no\n'.format(runner_config.name) + return config_file + + +def find_master(): + # See if the master process is still running. + until = timedelta(seconds=10) + datetime.now() + while datetime.now() < until: + time.sleep(0.1) + try: + with open(config.PID_FILE) as fp: + pid = int(fp.read().strip()) + os.kill(pid, 0) + except IOError as error: + if error.errno != errno.ENOENT: + raise + except ValueError: + pass + except OSError as error: + if error.errno != errno.ESRCH: + raise + else: + return pid + else: + return None + + + +class FakeArgs: + force = None + run_as_user = None + quiet = True + config = None + + +class FakeParser: + def __init__(self): + self.message = None + + def error(self, message): + self.message = message + sys.exit(1) + + + +class TestStart(unittest.TestCase): + """Test various starting scenarios.""" + + layer = ConfigLayer + + def setUp(self): + self.command = Start() + self.command.parser = FakeParser() + self.args = FakeArgs() + self.args.config = make_config() + + def tearDown(self): + try: + with open(config.PID_FILE) as fp: + master_pid = int(fp.read()) + except OSError as error: + if error.errno != errno.ENOENT: + raise + # There is no master, so just ignore this. + return + kill_watcher(signal.SIGTERM) + os.waitpid(master_pid, 0) + + def test_force_stale_lock(self): + # Fake an acquisition of the master lock by another process, which + # subsequently goes stale. Start by finding a free process id. Yes, + # this could race, but given that we're starting with our own PID and + # searching downward, it's less likely. + fake_pid = os.getpid() - 1 + while fake_pid > 1: + try: + os.kill(fake_pid, 0) + except OSError as error: + if error.errno == errno.ESRCH: + break + fake_pid -= 1 + else: + raise RuntimeError('Cannot find free PID') + # Lock acquisition logic taken from flufl.lock. + claim_file = SEP.join(( + config.LOCK_FILE, + socket.getfqdn(), + str(fake_pid), + '0')) + with open(config.LOCK_FILE, 'w') as fp: + fp.write(claim_file) + os.link(config.LOCK_FILE, claim_file) + expiration_date = datetime.now() - timedelta(minutes=2) + t = time.mktime(expiration_date.timetuple()) + os.utime(claim_file, (t, t)) + # Start without --force; no master will be running. + try: + self.command.process(self.args) + except SystemExit: + pass + self.assertEqual(find_master(), None) + self.assertTrue('--force' in self.command.parser.message) + # Start again, this time with --force. + self.args.force = True + self.command.process(self.args) + pid = find_master() + self.assertNotEqual(pid, None) + + + +def test_suite(): + suite = unittest.TestSuite() + suite.addTest(unittest.makeSuite(TestStart)) + return suite diff --git a/src/mailman/commands/tests/test_create.py b/src/mailman/commands/tests/test_create.py index c2176a106..76a8e0e00 100644 --- a/src/mailman/commands/tests/test_create.py +++ b/src/mailman/commands/tests/test_create.py @@ -25,6 +25,7 @@ __all__ = [ ] +import sys import unittest from mailman.app.lifecycle import create_list @@ -48,6 +49,7 @@ class FakeParser: def error(self, message): self.message = message + sys.exit(1) @@ -65,14 +67,20 @@ class TestCreate(unittest.TestCase): # Cannot create a mailing list if it already exists. create_list('test@example.com') self.args.listname = ['test@example.com'] - self.command.process(self.args) + try: + self.command.process(self.args) + except SystemExit: + pass self.assertEqual(self.command.parser.message, 'List already exists: test@example.com') def test_invalid_posting_address(self): # Cannot create a mailing list with an invalid posting address. self.args.listname = ['foo'] - self.command.process(self.args) + try: + self.command.process(self.args) + except SystemExit: + pass self.assertEqual(self.command.parser.message, 'Illegal list name: foo') @@ -81,7 +89,10 @@ class TestCreate(unittest.TestCase): self.args.listname = ['test@example.com'] self.args.domain = True self.args.owners = ['main=True'] - self.command.process(self.args) + try: + self.command.process(self.args) + except SystemExit: + pass self.assertEqual(self.command.parser.message, 'Illegal owner addresses: main=True') |
