diff options
Diffstat (limited to 'src/mailman/commands/tests')
21 files changed, 1056 insertions, 732 deletions
diff --git a/src/mailman/commands/tests/data/__init__.py b/src/mailman/commands/tests/data/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/src/mailman/commands/tests/data/__init__.py diff --git a/src/mailman/commands/tests/data/no-runners.cfg b/src/mailman/commands/tests/data/no-runners.cfg new file mode 100644 index 000000000..8f423460c --- /dev/null +++ b/src/mailman/commands/tests/data/no-runners.cfg @@ -0,0 +1,51 @@ +# Copyright (C) 2008-2017 by the Free Software Foundation, Inc. +# +# This file is part of GNU Mailman. +# +# GNU Mailman is free software: you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) +# any later version. +# +# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. +# +# You should have received a copy of the GNU General Public License along with +# GNU Mailman. If not, see <http://www.gnu.org/licenses/>. + +# Disable all runners. + +[runner.archive] +start: no + +[runner.bounces] +start: no + +[runner.command] +start: no + +[runner.in] +start: no + +[runner.lmtp] +start: no + +[runner.nntp] +start: no + +[runner.out] +start: no + +[runner.pipeline] +start: no + +[runner.retry] +start: no + +[runner.shunt] +start: no + +[runner.virgin] +start: no diff --git a/src/mailman/commands/tests/test_cli_conf.py b/src/mailman/commands/tests/test_cli_conf.py new file mode 100644 index 000000000..27fca27ac --- /dev/null +++ b/src/mailman/commands/tests/test_cli_conf.py @@ -0,0 +1,74 @@ +# Copyright (C) 2013-2017 by the Free Software Foundation, Inc. +# +# This file is part of GNU Mailman. +# +# GNU Mailman is free software: you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) +# any later version. +# +# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. +# +# You should have received a copy of the GNU General Public License along with +# GNU Mailman. If not, see <http://www.gnu.org/licenses/>. + +"""Test the conf subcommand.""" + +import unittest + +from click.testing import CliRunner +from mailman.commands.cli_conf import conf +from mailman.testing.layers import ConfigLayer +from tempfile import NamedTemporaryFile + + +class TestConf(unittest.TestCase): + """Test the conf subcommand.""" + + layer = ConfigLayer + + def setUp(self): + self._command = CliRunner() + + def test_cannot_access_nonexistent_section(self): + result = self._command.invoke(conf, ('-s', 'thissectiondoesnotexist')) + self.assertEqual(result.exit_code, 2) + self.assertEqual( + result.output, + 'Usage: conf [OPTIONS]\n\n' + 'Error: No such section: thissectiondoesnotexist\n') + + def test_cannot_access_nonexistent_section_and_key(self): + result = self._command.invoke( + conf, ('-s', 'thissectiondoesnotexist', '-k', 'nosuchkey')) + self.assertEqual(result.exit_code, 2) + self.assertEqual( + result.output, + 'Usage: conf [OPTIONS]\n\n' + 'Error: No such section: thissectiondoesnotexist\n') + + def test_cannot_access_nonexistent_key(self): + result = self._command.invoke( + conf, ('-s', 'mailman', '-k', 'thiskeydoesnotexist')) + self.assertEqual(result.exit_code, 2) + self.assertEqual( + result.output, + 'Usage: conf [OPTIONS]\n\n' + 'Error: Section mailman: No such key: thiskeydoesnotexist\n') + + def test_output_to_explicit_stdout(self): + result = self._command.invoke( + conf, ('-o', '-', '-s', 'shell', '-k', 'use_ipython')) + self.assertEqual(result.exit_code, 0) + self.assertEqual(result.output, 'no\n') + + def test_output_to_file(self): + with NamedTemporaryFile() as outfp: + result = self._command.invoke( + conf, ('-o', outfp.name, '-s', 'shell', '-k', 'use_ipython')) + self.assertEqual(result.exit_code, 0) + with open(outfp.name, 'r', encoding='utf-8') as infp: + self.assertEqual(infp.read(), 'no\n') diff --git a/src/mailman/commands/tests/test_cli_control.py b/src/mailman/commands/tests/test_cli_control.py new file mode 100644 index 000000000..642450167 --- /dev/null +++ b/src/mailman/commands/tests/test_cli_control.py @@ -0,0 +1,295 @@ +# Copyright (C) 2011-2017 by the Free Software Foundation, Inc. +# +# This file is part of GNU Mailman. +# +# GNU Mailman is free software: you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) +# any later version. +# +# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. +# +# You should have received a copy of the GNU General Public License along with +# GNU Mailman. If not, see <http://www.gnu.org/licenses/>. + +"""Test some additional corner cases for starting/stopping.""" + +import os +import sys +import time +import shutil +import signal +import socket +import unittest + +from click.testing import CliRunner +from contextlib import ExitStack, suppress +from datetime import datetime, timedelta +from flufl.lock import SEP +from mailman.bin.master import WatcherState +from mailman.commands.cli_control import reopen, restart, start +from mailman.config import config +from mailman.testing.helpers import configuration +from mailman.testing.layers import ConfigLayer +from pkg_resources import resource_filename +from public import public +from tempfile import TemporaryDirectory +from unittest.mock import patch + + +# For ../docs/control.rst +@public +def make_config(resources): + cfg_path = resource_filename( + 'mailman.commands.tests.data', 'no-runners.cfg') + # We have to patch the global config's filename attribute. The problem + # here is that click does not support setting the -C option on the + # parent command (i.e. `master`). + # https://github.com/pallets/click/issues/831 + resources.enter_context(patch.object(config, 'filename', cfg_path)) + + +# For ../docs/control.rst +@public +def find_master(): + # See if the master process is still running. + until = timedelta(seconds=10) + datetime.now() + while datetime.now() < until: + time.sleep(0.1) + with suppress(FileNotFoundError, ValueError, ProcessLookupError): + with open(config.PID_FILE) as fp: + pid = int(fp.read().strip()) + os.kill(pid, 0) + return pid + return None + + +@public +def claim_lock(): + # Fake an acquisition of the master lock by another process, which + # subsequently goes stale. Start by finding a free process id. Yes, + # this could race, but given that we're starting with our own PID and + # searching downward, it's less likely. + fake_pid = os.getpid() - 1 + while fake_pid > 1: + try: + os.kill(fake_pid, 0) + except ProcessLookupError: + break + fake_pid -= 1 + else: + raise RuntimeError('Cannot find free PID') + # Lock acquisition logic taken from flufl.lock. + claim_file = SEP.join(( + config.LOCK_FILE, + socket.getfqdn(), + str(fake_pid), + '0')) + with open(config.LOCK_FILE, 'w') as fp: + fp.write(claim_file) + os.link(config.LOCK_FILE, claim_file) + expiration_date = datetime.now() - timedelta(minutes=5) + t = time.mktime(expiration_date.timetuple()) + os.utime(claim_file, (t, t)) + return claim_file + + +@public +def kill_with_extreme_prejudice(pid_or_pidfile=None): + # 2016-12-03 barry: We have intermittent hangs during both local and CI + # test suite runs where killing a runner or master process doesn't + # terminate the process. In those cases, wait()ing on the child can + # suspend the test process indefinitely. Locally, you have to C-c the + # test process, but that still doesn't kill it; the process continues to + # run in the background. If you then search for the process's pid and + # SIGTERM it, it will usually exit, which is why I don't understand why + # the above SIGTERM doesn't kill it sometimes. However, when run under + # CI, the test suite will just hang until the CI runner times it out. It + # would be better to figure out the underlying cause, because we have + # definitely seen other situations where a runner process won't exit, but + # for testing purposes we're just trying to clean up some resources so + # after a brief attempt at SIGTERMing it, let's SIGKILL it and warn. + if isinstance(pid_or_pidfile, str): + try: + with open(pid_or_pidfile, 'r') as fp: + pid = int(fp.read()) + except FileNotFoundError: + # There's nothing to kill. + return + else: + pid = pid_or_pidfile + if pid is not None: + os.kill(pid, signal.SIGTERM) + until = timedelta(seconds=10) + datetime.now() + while datetime.now() < until: + try: + if pid is None: + os.wait3(os.WNOHANG) + else: + os.waitpid(pid, os.WNOHANG) + except ChildProcessError: + # This basically means we went one too many times around the + # loop. The previous iteration successfully reaped the child. + # Because the return status of wait3() and waitpid() are different + # in those cases, it's easier just to catch the exception for + # either call and exit. + return + time.sleep(0.1) + else: + if pid is None: + # There's really not much more we can do because we have no pid to + # SIGKILL. Just report the problem and continue. + print('WARNING: NO CHANGE IN CHILD PROCESS STATES', + file=sys.stderr) + return + print('WARNING: SIGTERM DID NOT EXIT PROCESS; SIGKILLing', + file=sys.stderr) + if pid is not None: + os.kill(pid, signal.SIGKILL) + until = timedelta(seconds=10) + datetime.now() + while datetime.now() < until: + status = os.waitpid(pid, os.WNOHANG) + if status == (0, 0): + # The child was reaped. + return + time.sleep(0.1) + else: + print('WARNING: SIGKILL DID NOT EXIT PROCESS!', file=sys.stderr) + + +class TestControl(unittest.TestCase): + layer = ConfigLayer + maxDiff = None + + def setUp(self): + self._command = CliRunner() + self._tmpdir = TemporaryDirectory() + self.addCleanup(self._tmpdir.cleanup) + # Specify where to put the pid file; and make sure that the master + # gets killed regardless of whether it gets started or not. + self._pid_file = os.path.join(self._tmpdir.name, 'master-test.pid') + self.addCleanup(kill_with_extreme_prejudice, self._pid_file) + # Patch cli_control so that 1) it doesn't actually do a fork, since + # that makes it impossible to avoid race conditions in the test; 2) + # doesn't actually os.execl(). + with ExitStack() as resources: + resources.enter_context(patch( + 'mailman.commands.cli_control.os.fork', + # Pretend to be the child. + return_value=0 + )) + self._execl = resources.enter_context(patch( + 'mailman.commands.cli_control.os.execl')) + resources.enter_context(patch( + 'mailman.commands.cli_control.os.setsid')) + resources.enter_context(patch( + 'mailman.commands.cli_control.os.chdir')) + resources.enter_context(patch( + 'mailman.commands.cli_control.os.environ', + os.environ.copy())) + # Arrange for the mocks to be reverted when the test is over. + self.addCleanup(resources.pop_all().close) + + def test_master_is_elsewhere_and_missing(self): + with ExitStack() as resources: + bin_dir = resources.enter_context(TemporaryDirectory()) + old_master = os.path.join(config.BIN_DIR, 'master') + new_master = os.path.join(bin_dir, 'master') + shutil.move(old_master, new_master) + resources.callback(shutil.move, new_master, old_master) + results = self._command.invoke(start) + # Argument #2 to the execl() call should be the path to the master + # program, and the path should not exist. + self.assertEqual( + len(self._execl.call_args_list), 1, results.output) + posargs, kws = self._execl.call_args_list[0] + master_path = posargs[2] + self.assertEqual(os.path.basename(master_path), 'master') + self.assertFalse(os.path.exists(master_path), master_path) + + def test_master_is_elsewhere_and_findable(self): + with ExitStack() as resources: + bin_dir = resources.enter_context(TemporaryDirectory()) + old_master = os.path.join(config.BIN_DIR, 'master') + new_master = os.path.join(bin_dir, 'master') + shutil.move(old_master, new_master) + resources.callback(shutil.move, new_master, old_master) + with configuration('paths.testing', bin_dir=bin_dir): + results = self._command.invoke(start) + # Argument #2 to the execl() call should be the path to the master + # program, and the path should exist. + self.assertEqual( + len(self._execl.call_args_list), 1, results.output) + posargs, kws = self._execl.call_args_list[0] + master_path = posargs[2] + self.assertEqual(os.path.basename(master_path), 'master') + self.assertTrue(os.path.exists(master_path), master_path) + + def test_stale_lock_no_force(self): + claim_file = claim_lock() + self.addCleanup(os.remove, claim_file) + self.addCleanup(os.remove, config.LOCK_FILE) + result = self._command.invoke(start) + self.assertEqual(result.exit_code, 2) + self.assertEqual( + result.output, + 'Usage: start [OPTIONS]\n\n' + 'Error: A previous run of GNU Mailman did not exit cleanly ' + '(stale_lock). Try using --force\n') + + def test_stale_lock_force(self): + claim_file = claim_lock() + self.addCleanup(os.remove, claim_file) + self.addCleanup(os.remove, config.LOCK_FILE) + # Don't test the results of this command. Because we're mocking + # os.execl(), we'll end up raising the RuntimeError at the end of the + # start() method, child branch. + self._command.invoke(start, ('--force',)) + self.assertEqual(len(self._execl.call_args_list), 1) + posargs, kws = self._execl.call_args_list[0] + self.assertIn('--force', posargs) + + +class TestControlSimple(unittest.TestCase): + layer = ConfigLayer + maxDiff = None + + def setUp(self): + self._command = CliRunner() + + def test_watcher_state_conflict(self): + with patch('mailman.commands.cli_control.master_state', + return_value=(WatcherState.conflict, object())): + results = self._command.invoke(start) + self.assertEqual(results.exit_code, 2) + self.assertEqual( + results.output, + 'Usage: start [OPTIONS]\n\n' + 'Error: GNU Mailman is already running\n') + + def test_reopen(self): + with patch('mailman.commands.cli_control.kill_watcher') as mock: + result = self._command.invoke(reopen) + mock.assert_called_once_with(signal.SIGHUP) + self.assertEqual(result.output, 'Reopening the Mailman runners\n') + + def test_reopen_quiet(self): + with patch('mailman.commands.cli_control.kill_watcher') as mock: + result = self._command.invoke(reopen, ('--quiet',)) + mock.assert_called_once_with(signal.SIGHUP) + self.assertEqual(result.output, '') + + def test_restart(self): + with patch('mailman.commands.cli_control.kill_watcher') as mock: + result = self._command.invoke(restart) + mock.assert_called_once_with(signal.SIGUSR1) + self.assertEqual(result.output, 'Restarting the Mailman runners\n') + + def test_restart_quiet(self): + with patch('mailman.commands.cli_control.kill_watcher') as mock: + result = self._command.invoke(restart, ('--quiet',)) + mock.assert_called_once_with(signal.SIGUSR1) + self.assertEqual(result.output, '') diff --git a/src/mailman/commands/tests/test_cli_create.py b/src/mailman/commands/tests/test_cli_create.py new file mode 100644 index 000000000..364018d92 --- /dev/null +++ b/src/mailman/commands/tests/test_cli_create.py @@ -0,0 +1,116 @@ +# Copyright (C) 2011-2017 by the Free Software Foundation, Inc. +# +# This file is part of GNU Mailman. +# +# GNU Mailman is free software: you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) +# any later version. +# +# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. +# +# You should have received a copy of the GNU General Public License along with +# GNU Mailman. If not, see <http://www.gnu.org/licenses/>. + +"""Test the `mailman create` subcommand.""" + +import unittest + +from click.testing import CliRunner +from mailman.app.lifecycle import create_list +from mailman.commands.cli_lists import create, remove +from mailman.interfaces.domain import IDomainManager +from mailman.testing.layers import ConfigLayer +from zope.component import getUtility + + +class TestCreate(unittest.TestCase): + layer = ConfigLayer + + def setUp(self): + self._command = CliRunner() + + def test_cannot_create_duplicate_list(self): + # Cannot create a mailing list if it already exists. + create_list('ant@example.com') + result = self._command.invoke(create, ('ant@example.com',)) + self.assertEqual(result.exit_code, 2) + self.assertEqual( + result.output, + 'Usage: create [OPTIONS] LISTNAME\n\n' + 'Error: List already exists: ant@example.com\n') + + def test_invalid_posting_address(self): + # Cannot create a mailing list with an invalid posting address. + result = self._command.invoke(create, ('foo',)) + self.assertEqual(result.exit_code, 2) + self.assertEqual( + result.output, + 'Usage: create [OPTIONS] LISTNAME\n\n' + 'Error: Illegal list name: foo\n') + + def test_invalid_owner_addresses(self): + # Cannot create a list with invalid owner addresses. LP: #778687 + result = self._command.invoke( + create, ('-o', 'invalid', 'ant@example.com')) + self.assertEqual(result.exit_code, 2) + self.assertEqual( + result.output, + 'Usage: create [OPTIONS] LISTNAME\n\n' + 'Error: Illegal owner addresses: invalid\n') + + def test_create_without_domain_option(self): + # The domain will be created if no domain options are specified. Use + # the example.org domain since example.com is created by the test + # suite so it would always already exist. + result = self._command.invoke(create, ('ant@example.org',)) + self.assertEqual(result.exit_code, 0) + domain = getUtility(IDomainManager)['example.org'] + self.assertEqual(domain.mail_host, 'example.org') + + def test_create_with_d(self): + result = self._command.invoke(create, ('ant@example.org', '-d')) + self.assertEqual(result.exit_code, 0) + domain = getUtility(IDomainManager)['example.org'] + self.assertEqual(domain.mail_host, 'example.org') + + def test_create_with_domain(self): + result = self._command.invoke(create, ('ant@example.org', '--domain')) + self.assertEqual(result.exit_code, 0) + domain = getUtility(IDomainManager)['example.org'] + self.assertEqual(domain.mail_host, 'example.org') + + def test_create_with_D(self): + result = self._command.invoke(create, ('ant@example.org', '-D')) + self.assertEqual(result.exit_code, 2) + self.assertEqual( + result.output, + 'Usage: create [OPTIONS] LISTNAME\n\n' + 'Error: Undefined domain: example.org\n') + + def test_create_with_nodomain(self): + result = self._command.invoke( + create, ('ant@example.org', '--no-domain')) + self.assertEqual(result.exit_code, 2) + self.assertEqual( + result.output, + 'Usage: create [OPTIONS] LISTNAME\n\n' + 'Error: Undefined domain: example.org\n') + + +class TestRemove(unittest.TestCase): + layer = ConfigLayer + + def setUp(self): + self._command = CliRunner() + + def test_remove_not_quiet_no_such_list(self): + results = self._command.invoke(remove, ('ant@example.com',)) + # It's not an error to try to remove a nonexistent list. + self.assertEqual(results.exit_code, 0) + self.assertEqual( + results.output, + 'No such list matching spec: ant@example.com\n') diff --git a/src/mailman/commands/tests/test_digests.py b/src/mailman/commands/tests/test_cli_digests.py index 0a3ea5226..3c033d60d 100644 --- a/src/mailman/commands/tests/test_digests.py +++ b/src/mailman/commands/tests/test_cli_digests.py @@ -20,10 +20,10 @@ import os import unittest +from click.testing import CliRunner from datetime import timedelta -from io import StringIO from mailman.app.lifecycle import create_list -from mailman.commands.cli_digests import Digests +from mailman.commands.cli_digests import digests from mailman.config import config from mailman.interfaces.digests import DigestFrequency from mailman.interfaces.member import DeliveryMode @@ -33,16 +33,6 @@ from mailman.testing.helpers import ( specialized_message_from_string as mfs, subscribe) from mailman.testing.layers import ConfigLayer from mailman.utilities.datetime import now as right_now -from unittest.mock import patch - - -class FakeArgs: - def __init__(self): - self.lists = [] - self.send = False - self.bump = False - self.dry_run = False - self.verbose = False class TestSendDigests(unittest.TestCase): @@ -53,7 +43,7 @@ class TestSendDigests(unittest.TestCase): self._mlist.digests_enabled = True self._mlist.digest_size_threshold = 100000 self._mlist.send_welcome_message = False - self._command = Digests() + self._command = CliRunner() self._handler = config.handlers['to-digest'] self._runner = make_testable_runner(DigestRunner, 'digest') # The mailing list needs at least one digest recipient. @@ -76,10 +66,7 @@ Subject: message 1 get_queue_messages('digest', expected_count=0) mailbox_path = os.path.join(self._mlist.data_path, 'digest.mmdf') self.assertGreater(os.path.getsize(mailbox_path), 0) - args = FakeArgs() - args.send = True - args.lists.append('ant.example.com') - self._command.process(args) + self._command.invoke(digests, ('-s', '-l', 'ant.example.com')) self._runner.run() # Now, there's no digest mbox and there's a plaintext digest in the # outgoing queue. @@ -105,10 +92,7 @@ Subject: message 1 get_queue_messages('digest', expected_count=0) mailbox_path = os.path.join(self._mlist.data_path, 'digest.mmdf') self.assertGreater(os.path.getsize(mailbox_path), 0) - args = FakeArgs() - args.send = True - args.lists.append('ant@example.com') - self._command.process(args) + self._command.invoke(digests, ('-s', '-l', 'ant@example.com')) self._runner.run() # Now, there's no digest mbox and there's a plaintext digest in the # outgoing queue. @@ -134,16 +118,12 @@ Subject: message 1 get_queue_messages('digest', expected_count=0) mailbox_path = os.path.join(self._mlist.data_path, 'digest.mmdf') self.assertGreater(os.path.getsize(mailbox_path), 0) - args = FakeArgs() - args.send = True - args.lists.append('bee.example.com') - stderr = StringIO() - with patch('mailman.commands.cli_digests.sys.stderr', stderr): - self._command.process(args) + result = self._command.invoke(digests, ('-s', '-l', 'bee.example.com')) + self.assertEqual(result.exit_code, 0) + self.assertEqual( + result.output, + 'No such list found: bee.example.com\n') self._runner.run() - # The warning was printed to stderr. - self.assertEqual(stderr.getvalue(), - 'No such list found: bee.example.com\n') # And no digest was prepared. self.assertGreater(os.path.getsize(mailbox_path), 0) get_queue_messages('virgin', expected_count=0) @@ -164,16 +144,12 @@ Subject: message 1 get_queue_messages('digest', expected_count=0) mailbox_path = os.path.join(self._mlist.data_path, 'digest.mmdf') self.assertGreater(os.path.getsize(mailbox_path), 0) - args = FakeArgs() - args.send = True - args.lists.append('bee@example.com') - stderr = StringIO() - with patch('mailman.commands.cli_digests.sys.stderr', stderr): - self._command.process(args) + result = self._command.invoke(digests, ('-s', '-l', 'bee@example.com')) + self.assertEqual(result.exit_code, 0) + self.assertEqual( + result.output, + 'No such list found: bee@example.com\n') self._runner.run() - # The warning was printed to stderr. - self.assertEqual(stderr.getvalue(), - 'No such list found: bee@example.com\n') # And no digest was prepared. self.assertGreater(os.path.getsize(mailbox_path), 0) get_queue_messages('virgin', expected_count=0) @@ -194,16 +170,14 @@ Subject: message 1 get_queue_messages('digest', expected_count=0) mailbox_path = os.path.join(self._mlist.data_path, 'digest.mmdf') self.assertGreater(os.path.getsize(mailbox_path), 0) - args = FakeArgs() - args.send = True - args.lists.extend(('ant.example.com', 'bee.example.com')) - stderr = StringIO() - with patch('mailman.commands.cli_digests.sys.stderr', stderr): - self._command.process(args) + result = self._command.invoke( + digests, + ('-s', '-l', 'ant.example.com', '-l', 'bee.example.com')) + self.assertEqual(result.exit_code, 0) + self.assertEqual( + result.output, + 'No such list found: bee.example.com\n') self._runner.run() - # The warning was printed to stderr. - self.assertEqual(stderr.getvalue(), - 'No such list found: bee.example.com\n') # But ant's digest was still prepared. self.assertFalse(os.path.exists(mailbox_path)) items = get_queue_messages('virgin', expected_count=1) @@ -251,10 +225,8 @@ Subject: message 3 # Both. get_queue_messages('digest', expected_count=0) # Process both list's digests. - args = FakeArgs() - args.send = True - args.lists.extend(('ant.example.com', 'bee@example.com')) - self._command.process(args) + self._command.invoke( + digests, ('-s', '-l', 'ant.example.com', '-l', 'bee@example.com')) self._runner.run() # Now, neither list has a digest mbox and but there are plaintext # digest in the outgoing queue for both. @@ -318,9 +290,7 @@ Subject: message 3 # Both. get_queue_messages('digest', expected_count=0) # Process all mailing list digests by not setting any arguments. - args = FakeArgs() - args.send = True - self._command.process(args) + self._command.invoke(digests, ('-s',)) self._runner.run() # Now, neither list has a digest mbox and but there are plaintext # digest in the outgoing queue for both. @@ -349,10 +319,7 @@ Subject: message 3 # can be sent. mailbox_path = os.path.join(self._mlist.data_path, 'digest.mmdf') self.assertFalse(os.path.exists(mailbox_path)) - args = FakeArgs() - args.send = True - args.lists.append('ant.example.com') - self._command.process(args) + self._command.invoke(digests, ('-s', '-l', 'ant.example.com')) self._runner.run() get_queue_messages('virgin', expected_count=0) @@ -369,11 +336,8 @@ Subject: message 1 """) self._handler.process(self._mlist, msg, {}) - args = FakeArgs() - args.bump = True - args.send = True - args.lists.append('ant.example.com') - self._command.process(args) + self._command.invoke( + digests, ('-s', '--bump', '-l', 'ant.example.com')) self._runner.run() # The volume is 8 and the digest number is 2 because a digest was sent # after the volume/number was bumped. @@ -393,15 +357,12 @@ class TestBumpVolume(unittest.TestCase): self._mlist.volume = 7 self._mlist.next_digest_number = 4 self.right_now = right_now() - self._command = Digests() + self._command = CliRunner() def test_bump_one_list(self): self._mlist.digest_last_sent_at = self.right_now + timedelta( days=-32) - args = FakeArgs() - args.bump = True - args.lists.append('ant.example.com') - self._command.process(args) + self._command.invoke(digests, ('-b', '-l', 'ant.example.com')) self.assertEqual(self._mlist.volume, 8) self.assertEqual(self._mlist.next_digest_number, 1) self.assertEqual(self._mlist.digest_last_sent_at, self.right_now) @@ -416,36 +377,23 @@ class TestBumpVolume(unittest.TestCase): bee.next_digest_number = 4 bee.digest_last_sent_at = self.right_now + timedelta( days=-32) - args = FakeArgs() - args.bump = True - args.lists.extend(('ant.example.com', 'bee.example.com')) - self._command.process(args) + self._command.invoke( + digests, ('-b', '-l', 'ant.example.com', '-l', 'bee.example.com')) self.assertEqual(self._mlist.volume, 8) self.assertEqual(self._mlist.next_digest_number, 1) self.assertEqual(self._mlist.digest_last_sent_at, self.right_now) def test_bump_verbose(self): - args = FakeArgs() - args.bump = True - args.verbose = True - args.lists.append('ant.example.com') - output = StringIO() - with patch('sys.stdout', output): - self._command.process(args) - self.assertMultiLineEqual(output.getvalue(), """\ + result = self._command.invoke( + digests, ('-v', '-b', '-l', 'ant.example.com')) + self.assertMultiLineEqual(result.output, """\ ant.example.com is at volume 7, number 4 ant.example.com bumped to volume 7, number 5 """) def test_send_verbose(self): - args = FakeArgs() - args.send = True - args.verbose = True - args.dry_run = True - args.lists.append('ant.example.com') - output = StringIO() - with patch('sys.stdout', output): - self._command.process(args) - self.assertMultiLineEqual(output.getvalue(), """\ + result = self._command.invoke( + digests, ('-v', '-s', '-n', '-l', 'ant.example.com')) + self.assertMultiLineEqual(result.output, """\ ant.example.com sent volume 7, number 4 """) diff --git a/src/mailman/commands/tests/test_import.py b/src/mailman/commands/tests/test_cli_import.py index e210889fc..5f97df294 100644 --- a/src/mailman/commands/tests/test_import.py +++ b/src/mailman/commands/tests/test_cli_import.py @@ -19,27 +19,23 @@ import unittest +from click.testing import CliRunner from mailman.app.lifecycle import create_list -from mailman.commands.cli_import import Import21 +from mailman.commands.cli_import import import21 from mailman.testing.layers import ConfigLayer +from mailman.utilities.importer import Import21Error +from pickle import dump from pkg_resources import resource_filename +from tempfile import NamedTemporaryFile from unittest.mock import patch -class FakeArgs: - listname = ['test@example.com'] - pickle_file = [ - resource_filename('mailman.testing', 'config-with-instances.pck'), - ] - - class TestImport(unittest.TestCase): layer = ConfigLayer def setUp(self): - self.command = Import21() - self.args = FakeArgs() - self.mlist = create_list('test@example.com') + self._command = CliRunner() + self.mlist = create_list('ant@example.com') @patch('mailman.commands.cli_import.import_config_pck') def test_process_pickle_with_bounce_info(self, import_config_pck): @@ -47,8 +43,34 @@ class TestImport(unittest.TestCase): # _BounceInfo instances. We throw these away when importing to # Mailman 3, but we have to fake the instance's classes, otherwise # unpickling the dictionaries will fail. + pckfile = resource_filename( + 'mailman.testing', 'config-with-instances.pck') try: - self.command.process(self.args) + self._command.invoke(import21, ('ant.example.com', pckfile)) except ImportError as error: self.fail('The pickle failed loading: {}'.format(error)) self.assertTrue(import_config_pck.called) + + def test_missing_list_spec(self): + result = self._command.invoke(import21) + self.assertEqual(result.exit_code, 2, result.output) + self.assertEqual( + result.output, + 'Usage: import21 [OPTIONS] LISTSPEC PICKLE_FILE\n\n' + 'Error: Missing argument "listspec".\n') + + def test_pickle_with_nondict(self): + with NamedTemporaryFile() as pckfile: + with open(pckfile.name, 'wb') as fp: + dump(['not', 'a', 'dict'], fp) + result = self._command.invoke( + import21, ('ant.example.com', pckfile.name)) + self.assertIn('Ignoring non-dictionary', result.output) + + def test_pickle_with_bad_language(self): + pckfile = resource_filename('mailman.testing', 'config.pck') + with patch('mailman.utilities.importer.check_language_code', + side_effect=Import21Error('Fake bad language code')): + result = self._command.invoke( + import21, ('ant.example.com', pckfile)) + self.assertIn('Fake bad language code', result.output) diff --git a/src/mailman/commands/tests/test_cli_inject.py b/src/mailman/commands/tests/test_cli_inject.py new file mode 100644 index 000000000..890ce3a54 --- /dev/null +++ b/src/mailman/commands/tests/test_cli_inject.py @@ -0,0 +1,66 @@ +# Copyright (C) 2017 by the Free Software Foundation, Inc. +# +# This file is part of GNU Mailman. +# +# GNU Mailman is free software: you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) +# any later version. +# +# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. +# +# You should have received a copy of the GNU General Public License along with +# GNU Mailman. If not, see <http://www.gnu.org/licenses/>. + +"""Test the `inject` command.""" + +import unittest + +from click.testing import CliRunner +from io import StringIO +from mailman.app.lifecycle import create_list +from mailman.commands.cli_inject import inject +from mailman.testing.layers import ConfigLayer + + +class InterruptRaisingReader(StringIO): + def read(self, count=None): + # Fake enough of the API so click returns this instance unchanged. + if count is None: + raise KeyboardInterrupt + return b'' + + +class TestInject(unittest.TestCase): + layer = ConfigLayer + + def setUp(self): + self._command = CliRunner() + create_list('ant@example.com') + + def test_inject_keyboard_interrupt(self): + results = self._command.invoke( + inject, ('-f', '-', 'ant.example.com'), + input=InterruptRaisingReader()) + self.assertEqual(results.exit_code, 1) + self.assertEqual(results.output, 'Interrupted\n') + + def test_inject_no_such_list(self): + result = self._command.invoke(inject, ('bee.example.com',)) + self.assertEqual(result.exit_code, 2) + self.assertEqual( + result.output, + 'Usage: inject [OPTIONS] LISTSPEC\n\n' + 'Error: No such list: bee.example.com\n') + + def test_inject_no_such_queue(self): + result = self._command.invoke( + inject, ('--queue', 'bogus', 'ant.example.com')) + self.assertEqual(result.exit_code, 2) + self.assertEqual( + result.output, + 'Usage: inject [OPTIONS] LISTSPEC\n\n' + 'Error: No such queue: bogus\n') diff --git a/src/mailman/commands/tests/test_lists.py b/src/mailman/commands/tests/test_cli_lists.py index 8463b639f..6b0ff5cf3 100644 --- a/src/mailman/commands/tests/test_lists.py +++ b/src/mailman/commands/tests/test_cli_lists.py @@ -19,26 +19,20 @@ import unittest -from io import StringIO +from click.testing import CliRunner from mailman.app.lifecycle import create_list -from mailman.commands.cli_lists import Lists +from mailman.commands.cli_lists import lists from mailman.interfaces.domain import IDomainManager from mailman.testing.layers import ConfigLayer -from unittest.mock import patch from zope.component import getUtility -class FakeArgs: - advertised = False - names = False - descriptions = False - quiet = False - domain = [] - - class TestLists(unittest.TestCase): layer = ConfigLayer + def setUp(self): + self._command = CliRunner() + def test_lists_with_domain_option(self): # LP: #1166911 - non-matching lists were returned. getUtility(IDomainManager).add( @@ -48,14 +42,8 @@ class TestLists(unittest.TestCase): # Only this one should show up. create_list('test3@example.net') create_list('test4@example.com') - command = Lists() - args = FakeArgs() - args.domain.append('example.net') - output = StringIO() - with patch('sys.stdout', output): - command.process(args) - lines = output.getvalue().splitlines() - # The first line is the heading, so skip that. - lines.pop(0) - self.assertEqual(len(lines), 1, lines) - self.assertEqual(lines[0], 'test3@example.net') + result = self._command.invoke(lists, ('--domain', 'example.net')) + self.assertEqual(result.exit_code, 0) + self.assertEqual( + result.output, + '1 matching mailing lists found:\ntest3@example.net\n') diff --git a/src/mailman/commands/tests/test_members.py b/src/mailman/commands/tests/test_cli_members.py index 74c7414f6..9c72c61ed 100644 --- a/src/mailman/commands/tests/test_members.py +++ b/src/mailman/commands/tests/test_cli_members.py @@ -17,37 +17,15 @@ """Test the `mailman members` command.""" -import sys import unittest -from functools import partial -from io import StringIO +from click.testing import CliRunner from mailman.app.lifecycle import create_list -from mailman.commands.cli_members import Members +from mailman.commands.cli_members import members from mailman.interfaces.member import MemberRole from mailman.testing.helpers import subscribe from mailman.testing.layers import ConfigLayer from tempfile import NamedTemporaryFile -from unittest.mock import patch - - -class FakeArgs: - input_filename = None - output_filename = None - role = None - regular = None - digest = None - nomail = None - list = None - - -class FakeParser: - def __init__(self): - self.message = None - - def error(self, message): - self.message = message - sys.exit(1) class TestCLIMembers(unittest.TestCase): @@ -55,35 +33,25 @@ class TestCLIMembers(unittest.TestCase): def setUp(self): self._mlist = create_list('ant@example.com') - self.command = Members() - self.command.parser = FakeParser() - self.args = FakeArgs() + self._command = CliRunner() def test_no_such_list(self): - self.args.list = ['bee.example.com'] - with self.assertRaises(SystemExit): - self.command.process(self.args) - self.assertEqual(self.command.parser.message, - 'No such list: bee.example.com') - - def test_bad_delivery_status(self): - self.args.list = ['ant.example.com'] - self.args.nomail = 'bogus' - with self.assertRaises(SystemExit): - self.command.process(self.args) - self.assertEqual(self.command.parser.message, - 'Unknown delivery status: bogus') + result = self._command.invoke(members, ('bee.example.com',)) + self.assertEqual(result.exit_code, 2) + self.assertEqual( + result.output, + 'Usage: members [OPTIONS] LISTSPEC\n\n' + 'Error: No such list: bee.example.com\n') def test_role_administrator(self): subscribe(self._mlist, 'Anne', role=MemberRole.owner) subscribe(self._mlist, 'Bart', role=MemberRole.moderator) subscribe(self._mlist, 'Cate', role=MemberRole.nonmember) subscribe(self._mlist, 'Dave', role=MemberRole.member) - self.args.list = ['ant.example.com'] - self.args.role = 'administrator' with NamedTemporaryFile('w', encoding='utf-8') as outfp: - self.args.output_filename = outfp.name - self.command.process(self.args) + self._command.invoke(members, ( + '--role', 'administrator', '-o', outfp.name, + 'ant.example.com')) with open(outfp.name, 'r', encoding='utf-8') as infp: lines = infp.readlines() self.assertEqual(len(lines), 2) @@ -95,11 +63,9 @@ class TestCLIMembers(unittest.TestCase): subscribe(self._mlist, 'Bart', role=MemberRole.moderator) subscribe(self._mlist, 'Cate', role=MemberRole.nonmember) subscribe(self._mlist, 'Dave', role=MemberRole.member) - self.args.list = ['ant.example.com'] - self.args.role = 'any' with NamedTemporaryFile('w', encoding='utf-8') as outfp: - self.args.output_filename = outfp.name - self.command.process(self.args) + self._command.invoke(members, ( + '--role', 'any', '-o', outfp.name, 'ant.example.com')) with open(outfp.name, 'r', encoding='utf-8') as infp: lines = infp.readlines() self.assertEqual(len(lines), 4) @@ -113,34 +79,34 @@ class TestCLIMembers(unittest.TestCase): subscribe(self._mlist, 'Bart', role=MemberRole.moderator) subscribe(self._mlist, 'Cate', role=MemberRole.nonmember) subscribe(self._mlist, 'Dave', role=MemberRole.member) - self.args.list = ['ant.example.com'] - self.args.role = 'moderator' with NamedTemporaryFile('w', encoding='utf-8') as outfp: - self.args.output_filename = outfp.name - self.command.process(self.args) + self._command.invoke(members, ( + '--role', 'moderator', '-o', outfp.name, 'ant.example.com')) with open(outfp.name, 'r', encoding='utf-8') as infp: lines = infp.readlines() self.assertEqual(len(lines), 1) self.assertEqual(lines[0], 'Bart Person <bperson@example.com>\n') - def test_bad_role(self): - self.args.list = ['ant.example.com'] - self.args.role = 'bogus' - with self.assertRaises(SystemExit): - self.command.process(self.args) - self.assertEqual(self.command.parser.message, - 'Unknown member role: bogus') + def test_role_nonmember(self): + subscribe(self._mlist, 'Anne', role=MemberRole.owner) + subscribe(self._mlist, 'Bart', role=MemberRole.moderator) + subscribe(self._mlist, 'Cate', role=MemberRole.nonmember) + subscribe(self._mlist, 'Dave', role=MemberRole.member) + with NamedTemporaryFile('w', encoding='utf-8') as outfp: + self._command.invoke(members, ( + '--role', 'nonmember', '-o', outfp.name, 'ant.example.com')) + with open(outfp.name, 'r', encoding='utf-8') as infp: + lines = infp.readlines() + self.assertEqual(len(lines), 1) + self.assertEqual(lines[0], 'Cate Person <cperson@example.com>\n') def test_already_subscribed_with_display_name(self): subscribe(self._mlist, 'Anne') - outfp = StringIO() with NamedTemporaryFile('w', buffering=1, encoding='utf-8') as infp: print('Anne Person <aperson@example.com>', file=infp) - self.args.list = ['ant.example.com'] - self.args.input_filename = infp.name - with patch('builtins.print', partial(print, file=outfp)): - self.command.process(self.args) + result = self._command.invoke(members, ( + '--add', infp.name, 'ant.example.com')) self.assertEqual( - outfp.getvalue(), + result.output, 'Already subscribed (skipping): Anne Person <aperson@example.com>\n' ) diff --git a/src/mailman/commands/tests/test_cli_qfile.py b/src/mailman/commands/tests/test_cli_qfile.py new file mode 100644 index 000000000..b907abcc6 --- /dev/null +++ b/src/mailman/commands/tests/test_cli_qfile.py @@ -0,0 +1,59 @@ +# Copyright (C) 2017 by the Free Software Foundation, Inc. +# +# This file is part of GNU Mailman. +# +# GNU Mailman is free software: you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) +# any later version. +# +# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. +# +# You should have received a copy of the GNU General Public License along with +# GNU Mailman. If not, see <http://www.gnu.org/licenses/>. + +"""Test the qfile command.""" + +import unittest + +from click.testing import CliRunner +from contextlib import ExitStack +from mailman.commands.cli_qfile import qfile +from mailman.testing.layers import ConfigLayer +from pickle import dump +from tempfile import NamedTemporaryFile +from unittest.mock import patch + + +class TestUnshunt(unittest.TestCase): + layer = ConfigLayer + maxDiff = None + + def setUp(self): + self._command = CliRunner() + + def test_print_str(self): + with NamedTemporaryFile() as tmp_qfile: + with open(tmp_qfile.name, 'wb') as fp: + dump('a simple string', fp) + results = self._command.invoke(qfile, (tmp_qfile.name,)) + self.assertEqual(results.output, """\ +[----- start pickle -----] +<----- start object 1 -----> +a simple string +[----- end pickle -----] +""", results.output) + + def test_interactive(self): + with ExitStack() as resources: + tmp_qfile = resources.enter_context(NamedTemporaryFile()) + mock = resources.enter_context(patch( + 'mailman.commands.cli_qfile.interact')) + with open(tmp_qfile.name, 'wb') as fp: + dump('a simple string', fp) + self._command.invoke(qfile, (tmp_qfile.name, '-i')) + mock.assert_called_once_with( + banner="Number of objects found (see the variable 'm'): 1") diff --git a/src/mailman/commands/tests/test_cli_shell.py b/src/mailman/commands/tests/test_cli_shell.py new file mode 100644 index 000000000..ffcf43803 --- /dev/null +++ b/src/mailman/commands/tests/test_cli_shell.py @@ -0,0 +1,173 @@ +# Copyright (C) 2016-2017 by the Free Software Foundation, Inc. +# +# This file is part of GNU Mailman. +# +# GNU Mailman is free software: you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) +# any later version. +# +# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. +# +# You should have received a copy of the GNU General Public License along with +# GNU Mailman. If not, see <http://www.gnu.org/licenses/>. + +"""Test the withlist/shell command.""" + +import os +import unittest + +from click.testing import CliRunner +from contextlib import ExitStack +from mailman.app.lifecycle import create_list +from mailman.commands.cli_withlist import shell +from mailman.config import config +from mailman.interfaces.usermanager import IUserManager +from mailman.testing.helpers import configuration +from mailman.testing.layers import ConfigLayer +from mailman.utilities.modules import hacked_sys_modules +from unittest.mock import MagicMock, patch + +try: + import readline # noqa: F401 + has_readline = True +except ImportError: + has_readline = False + + +class TestShell(unittest.TestCase): + layer = ConfigLayer + + def setUp(self): + self._command = CliRunner() + + def test_namespace(self): + with patch('mailman.commands.cli_withlist.start_python') as mock: + self._command.invoke(shell, ('--interactive',)) + self.assertEqual(mock.call_count, 1) + # Don't test that all names are available, just a few choice ones. + positional, keywords = mock.call_args + namespace = positional[0] + self.assertIn('getUtility', namespace) + self.assertIn('IArchiver', namespace) + self.assertEqual(namespace['IUserManager'], IUserManager) + + @configuration('shell', banner='my banner') + def test_banner(self): + with patch('mailman.commands.cli_withlist.interact') as mock: + self._command.invoke(shell, ('--interactive',)) + self.assertEqual(mock.call_count, 1) + positional, keywords = mock.call_args + self.assertEqual(keywords['banner'], 'my banner\n') + + @unittest.skipUnless(has_readline, 'readline module is not available') + @configuration('shell', history_file='$var_dir/history.py') + def test_history_file(self): + with patch('mailman.commands.cli_withlist.interact'): + self._command.invoke(shell, ('--interactive',)) + history_file = os.path.join(config.VAR_DIR, 'history.py') + self.assertTrue(os.path.exists(history_file)) + + @configuration('shell', use_ipython='yes') + def test_start_ipython4(self): + mock = MagicMock() + with hacked_sys_modules('IPython.terminal.embed', mock): + self._command.invoke(shell, ('--interactive',)) + posargs, kws = mock.InteractiveShellEmbed.instance().mainloop.call_args + self.assertEqual( + kws['display_banner'], 'Welcome to the GNU Mailman shell\n') + + @configuration('shell', use_ipython='yes') + def test_start_ipython1(self): + mock = MagicMock() + with hacked_sys_modules('IPython.frontend.terminal.embed', mock): + self._command.invoke(shell, ('--interactive',)) + posargs, kws = mock.InteractiveShellEmbed.instance.call_args + self.assertEqual( + kws['banner1'], 'Welcome to the GNU Mailman shell\n') + + @configuration('shell', use_ipython='debug') + def test_start_ipython_debug(self): + mock = MagicMock() + with hacked_sys_modules('IPython.terminal.embed', mock): + self._command.invoke(shell, ('--interactive',)) + posargs, kws = mock.InteractiveShellEmbed.instance().mainloop.call_args + self.assertEqual( + kws['display_banner'], 'Welcome to the GNU Mailman shell\n') + + @configuration('shell', use_ipython='oops') + def test_start_ipython_invalid(self): + mock = MagicMock() + with hacked_sys_modules('IPython.terminal.embed', mock): + results = self._command.invoke(shell, ('--interactive',)) + self.assertEqual( + results.output, + 'Invalid value for [shell]use_python: oops\n') + # mainloop() never got called. + self.assertIsNone( + mock.InteractiveShellEmbed.instance().mainloop.call_args) + + @configuration('shell', use_ipython='yes') + def test_start_ipython_uninstalled(self): + with ExitStack() as resources: + # Pretend iPython isn't available at all. + resources.enter_context(patch( + 'mailman.commands.cli_withlist.start_ipython1', + return_value=None)) + resources.enter_context(patch( + 'mailman.commands.cli_withlist.start_ipython4', + return_value=None)) + results = self._command.invoke(shell, ('--interactive',)) + self.assertEqual( + results.output, + 'ipython is not available, set use_ipython to no\n') + + def test_regex_without_run(self): + results = self._command.invoke(shell, ('-l', '^.*example.com')) + self.assertEqual(results.exit_code, 2) + self.assertEqual( + results.output, + 'Usage: shell [OPTIONS] [RUN_ARGS]...\n\n' + 'Error: Regular expression requires --run\n') + + def test_listspec_without_run(self): + create_list('ant@example.com') + mock = MagicMock() + with ExitStack() as resources: + resources.enter_context( + hacked_sys_modules('IPython.terminal.embed', mock)) + interactive_mock = resources.enter_context(patch( + 'mailman.commands.cli_withlist.do_interactive')) + self._command.invoke(shell, ('-l', 'ant.example.com')) + posargs, kws = interactive_mock.call_args + self.assertEqual( + posargs[1], + "The variable 'm' is the ant.example.com mailing list") + + def test_listspec_without_run_no_such_list(self): + results = self._command.invoke(shell, ('-l', 'ant.example.com')) + self.assertEqual(results.exit_code, 2) + self.assertEqual( + results.output, + 'Usage: shell [OPTIONS] [RUN_ARGS]...\n\n' + 'Error: No such list: ant.example.com\n') + + def test_run_without_listspec(self): + results = self._command.invoke(shell, ('--run', 'something')) + self.assertEqual(results.exit_code, 2) + self.assertEqual( + results.output, + 'Usage: shell [OPTIONS] [RUN_ARGS]...\n\n' + 'Error: --run requires a mailing list\n') + + def test_run_bogus_listspec(self): + results = self._command.invoke( + shell, ('-l', 'bee.example.com', '--run', 'something')) + self.assertEqual(results.exit_code, 2) + self.assertEqual( + results.output, + 'Usage: shell [OPTIONS] [RUN_ARGS]...\n\n' + 'Error: No such list: bee.example.com\n') diff --git a/src/mailman/commands/tests/test_cli_status.py b/src/mailman/commands/tests/test_cli_status.py new file mode 100644 index 000000000..4e9e3080e --- /dev/null +++ b/src/mailman/commands/tests/test_cli_status.py @@ -0,0 +1,64 @@ +# Copyright (C) 2017 by the Free Software Foundation, Inc. +# +# This file is part of GNU Mailman. +# +# GNU Mailman is free software: you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) +# any later version. +# +# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. +# +# You should have received a copy of the GNU General Public License along with +# GNU Mailman. If not, see <http://www.gnu.org/licenses/>. + +"""Test the status command.""" + +import socket +import unittest + +from click.testing import CliRunner +from mailman.bin.master import WatcherState +from mailman.commands.cli_status import status +from mailman.testing.layers import ConfigLayer +from unittest.mock import patch + + +class FakeLock: + details = ('localhost', 9999, None) + + +class TestStatus(unittest.TestCase): + layer = ConfigLayer + maxDiff = None + + def setUp(self): + self._command = CliRunner() + + def test_stale_lock(self): + with patch('mailman.commands.cli_status.master_state', + return_value=(WatcherState.stale_lock, FakeLock())): + results = self._command.invoke(status) + self.assertEqual(results.exit_code, + WatcherState.stale_lock.value, + results.output) + self.assertEqual( + results.output, + 'GNU Mailman is stopped (stale pid: 9999)\n', + results.output) + + def test_unknown_state(self): + with patch('mailman.commands.cli_status.master_state', + return_value=(WatcherState.host_mismatch, FakeLock())): + results = self._command.invoke(status) + self.assertEqual(results.exit_code, + WatcherState.host_mismatch.value, + results.output) + self.assertEqual( + results.output, + 'GNU Mailman is in an unexpected state ' + '(localhost != {})\n'.format(socket.getfqdn()), + results.output) diff --git a/src/mailman/commands/tests/test_cli_unshunt.py b/src/mailman/commands/tests/test_cli_unshunt.py new file mode 100644 index 000000000..70303e1b4 --- /dev/null +++ b/src/mailman/commands/tests/test_cli_unshunt.py @@ -0,0 +1,45 @@ +# Copyright (C) 2017 by the Free Software Foundation, Inc. +# +# This file is part of GNU Mailman. +# +# GNU Mailman is free software: you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) +# any later version. +# +# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. +# +# You should have received a copy of the GNU General Public License along with +# GNU Mailman. If not, see <http://www.gnu.org/licenses/>. + +"""Test the `unshunt` command.""" + +import unittest + +from click.testing import CliRunner +from mailman.commands.cli_unshunt import unshunt +from mailman.config import config +from mailman.email.message import Message +from mailman.testing.layers import ConfigLayer +from unittest.mock import patch + + +class TestUnshunt(unittest.TestCase): + layer = ConfigLayer + maxDiff = None + + def setUp(self): + self._command = CliRunner() + self._queue = config.switchboards['shunt'] + + def test_dequeue_fails(self): + filebase = self._queue.enqueue(Message(), {}) + with patch.object(self._queue, 'dequeue', + side_effect=RuntimeError('oops!')): + results = self._command.invoke(unshunt) + self.assertEqual( + results.output, + 'Cannot unshunt message {}, skipping:\noops!\n'.format(filebase)) diff --git a/src/mailman/commands/tests/test_conf.py b/src/mailman/commands/tests/test_conf.py deleted file mode 100644 index 94ca0a979..000000000 --- a/src/mailman/commands/tests/test_conf.py +++ /dev/null @@ -1,108 +0,0 @@ -# Copyright (C) 2013-2017 by the Free Software Foundation, Inc. -# -# This file is part of GNU Mailman. -# -# GNU Mailman is free software: you can redistribute it and/or modify it under -# the terms of the GNU General Public License as published by the Free -# Software Foundation, either version 3 of the License, or (at your option) -# any later version. -# -# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT -# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or -# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for -# more details. -# -# You should have received a copy of the GNU General Public License along with -# GNU Mailman. If not, see <http://www.gnu.org/licenses/>. - -"""Test the conf subcommand.""" - -import os -import sys -import tempfile -import unittest - -from io import StringIO -from mailman.commands.cli_conf import Conf -from mailman.testing.layers import ConfigLayer -from unittest import mock - - -class FakeArgs: - section = None - key = None - output = None - sort = False - - -class FakeParser: - def __init__(self): - self.message = None - - def error(self, message): - self.message = message - sys.exit(1) - - -class TestConf(unittest.TestCase): - """Test the conf subcommand.""" - - layer = ConfigLayer - - def setUp(self): - self.command = Conf() - self.command.parser = FakeParser() - self.args = FakeArgs() - - def test_cannot_access_nonexistent_section(self): - self.args.section = 'thissectiondoesnotexist' - self.args.key = None - with self.assertRaises(SystemExit): - self.command.process(self.args) - self.assertEqual(self.command.parser.message, - 'No such section: thissectiondoesnotexist') - - def test_cannot_access_nonexistent_key(self): - self.args.section = "mailman" - self.args.key = 'thiskeydoesnotexist' - with self.assertRaises(SystemExit): - self.command.process(self.args) - self.assertEqual(self.command.parser.message, - 'Section mailman: No such key: thiskeydoesnotexist') - - def test_output_to_explicit_stdout(self): - self.args.output = '-' - self.args.section = 'shell' - self.args.key = 'use_ipython' - with mock.patch('sys.stdout') as mock_object: - self.command.process(self.args) - mock_object.write.assert_has_calls( - [mock.call('no'), mock.call('\n')]) - - def test_output_to_file(self): - self.args.section = 'shell' - self.args.key = 'use_ipython' - fd, filename = tempfile.mkstemp() - try: - self.args.output = filename - self.command.process(self.args) - with open(filename, 'r') as fp: - contents = fp.read() - finally: - os.remove(filename) - self.assertEqual(contents, 'no\n') - - def test_sort_by_section(self): - self.args.output = '-' - self.args.sort = True - output = StringIO() - with mock.patch('sys.stdout', output): - self.command.process(self.args) - last_line = '' - for line in output.getvalue().splitlines(): - if not line.startswith('['): - # This is a continuation line. --sort doesn't sort these. - continue - self.assertTrue(line > last_line, - '{} !> {}'.format(line, last_line)) - last_line = line diff --git a/src/mailman/commands/tests/test_control.py b/src/mailman/commands/tests/test_control.py deleted file mode 100644 index 473e78363..000000000 --- a/src/mailman/commands/tests/test_control.py +++ /dev/null @@ -1,244 +0,0 @@ -# Copyright (C) 2011-2017 by the Free Software Foundation, Inc. -# -# This file is part of GNU Mailman. -# -# GNU Mailman is free software: you can redistribute it and/or modify it under -# the terms of the GNU General Public License as published by the Free -# Software Foundation, either version 3 of the License, or (at your option) -# any later version. -# -# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT -# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or -# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for -# more details. -# -# You should have received a copy of the GNU General Public License along with -# GNU Mailman. If not, see <http://www.gnu.org/licenses/>. - -"""Test some additional corner cases for starting/stopping.""" - -import os -import sys -import time -import errno -import shutil -import signal -import socket -import unittest - -from contextlib import ExitStack, suppress -from datetime import datetime, timedelta -from mailman.commands.cli_control import Start, kill_watcher -from mailman.config import Configuration, config -from mailman.testing.helpers import configuration -from mailman.testing.layers import ConfigLayer -from tempfile import TemporaryDirectory - - -SEP = '|' - - -def make_config(): - # All we care about is the master process; normally it starts a bunch of - # runners, but we don't care about any of them, so write a test - # configuration file for the master that disables all the runners. - new_config = 'no-runners.cfg' - config_file = os.path.join(os.path.dirname(config.filename), new_config) - shutil.copyfile(config.filename, config_file) - with open(config_file, 'a') as fp: - for runner_config in config.runner_configs: - print('[{}]\nstart:no\n'.format(runner_config.name), file=fp) - return config_file - - -def find_master(): - # See if the master process is still running. - until = timedelta(seconds=10) + datetime.now() - while datetime.now() < until: - time.sleep(0.1) - with suppress(FileNotFoundError, ValueError, ProcessLookupError): - with open(config.PID_FILE) as fp: - pid = int(fp.read().strip()) - os.kill(pid, 0) - return pid - return None - - -def kill_with_extreme_prejudice(pid=None): - # 2016-12-03 barry: We have intermittent hangs during both local and CI - # test suite runs where killing a runner or master process doesn't - # terminate the process. In those cases, wait()ing on the child can - # suspend the test process indefinitely. Locally, you have to C-c the - # test process, but that still doesn't kill it; the process continues to - # run in the background. If you then search for the process's pid and - # SIGTERM it, it will usually exit, which is why I don't understand why - # the above SIGTERM doesn't kill it sometimes. However, when run under - # CI, the test suite will just hang until the CI runner times it out. It - # would be better to figure out the underlying cause, because we have - # definitely seen other situations where a runner process won't exit, but - # for testing purposes we're just trying to clean up some resources so - # after a brief attempt at SIGTERMing it, let's SIGKILL it and warn. - if pid is not None: - os.kill(pid, signal.SIGTERM) - until = timedelta(seconds=10) + datetime.now() - while datetime.now() < until: - try: - if pid is None: - os.wait3(os.WNOHANG) - else: - os.waitpid(pid, os.WNOHANG) - except ChildProcessError: - # This basically means we went one too many times around the - # loop. The previous iteration successfully reaped the child. - # Because the return status of wait3() and waitpid() are different - # in those cases, it's easier just to catch the exception for - # either call and exit. - return - time.sleep(0.1) - else: - if pid is None: - # There's really not much more we can do because we have no pid to - # SIGKILL. Just report the problem and continue. - print('WARNING: NO CHANGE IN CHILD PROCESS STATES', - file=sys.stderr) - return - print('WARNING: SIGTERM DID NOT EXIT PROCESS; SIGKILLing', - file=sys.stderr) - if pid is not None: - os.kill(pid, signal.SIGKILL) - until = timedelta(seconds=10) + datetime.now() - while datetime.now() < until: - status = os.waitpid(pid, os.WNOHANG) - if status == (0, 0): - # The child was reaped. - return - time.sleep(0.1) - else: - print('WARNING: SIGKILL DID NOT EXIT PROCESS!', file=sys.stderr) - - -class FakeArgs: - force = None - run_as_user = None - quiet = True - config = None - - -class FakeParser: - def __init__(self): - self.message = None - - def error(self, message): - self.message = message - sys.exit(1) - - -class TestStart(unittest.TestCase): - """Test various starting scenarios.""" - - layer = ConfigLayer - - def setUp(self): - self.command = Start() - self.command.parser = FakeParser() - self.args = FakeArgs() - self.args.config = make_config() - - def tearDown(self): - try: - with open(config.PID_FILE) as fp: - master_pid = int(fp.read()) - except OSError as error: - if error.errno != errno.ENOENT: - raise - # There is no master, so just ignore this. - return - kill_watcher(signal.SIGTERM) - os.waitpid(master_pid, 0) - - def test_force_stale_lock(self): - # Fake an acquisition of the master lock by another process, which - # subsequently goes stale. Start by finding a free process id. Yes, - # this could race, but given that we're starting with our own PID and - # searching downward, it's less likely. - fake_pid = os.getpid() - 1 - while fake_pid > 1: - try: - os.kill(fake_pid, 0) - except OSError as error: - if error.errno == errno.ESRCH: - break - fake_pid -= 1 - else: - raise RuntimeError('Cannot find free PID') - # Lock acquisition logic taken from flufl.lock. - claim_file = SEP.join(( - config.LOCK_FILE, - socket.getfqdn(), - str(fake_pid), - '0')) - with open(config.LOCK_FILE, 'w') as fp: - fp.write(claim_file) - os.link(config.LOCK_FILE, claim_file) - expiration_date = datetime.now() - timedelta(minutes=60) - t = time.mktime(expiration_date.timetuple()) - os.utime(claim_file, (t, t)) - # Start without --force; no master will be running. - with suppress(SystemExit): - self.command.process(self.args) - self.assertIsNone(find_master()) - self.assertIn('--force', self.command.parser.message) - # Start again, this time with --force. - self.args.force = True - self.command.process(self.args) - pid = find_master() - self.assertIsNotNone(pid) - - -class TestBinDir(unittest.TestCase): - """Test issues related to bin_dir, e.g. issue #3""" - - layer = ConfigLayer - - def setUp(self): - self.command = Start() - self.command.parser = FakeParser() - self.args = FakeArgs() - self.args.config = make_config() - - def test_master_is_elsewhere(self): - with ExitStack() as resources: - # Patch os.fork() so that we can record the failing child process's - # id. We need to wait on the child exiting in either case, and - # when it fails, no master.pid will be written. - bin_dir = resources.enter_context(TemporaryDirectory()) - old_master = os.path.join(config.BIN_DIR, 'master') - new_master = os.path.join(bin_dir, 'master') - shutil.move(old_master, new_master) - resources.callback(shutil.move, new_master, old_master) - # Starting mailman should fail because 'master' can't be found. - # XXX This will print Errno 2 on the console because we're not - # silencing the child process's stderr. - self.command.process(self.args) - # There should be no pid file. - args_config = Configuration() - args_config.load(self.args.config) - self.assertFalse(os.path.exists(args_config.PID_FILE)) - kill_with_extreme_prejudice() - - def test_master_is_elsewhere_and_findable(self): - with ExitStack() as resources: - bin_dir = resources.enter_context(TemporaryDirectory()) - old_master = os.path.join(config.BIN_DIR, 'master') - new_master = os.path.join(bin_dir, 'master') - shutil.move(old_master, new_master) - resources.enter_context( - configuration('paths.testing', bin_dir=bin_dir)) - resources.callback(shutil.move, new_master, old_master) - # Starting mailman should find master in the new bin_dir. - self.command.process(self.args) - # There should a pid file and the process it describes should be - # killable. We might have to wait until the process has started. - master_pid = find_master() - self.assertIsNotNone(master_pid, 'master did not start') - kill_with_extreme_prejudice(master_pid) diff --git a/src/mailman/commands/tests/test_create.py b/src/mailman/commands/tests/test_create.py deleted file mode 100644 index f2232eb6f..000000000 --- a/src/mailman/commands/tests/test_create.py +++ /dev/null @@ -1,110 +0,0 @@ -# Copyright (C) 2011-2017 by the Free Software Foundation, Inc. -# -# This file is part of GNU Mailman. -# -# GNU Mailman is free software: you can redistribute it and/or modify it under -# the terms of the GNU General Public License as published by the Free -# Software Foundation, either version 3 of the License, or (at your option) -# any later version. -# -# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT -# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or -# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for -# more details. -# -# You should have received a copy of the GNU General Public License along with -# GNU Mailman. If not, see <http://www.gnu.org/licenses/>. - -"""Test the `mailman create` subcommand.""" - -import sys -import unittest - -from argparse import ArgumentParser -from contextlib import suppress -from mailman.app.lifecycle import create_list -from mailman.commands.cli_lists import Create -from mailman.testing.layers import ConfigLayer - - -class FakeArgs: - language = None - owners = [] - quiet = False - domain = None - listname = None - notify = False - - -class FakeParser: - def __init__(self): - self.message = None - - def error(self, message): - self.message = message - sys.exit(1) - - -class TestCreate(unittest.TestCase): - layer = ConfigLayer - - def setUp(self): - self.command = Create() - self.command.parser = FakeParser() - self.args = FakeArgs() - - def test_cannot_create_duplicate_list(self): - # Cannot create a mailing list if it already exists. - create_list('test@example.com') - self.args.listname = ['test@example.com'] - with suppress(SystemExit): - self.command.process(self.args) - self.assertEqual(self.command.parser.message, - 'List already exists: test@example.com') - - def test_invalid_posting_address(self): - # Cannot create a mailing list with an invalid posting address. - self.args.listname = ['foo'] - with suppress(SystemExit): - self.command.process(self.args) - self.assertEqual(self.command.parser.message, - 'Illegal list name: foo') - - def test_invalid_owner_addresses(self): - # Cannot create a list with invalid owner addresses. LP: #778687 - self.args.listname = ['test@example.com'] - self.args.owners = ['main=True'] - with suppress(SystemExit): - self.command.process(self.args) - self.assertEqual(self.command.parser.message, - 'Illegal owner addresses: main=True') - - def test_without_domain_option(self): - # The domain will be created if no domain options are specified. - parser = ArgumentParser() - self.command.add(FakeParser(), parser) - args = parser.parse_args('test@example.org'.split()) - self.assertTrue(args.domain) - - def test_with_domain_option(self): - # The domain will be created if -d is given explicitly. - parser = ArgumentParser() - self.command.add(FakeParser(), parser) - args = parser.parse_args('-d test@example.org'.split()) - self.assertTrue(args.domain) - - def test_with_nodomain_option(self): - # The domain will not be created if --no-domain is given. - parser = ArgumentParser() - self.command.add(FakeParser(), parser) - args = parser.parse_args('-D test@example.net'.split()) - self.assertFalse(args.domain) - - def test_error_when_not_creating_domain(self): - self.args.domain = False - self.args.listname = ['test@example.org'] - with self.assertRaises(SystemExit) as cm: - self.command.process(self.args) - self.assertEqual(cm.exception.code, 1) - self.assertEqual(self.command.parser.message, - 'Undefined domain: example.org') diff --git a/src/mailman/commands/tests/test_confirm.py b/src/mailman/commands/tests/test_eml_confirm.py index 1ef392b76..1ef392b76 100644 --- a/src/mailman/commands/tests/test_confirm.py +++ b/src/mailman/commands/tests/test_eml_confirm.py diff --git a/src/mailman/commands/tests/test_help.py b/src/mailman/commands/tests/test_eml_help.py index 3090de9cc..3090de9cc 100644 --- a/src/mailman/commands/tests/test_help.py +++ b/src/mailman/commands/tests/test_eml_help.py diff --git a/src/mailman/commands/tests/test_membership.py b/src/mailman/commands/tests/test_eml_membership.py index 6cf4802c6..6cf4802c6 100644 --- a/src/mailman/commands/tests/test_membership.py +++ b/src/mailman/commands/tests/test_eml_membership.py diff --git a/src/mailman/commands/tests/test_shell.py b/src/mailman/commands/tests/test_shell.py deleted file mode 100644 index 2f7998f9f..000000000 --- a/src/mailman/commands/tests/test_shell.py +++ /dev/null @@ -1,81 +0,0 @@ -# Copyright (C) 2016-2017 by the Free Software Foundation, Inc. -# -# This file is part of GNU Mailman. -# -# GNU Mailman is free software: you can redistribute it and/or modify it under -# the terms of the GNU General Public License as published by the Free -# Software Foundation, either version 3 of the License, or (at your option) -# any later version. -# -# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT -# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or -# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for -# more details. -# -# You should have received a copy of the GNU General Public License along with -# GNU Mailman. If not, see <http://www.gnu.org/licenses/>. - -"""Test the withlist/shell command.""" - -import os -import unittest - -from mailman.commands.cli_withlist import Withlist -from mailman.config import config -from mailman.interfaces.usermanager import IUserManager -from mailman.testing.helpers import configuration -from mailman.testing.layers import ConfigLayer -from unittest.mock import patch - -try: - import readline # noqa: F401 - has_readline = True -except ImportError: - has_readline = False - - -class FakeArgs: - interactive = None - run = None - details = False - listname = None - - -class TestShell(unittest.TestCase): - layer = ConfigLayer - - def setUp(self): - self._shell = Withlist() - - def test_namespace(self): - args = FakeArgs() - args.interactive = True - with patch.object(self._shell, '_start_python') as mock: - self._shell.process(args) - self.assertEqual(mock.call_count, 1) - # Don't test that all names are available, just a few choice ones. - positional, keywords = mock.call_args - namespace = positional[0] - self.assertIn('getUtility', namespace) - self.assertIn('IArchiver', namespace) - self.assertEqual(namespace['IUserManager'], IUserManager) - - @configuration('shell', banner='my banner') - def test_banner(self): - args = FakeArgs() - args.interactive = True - with patch('mailman.commands.cli_withlist.interact') as mock: - self._shell.process(args) - self.assertEqual(mock.call_count, 1) - positional, keywords = mock.call_args - self.assertEqual(keywords['banner'], 'my banner\n') - - @unittest.skipUnless(has_readline, 'readline module is not available') - @configuration('shell', history_file='$var_dir/history.py') - def test_history_file(self): - args = FakeArgs() - args.interactive = True - with patch('mailman.commands.cli_withlist.interact'): - self._shell.process(args) - history_file = os.path.join(config.VAR_DIR, 'history.py') - self.assertTrue(os.path.exists(history_file)) |
