summaryrefslogtreecommitdiff
path: root/src/mailman/commands/tests
diff options
context:
space:
mode:
Diffstat (limited to 'src/mailman/commands/tests')
-rw-r--r--src/mailman/commands/tests/data/__init__.py0
-rw-r--r--src/mailman/commands/tests/data/no-runners.cfg51
-rw-r--r--src/mailman/commands/tests/test_cli_conf.py74
-rw-r--r--src/mailman/commands/tests/test_cli_control.py295
-rw-r--r--src/mailman/commands/tests/test_cli_create.py116
-rw-r--r--src/mailman/commands/tests/test_cli_digests.py (renamed from src/mailman/commands/tests/test_digests.py)128
-rw-r--r--src/mailman/commands/tests/test_cli_import.py (renamed from src/mailman/commands/tests/test_import.py)46
-rw-r--r--src/mailman/commands/tests/test_cli_inject.py66
-rw-r--r--src/mailman/commands/tests/test_cli_lists.py (renamed from src/mailman/commands/tests/test_lists.py)32
-rw-r--r--src/mailman/commands/tests/test_cli_members.py (renamed from src/mailman/commands/tests/test_members.py)96
-rw-r--r--src/mailman/commands/tests/test_cli_qfile.py59
-rw-r--r--src/mailman/commands/tests/test_cli_shell.py173
-rw-r--r--src/mailman/commands/tests/test_cli_status.py64
-rw-r--r--src/mailman/commands/tests/test_cli_unshunt.py45
-rw-r--r--src/mailman/commands/tests/test_conf.py108
-rw-r--r--src/mailman/commands/tests/test_control.py244
-rw-r--r--src/mailman/commands/tests/test_create.py110
-rw-r--r--src/mailman/commands/tests/test_eml_confirm.py (renamed from src/mailman/commands/tests/test_confirm.py)0
-rw-r--r--src/mailman/commands/tests/test_eml_help.py (renamed from src/mailman/commands/tests/test_help.py)0
-rw-r--r--src/mailman/commands/tests/test_eml_membership.py (renamed from src/mailman/commands/tests/test_membership.py)0
-rw-r--r--src/mailman/commands/tests/test_shell.py81
21 files changed, 1056 insertions, 732 deletions
diff --git a/src/mailman/commands/tests/data/__init__.py b/src/mailman/commands/tests/data/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/src/mailman/commands/tests/data/__init__.py
diff --git a/src/mailman/commands/tests/data/no-runners.cfg b/src/mailman/commands/tests/data/no-runners.cfg
new file mode 100644
index 000000000..8f423460c
--- /dev/null
+++ b/src/mailman/commands/tests/data/no-runners.cfg
@@ -0,0 +1,51 @@
+# Copyright (C) 2008-2017 by the Free Software Foundation, Inc.
+#
+# This file is part of GNU Mailman.
+#
+# GNU Mailman is free software: you can redistribute it and/or modify it under
+# the terms of the GNU General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or (at your option)
+# any later version.
+#
+# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+# more details.
+#
+# You should have received a copy of the GNU General Public License along with
+# GNU Mailman. If not, see <http://www.gnu.org/licenses/>.
+
+# Disable all runners.
+
+[runner.archive]
+start: no
+
+[runner.bounces]
+start: no
+
+[runner.command]
+start: no
+
+[runner.in]
+start: no
+
+[runner.lmtp]
+start: no
+
+[runner.nntp]
+start: no
+
+[runner.out]
+start: no
+
+[runner.pipeline]
+start: no
+
+[runner.retry]
+start: no
+
+[runner.shunt]
+start: no
+
+[runner.virgin]
+start: no
diff --git a/src/mailman/commands/tests/test_cli_conf.py b/src/mailman/commands/tests/test_cli_conf.py
new file mode 100644
index 000000000..27fca27ac
--- /dev/null
+++ b/src/mailman/commands/tests/test_cli_conf.py
@@ -0,0 +1,74 @@
+# Copyright (C) 2013-2017 by the Free Software Foundation, Inc.
+#
+# This file is part of GNU Mailman.
+#
+# GNU Mailman is free software: you can redistribute it and/or modify it under
+# the terms of the GNU General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or (at your option)
+# any later version.
+#
+# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+# more details.
+#
+# You should have received a copy of the GNU General Public License along with
+# GNU Mailman. If not, see <http://www.gnu.org/licenses/>.
+
+"""Test the conf subcommand."""
+
+import unittest
+
+from click.testing import CliRunner
+from mailman.commands.cli_conf import conf
+from mailman.testing.layers import ConfigLayer
+from tempfile import NamedTemporaryFile
+
+
+class TestConf(unittest.TestCase):
+ """Test the conf subcommand."""
+
+ layer = ConfigLayer
+
+ def setUp(self):
+ self._command = CliRunner()
+
+ def test_cannot_access_nonexistent_section(self):
+ result = self._command.invoke(conf, ('-s', 'thissectiondoesnotexist'))
+ self.assertEqual(result.exit_code, 2)
+ self.assertEqual(
+ result.output,
+ 'Usage: conf [OPTIONS]\n\n'
+ 'Error: No such section: thissectiondoesnotexist\n')
+
+ def test_cannot_access_nonexistent_section_and_key(self):
+ result = self._command.invoke(
+ conf, ('-s', 'thissectiondoesnotexist', '-k', 'nosuchkey'))
+ self.assertEqual(result.exit_code, 2)
+ self.assertEqual(
+ result.output,
+ 'Usage: conf [OPTIONS]\n\n'
+ 'Error: No such section: thissectiondoesnotexist\n')
+
+ def test_cannot_access_nonexistent_key(self):
+ result = self._command.invoke(
+ conf, ('-s', 'mailman', '-k', 'thiskeydoesnotexist'))
+ self.assertEqual(result.exit_code, 2)
+ self.assertEqual(
+ result.output,
+ 'Usage: conf [OPTIONS]\n\n'
+ 'Error: Section mailman: No such key: thiskeydoesnotexist\n')
+
+ def test_output_to_explicit_stdout(self):
+ result = self._command.invoke(
+ conf, ('-o', '-', '-s', 'shell', '-k', 'use_ipython'))
+ self.assertEqual(result.exit_code, 0)
+ self.assertEqual(result.output, 'no\n')
+
+ def test_output_to_file(self):
+ with NamedTemporaryFile() as outfp:
+ result = self._command.invoke(
+ conf, ('-o', outfp.name, '-s', 'shell', '-k', 'use_ipython'))
+ self.assertEqual(result.exit_code, 0)
+ with open(outfp.name, 'r', encoding='utf-8') as infp:
+ self.assertEqual(infp.read(), 'no\n')
diff --git a/src/mailman/commands/tests/test_cli_control.py b/src/mailman/commands/tests/test_cli_control.py
new file mode 100644
index 000000000..642450167
--- /dev/null
+++ b/src/mailman/commands/tests/test_cli_control.py
@@ -0,0 +1,295 @@
+# Copyright (C) 2011-2017 by the Free Software Foundation, Inc.
+#
+# This file is part of GNU Mailman.
+#
+# GNU Mailman is free software: you can redistribute it and/or modify it under
+# the terms of the GNU General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or (at your option)
+# any later version.
+#
+# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+# more details.
+#
+# You should have received a copy of the GNU General Public License along with
+# GNU Mailman. If not, see <http://www.gnu.org/licenses/>.
+
+"""Test some additional corner cases for starting/stopping."""
+
+import os
+import sys
+import time
+import shutil
+import signal
+import socket
+import unittest
+
+from click.testing import CliRunner
+from contextlib import ExitStack, suppress
+from datetime import datetime, timedelta
+from flufl.lock import SEP
+from mailman.bin.master import WatcherState
+from mailman.commands.cli_control import reopen, restart, start
+from mailman.config import config
+from mailman.testing.helpers import configuration
+from mailman.testing.layers import ConfigLayer
+from pkg_resources import resource_filename
+from public import public
+from tempfile import TemporaryDirectory
+from unittest.mock import patch
+
+
+# For ../docs/control.rst
+@public
+def make_config(resources):
+ cfg_path = resource_filename(
+ 'mailman.commands.tests.data', 'no-runners.cfg')
+ # We have to patch the global config's filename attribute. The problem
+ # here is that click does not support setting the -C option on the
+ # parent command (i.e. `master`).
+ # https://github.com/pallets/click/issues/831
+ resources.enter_context(patch.object(config, 'filename', cfg_path))
+
+
+# For ../docs/control.rst
+@public
+def find_master():
+ # See if the master process is still running.
+ until = timedelta(seconds=10) + datetime.now()
+ while datetime.now() < until:
+ time.sleep(0.1)
+ with suppress(FileNotFoundError, ValueError, ProcessLookupError):
+ with open(config.PID_FILE) as fp:
+ pid = int(fp.read().strip())
+ os.kill(pid, 0)
+ return pid
+ return None
+
+
+@public
+def claim_lock():
+ # Fake an acquisition of the master lock by another process, which
+ # subsequently goes stale. Start by finding a free process id. Yes,
+ # this could race, but given that we're starting with our own PID and
+ # searching downward, it's less likely.
+ fake_pid = os.getpid() - 1
+ while fake_pid > 1:
+ try:
+ os.kill(fake_pid, 0)
+ except ProcessLookupError:
+ break
+ fake_pid -= 1
+ else:
+ raise RuntimeError('Cannot find free PID')
+ # Lock acquisition logic taken from flufl.lock.
+ claim_file = SEP.join((
+ config.LOCK_FILE,
+ socket.getfqdn(),
+ str(fake_pid),
+ '0'))
+ with open(config.LOCK_FILE, 'w') as fp:
+ fp.write(claim_file)
+ os.link(config.LOCK_FILE, claim_file)
+ expiration_date = datetime.now() - timedelta(minutes=5)
+ t = time.mktime(expiration_date.timetuple())
+ os.utime(claim_file, (t, t))
+ return claim_file
+
+
+@public
+def kill_with_extreme_prejudice(pid_or_pidfile=None):
+ # 2016-12-03 barry: We have intermittent hangs during both local and CI
+ # test suite runs where killing a runner or master process doesn't
+ # terminate the process. In those cases, wait()ing on the child can
+ # suspend the test process indefinitely. Locally, you have to C-c the
+ # test process, but that still doesn't kill it; the process continues to
+ # run in the background. If you then search for the process's pid and
+ # SIGTERM it, it will usually exit, which is why I don't understand why
+ # the above SIGTERM doesn't kill it sometimes. However, when run under
+ # CI, the test suite will just hang until the CI runner times it out. It
+ # would be better to figure out the underlying cause, because we have
+ # definitely seen other situations where a runner process won't exit, but
+ # for testing purposes we're just trying to clean up some resources so
+ # after a brief attempt at SIGTERMing it, let's SIGKILL it and warn.
+ if isinstance(pid_or_pidfile, str):
+ try:
+ with open(pid_or_pidfile, 'r') as fp:
+ pid = int(fp.read())
+ except FileNotFoundError:
+ # There's nothing to kill.
+ return
+ else:
+ pid = pid_or_pidfile
+ if pid is not None:
+ os.kill(pid, signal.SIGTERM)
+ until = timedelta(seconds=10) + datetime.now()
+ while datetime.now() < until:
+ try:
+ if pid is None:
+ os.wait3(os.WNOHANG)
+ else:
+ os.waitpid(pid, os.WNOHANG)
+ except ChildProcessError:
+ # This basically means we went one too many times around the
+ # loop. The previous iteration successfully reaped the child.
+ # Because the return status of wait3() and waitpid() are different
+ # in those cases, it's easier just to catch the exception for
+ # either call and exit.
+ return
+ time.sleep(0.1)
+ else:
+ if pid is None:
+ # There's really not much more we can do because we have no pid to
+ # SIGKILL. Just report the problem and continue.
+ print('WARNING: NO CHANGE IN CHILD PROCESS STATES',
+ file=sys.stderr)
+ return
+ print('WARNING: SIGTERM DID NOT EXIT PROCESS; SIGKILLing',
+ file=sys.stderr)
+ if pid is not None:
+ os.kill(pid, signal.SIGKILL)
+ until = timedelta(seconds=10) + datetime.now()
+ while datetime.now() < until:
+ status = os.waitpid(pid, os.WNOHANG)
+ if status == (0, 0):
+ # The child was reaped.
+ return
+ time.sleep(0.1)
+ else:
+ print('WARNING: SIGKILL DID NOT EXIT PROCESS!', file=sys.stderr)
+
+
+class TestControl(unittest.TestCase):
+ layer = ConfigLayer
+ maxDiff = None
+
+ def setUp(self):
+ self._command = CliRunner()
+ self._tmpdir = TemporaryDirectory()
+ self.addCleanup(self._tmpdir.cleanup)
+ # Specify where to put the pid file; and make sure that the master
+ # gets killed regardless of whether it gets started or not.
+ self._pid_file = os.path.join(self._tmpdir.name, 'master-test.pid')
+ self.addCleanup(kill_with_extreme_prejudice, self._pid_file)
+ # Patch cli_control so that 1) it doesn't actually do a fork, since
+ # that makes it impossible to avoid race conditions in the test; 2)
+ # doesn't actually os.execl().
+ with ExitStack() as resources:
+ resources.enter_context(patch(
+ 'mailman.commands.cli_control.os.fork',
+ # Pretend to be the child.
+ return_value=0
+ ))
+ self._execl = resources.enter_context(patch(
+ 'mailman.commands.cli_control.os.execl'))
+ resources.enter_context(patch(
+ 'mailman.commands.cli_control.os.setsid'))
+ resources.enter_context(patch(
+ 'mailman.commands.cli_control.os.chdir'))
+ resources.enter_context(patch(
+ 'mailman.commands.cli_control.os.environ',
+ os.environ.copy()))
+ # Arrange for the mocks to be reverted when the test is over.
+ self.addCleanup(resources.pop_all().close)
+
+ def test_master_is_elsewhere_and_missing(self):
+ with ExitStack() as resources:
+ bin_dir = resources.enter_context(TemporaryDirectory())
+ old_master = os.path.join(config.BIN_DIR, 'master')
+ new_master = os.path.join(bin_dir, 'master')
+ shutil.move(old_master, new_master)
+ resources.callback(shutil.move, new_master, old_master)
+ results = self._command.invoke(start)
+ # Argument #2 to the execl() call should be the path to the master
+ # program, and the path should not exist.
+ self.assertEqual(
+ len(self._execl.call_args_list), 1, results.output)
+ posargs, kws = self._execl.call_args_list[0]
+ master_path = posargs[2]
+ self.assertEqual(os.path.basename(master_path), 'master')
+ self.assertFalse(os.path.exists(master_path), master_path)
+
+ def test_master_is_elsewhere_and_findable(self):
+ with ExitStack() as resources:
+ bin_dir = resources.enter_context(TemporaryDirectory())
+ old_master = os.path.join(config.BIN_DIR, 'master')
+ new_master = os.path.join(bin_dir, 'master')
+ shutil.move(old_master, new_master)
+ resources.callback(shutil.move, new_master, old_master)
+ with configuration('paths.testing', bin_dir=bin_dir):
+ results = self._command.invoke(start)
+ # Argument #2 to the execl() call should be the path to the master
+ # program, and the path should exist.
+ self.assertEqual(
+ len(self._execl.call_args_list), 1, results.output)
+ posargs, kws = self._execl.call_args_list[0]
+ master_path = posargs[2]
+ self.assertEqual(os.path.basename(master_path), 'master')
+ self.assertTrue(os.path.exists(master_path), master_path)
+
+ def test_stale_lock_no_force(self):
+ claim_file = claim_lock()
+ self.addCleanup(os.remove, claim_file)
+ self.addCleanup(os.remove, config.LOCK_FILE)
+ result = self._command.invoke(start)
+ self.assertEqual(result.exit_code, 2)
+ self.assertEqual(
+ result.output,
+ 'Usage: start [OPTIONS]\n\n'
+ 'Error: A previous run of GNU Mailman did not exit cleanly '
+ '(stale_lock). Try using --force\n')
+
+ def test_stale_lock_force(self):
+ claim_file = claim_lock()
+ self.addCleanup(os.remove, claim_file)
+ self.addCleanup(os.remove, config.LOCK_FILE)
+ # Don't test the results of this command. Because we're mocking
+ # os.execl(), we'll end up raising the RuntimeError at the end of the
+ # start() method, child branch.
+ self._command.invoke(start, ('--force',))
+ self.assertEqual(len(self._execl.call_args_list), 1)
+ posargs, kws = self._execl.call_args_list[0]
+ self.assertIn('--force', posargs)
+
+
+class TestControlSimple(unittest.TestCase):
+ layer = ConfigLayer
+ maxDiff = None
+
+ def setUp(self):
+ self._command = CliRunner()
+
+ def test_watcher_state_conflict(self):
+ with patch('mailman.commands.cli_control.master_state',
+ return_value=(WatcherState.conflict, object())):
+ results = self._command.invoke(start)
+ self.assertEqual(results.exit_code, 2)
+ self.assertEqual(
+ results.output,
+ 'Usage: start [OPTIONS]\n\n'
+ 'Error: GNU Mailman is already running\n')
+
+ def test_reopen(self):
+ with patch('mailman.commands.cli_control.kill_watcher') as mock:
+ result = self._command.invoke(reopen)
+ mock.assert_called_once_with(signal.SIGHUP)
+ self.assertEqual(result.output, 'Reopening the Mailman runners\n')
+
+ def test_reopen_quiet(self):
+ with patch('mailman.commands.cli_control.kill_watcher') as mock:
+ result = self._command.invoke(reopen, ('--quiet',))
+ mock.assert_called_once_with(signal.SIGHUP)
+ self.assertEqual(result.output, '')
+
+ def test_restart(self):
+ with patch('mailman.commands.cli_control.kill_watcher') as mock:
+ result = self._command.invoke(restart)
+ mock.assert_called_once_with(signal.SIGUSR1)
+ self.assertEqual(result.output, 'Restarting the Mailman runners\n')
+
+ def test_restart_quiet(self):
+ with patch('mailman.commands.cli_control.kill_watcher') as mock:
+ result = self._command.invoke(restart, ('--quiet',))
+ mock.assert_called_once_with(signal.SIGUSR1)
+ self.assertEqual(result.output, '')
diff --git a/src/mailman/commands/tests/test_cli_create.py b/src/mailman/commands/tests/test_cli_create.py
new file mode 100644
index 000000000..364018d92
--- /dev/null
+++ b/src/mailman/commands/tests/test_cli_create.py
@@ -0,0 +1,116 @@
+# Copyright (C) 2011-2017 by the Free Software Foundation, Inc.
+#
+# This file is part of GNU Mailman.
+#
+# GNU Mailman is free software: you can redistribute it and/or modify it under
+# the terms of the GNU General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or (at your option)
+# any later version.
+#
+# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+# more details.
+#
+# You should have received a copy of the GNU General Public License along with
+# GNU Mailman. If not, see <http://www.gnu.org/licenses/>.
+
+"""Test the `mailman create` subcommand."""
+
+import unittest
+
+from click.testing import CliRunner
+from mailman.app.lifecycle import create_list
+from mailman.commands.cli_lists import create, remove
+from mailman.interfaces.domain import IDomainManager
+from mailman.testing.layers import ConfigLayer
+from zope.component import getUtility
+
+
+class TestCreate(unittest.TestCase):
+ layer = ConfigLayer
+
+ def setUp(self):
+ self._command = CliRunner()
+
+ def test_cannot_create_duplicate_list(self):
+ # Cannot create a mailing list if it already exists.
+ create_list('ant@example.com')
+ result = self._command.invoke(create, ('ant@example.com',))
+ self.assertEqual(result.exit_code, 2)
+ self.assertEqual(
+ result.output,
+ 'Usage: create [OPTIONS] LISTNAME\n\n'
+ 'Error: List already exists: ant@example.com\n')
+
+ def test_invalid_posting_address(self):
+ # Cannot create a mailing list with an invalid posting address.
+ result = self._command.invoke(create, ('foo',))
+ self.assertEqual(result.exit_code, 2)
+ self.assertEqual(
+ result.output,
+ 'Usage: create [OPTIONS] LISTNAME\n\n'
+ 'Error: Illegal list name: foo\n')
+
+ def test_invalid_owner_addresses(self):
+ # Cannot create a list with invalid owner addresses. LP: #778687
+ result = self._command.invoke(
+ create, ('-o', 'invalid', 'ant@example.com'))
+ self.assertEqual(result.exit_code, 2)
+ self.assertEqual(
+ result.output,
+ 'Usage: create [OPTIONS] LISTNAME\n\n'
+ 'Error: Illegal owner addresses: invalid\n')
+
+ def test_create_without_domain_option(self):
+ # The domain will be created if no domain options are specified. Use
+ # the example.org domain since example.com is created by the test
+ # suite so it would always already exist.
+ result = self._command.invoke(create, ('ant@example.org',))
+ self.assertEqual(result.exit_code, 0)
+ domain = getUtility(IDomainManager)['example.org']
+ self.assertEqual(domain.mail_host, 'example.org')
+
+ def test_create_with_d(self):
+ result = self._command.invoke(create, ('ant@example.org', '-d'))
+ self.assertEqual(result.exit_code, 0)
+ domain = getUtility(IDomainManager)['example.org']
+ self.assertEqual(domain.mail_host, 'example.org')
+
+ def test_create_with_domain(self):
+ result = self._command.invoke(create, ('ant@example.org', '--domain'))
+ self.assertEqual(result.exit_code, 0)
+ domain = getUtility(IDomainManager)['example.org']
+ self.assertEqual(domain.mail_host, 'example.org')
+
+ def test_create_with_D(self):
+ result = self._command.invoke(create, ('ant@example.org', '-D'))
+ self.assertEqual(result.exit_code, 2)
+ self.assertEqual(
+ result.output,
+ 'Usage: create [OPTIONS] LISTNAME\n\n'
+ 'Error: Undefined domain: example.org\n')
+
+ def test_create_with_nodomain(self):
+ result = self._command.invoke(
+ create, ('ant@example.org', '--no-domain'))
+ self.assertEqual(result.exit_code, 2)
+ self.assertEqual(
+ result.output,
+ 'Usage: create [OPTIONS] LISTNAME\n\n'
+ 'Error: Undefined domain: example.org\n')
+
+
+class TestRemove(unittest.TestCase):
+ layer = ConfigLayer
+
+ def setUp(self):
+ self._command = CliRunner()
+
+ def test_remove_not_quiet_no_such_list(self):
+ results = self._command.invoke(remove, ('ant@example.com',))
+ # It's not an error to try to remove a nonexistent list.
+ self.assertEqual(results.exit_code, 0)
+ self.assertEqual(
+ results.output,
+ 'No such list matching spec: ant@example.com\n')
diff --git a/src/mailman/commands/tests/test_digests.py b/src/mailman/commands/tests/test_cli_digests.py
index 0a3ea5226..3c033d60d 100644
--- a/src/mailman/commands/tests/test_digests.py
+++ b/src/mailman/commands/tests/test_cli_digests.py
@@ -20,10 +20,10 @@
import os
import unittest
+from click.testing import CliRunner
from datetime import timedelta
-from io import StringIO
from mailman.app.lifecycle import create_list
-from mailman.commands.cli_digests import Digests
+from mailman.commands.cli_digests import digests
from mailman.config import config
from mailman.interfaces.digests import DigestFrequency
from mailman.interfaces.member import DeliveryMode
@@ -33,16 +33,6 @@ from mailman.testing.helpers import (
specialized_message_from_string as mfs, subscribe)
from mailman.testing.layers import ConfigLayer
from mailman.utilities.datetime import now as right_now
-from unittest.mock import patch
-
-
-class FakeArgs:
- def __init__(self):
- self.lists = []
- self.send = False
- self.bump = False
- self.dry_run = False
- self.verbose = False
class TestSendDigests(unittest.TestCase):
@@ -53,7 +43,7 @@ class TestSendDigests(unittest.TestCase):
self._mlist.digests_enabled = True
self._mlist.digest_size_threshold = 100000
self._mlist.send_welcome_message = False
- self._command = Digests()
+ self._command = CliRunner()
self._handler = config.handlers['to-digest']
self._runner = make_testable_runner(DigestRunner, 'digest')
# The mailing list needs at least one digest recipient.
@@ -76,10 +66,7 @@ Subject: message 1
get_queue_messages('digest', expected_count=0)
mailbox_path = os.path.join(self._mlist.data_path, 'digest.mmdf')
self.assertGreater(os.path.getsize(mailbox_path), 0)
- args = FakeArgs()
- args.send = True
- args.lists.append('ant.example.com')
- self._command.process(args)
+ self._command.invoke(digests, ('-s', '-l', 'ant.example.com'))
self._runner.run()
# Now, there's no digest mbox and there's a plaintext digest in the
# outgoing queue.
@@ -105,10 +92,7 @@ Subject: message 1
get_queue_messages('digest', expected_count=0)
mailbox_path = os.path.join(self._mlist.data_path, 'digest.mmdf')
self.assertGreater(os.path.getsize(mailbox_path), 0)
- args = FakeArgs()
- args.send = True
- args.lists.append('ant@example.com')
- self._command.process(args)
+ self._command.invoke(digests, ('-s', '-l', 'ant@example.com'))
self._runner.run()
# Now, there's no digest mbox and there's a plaintext digest in the
# outgoing queue.
@@ -134,16 +118,12 @@ Subject: message 1
get_queue_messages('digest', expected_count=0)
mailbox_path = os.path.join(self._mlist.data_path, 'digest.mmdf')
self.assertGreater(os.path.getsize(mailbox_path), 0)
- args = FakeArgs()
- args.send = True
- args.lists.append('bee.example.com')
- stderr = StringIO()
- with patch('mailman.commands.cli_digests.sys.stderr', stderr):
- self._command.process(args)
+ result = self._command.invoke(digests, ('-s', '-l', 'bee.example.com'))
+ self.assertEqual(result.exit_code, 0)
+ self.assertEqual(
+ result.output,
+ 'No such list found: bee.example.com\n')
self._runner.run()
- # The warning was printed to stderr.
- self.assertEqual(stderr.getvalue(),
- 'No such list found: bee.example.com\n')
# And no digest was prepared.
self.assertGreater(os.path.getsize(mailbox_path), 0)
get_queue_messages('virgin', expected_count=0)
@@ -164,16 +144,12 @@ Subject: message 1
get_queue_messages('digest', expected_count=0)
mailbox_path = os.path.join(self._mlist.data_path, 'digest.mmdf')
self.assertGreater(os.path.getsize(mailbox_path), 0)
- args = FakeArgs()
- args.send = True
- args.lists.append('bee@example.com')
- stderr = StringIO()
- with patch('mailman.commands.cli_digests.sys.stderr', stderr):
- self._command.process(args)
+ result = self._command.invoke(digests, ('-s', '-l', 'bee@example.com'))
+ self.assertEqual(result.exit_code, 0)
+ self.assertEqual(
+ result.output,
+ 'No such list found: bee@example.com\n')
self._runner.run()
- # The warning was printed to stderr.
- self.assertEqual(stderr.getvalue(),
- 'No such list found: bee@example.com\n')
# And no digest was prepared.
self.assertGreater(os.path.getsize(mailbox_path), 0)
get_queue_messages('virgin', expected_count=0)
@@ -194,16 +170,14 @@ Subject: message 1
get_queue_messages('digest', expected_count=0)
mailbox_path = os.path.join(self._mlist.data_path, 'digest.mmdf')
self.assertGreater(os.path.getsize(mailbox_path), 0)
- args = FakeArgs()
- args.send = True
- args.lists.extend(('ant.example.com', 'bee.example.com'))
- stderr = StringIO()
- with patch('mailman.commands.cli_digests.sys.stderr', stderr):
- self._command.process(args)
+ result = self._command.invoke(
+ digests,
+ ('-s', '-l', 'ant.example.com', '-l', 'bee.example.com'))
+ self.assertEqual(result.exit_code, 0)
+ self.assertEqual(
+ result.output,
+ 'No such list found: bee.example.com\n')
self._runner.run()
- # The warning was printed to stderr.
- self.assertEqual(stderr.getvalue(),
- 'No such list found: bee.example.com\n')
# But ant's digest was still prepared.
self.assertFalse(os.path.exists(mailbox_path))
items = get_queue_messages('virgin', expected_count=1)
@@ -251,10 +225,8 @@ Subject: message 3
# Both.
get_queue_messages('digest', expected_count=0)
# Process both list's digests.
- args = FakeArgs()
- args.send = True
- args.lists.extend(('ant.example.com', 'bee@example.com'))
- self._command.process(args)
+ self._command.invoke(
+ digests, ('-s', '-l', 'ant.example.com', '-l', 'bee@example.com'))
self._runner.run()
# Now, neither list has a digest mbox and but there are plaintext
# digest in the outgoing queue for both.
@@ -318,9 +290,7 @@ Subject: message 3
# Both.
get_queue_messages('digest', expected_count=0)
# Process all mailing list digests by not setting any arguments.
- args = FakeArgs()
- args.send = True
- self._command.process(args)
+ self._command.invoke(digests, ('-s',))
self._runner.run()
# Now, neither list has a digest mbox and but there are plaintext
# digest in the outgoing queue for both.
@@ -349,10 +319,7 @@ Subject: message 3
# can be sent.
mailbox_path = os.path.join(self._mlist.data_path, 'digest.mmdf')
self.assertFalse(os.path.exists(mailbox_path))
- args = FakeArgs()
- args.send = True
- args.lists.append('ant.example.com')
- self._command.process(args)
+ self._command.invoke(digests, ('-s', '-l', 'ant.example.com'))
self._runner.run()
get_queue_messages('virgin', expected_count=0)
@@ -369,11 +336,8 @@ Subject: message 1
""")
self._handler.process(self._mlist, msg, {})
- args = FakeArgs()
- args.bump = True
- args.send = True
- args.lists.append('ant.example.com')
- self._command.process(args)
+ self._command.invoke(
+ digests, ('-s', '--bump', '-l', 'ant.example.com'))
self._runner.run()
# The volume is 8 and the digest number is 2 because a digest was sent
# after the volume/number was bumped.
@@ -393,15 +357,12 @@ class TestBumpVolume(unittest.TestCase):
self._mlist.volume = 7
self._mlist.next_digest_number = 4
self.right_now = right_now()
- self._command = Digests()
+ self._command = CliRunner()
def test_bump_one_list(self):
self._mlist.digest_last_sent_at = self.right_now + timedelta(
days=-32)
- args = FakeArgs()
- args.bump = True
- args.lists.append('ant.example.com')
- self._command.process(args)
+ self._command.invoke(digests, ('-b', '-l', 'ant.example.com'))
self.assertEqual(self._mlist.volume, 8)
self.assertEqual(self._mlist.next_digest_number, 1)
self.assertEqual(self._mlist.digest_last_sent_at, self.right_now)
@@ -416,36 +377,23 @@ class TestBumpVolume(unittest.TestCase):
bee.next_digest_number = 4
bee.digest_last_sent_at = self.right_now + timedelta(
days=-32)
- args = FakeArgs()
- args.bump = True
- args.lists.extend(('ant.example.com', 'bee.example.com'))
- self._command.process(args)
+ self._command.invoke(
+ digests, ('-b', '-l', 'ant.example.com', '-l', 'bee.example.com'))
self.assertEqual(self._mlist.volume, 8)
self.assertEqual(self._mlist.next_digest_number, 1)
self.assertEqual(self._mlist.digest_last_sent_at, self.right_now)
def test_bump_verbose(self):
- args = FakeArgs()
- args.bump = True
- args.verbose = True
- args.lists.append('ant.example.com')
- output = StringIO()
- with patch('sys.stdout', output):
- self._command.process(args)
- self.assertMultiLineEqual(output.getvalue(), """\
+ result = self._command.invoke(
+ digests, ('-v', '-b', '-l', 'ant.example.com'))
+ self.assertMultiLineEqual(result.output, """\
ant.example.com is at volume 7, number 4
ant.example.com bumped to volume 7, number 5
""")
def test_send_verbose(self):
- args = FakeArgs()
- args.send = True
- args.verbose = True
- args.dry_run = True
- args.lists.append('ant.example.com')
- output = StringIO()
- with patch('sys.stdout', output):
- self._command.process(args)
- self.assertMultiLineEqual(output.getvalue(), """\
+ result = self._command.invoke(
+ digests, ('-v', '-s', '-n', '-l', 'ant.example.com'))
+ self.assertMultiLineEqual(result.output, """\
ant.example.com sent volume 7, number 4
""")
diff --git a/src/mailman/commands/tests/test_import.py b/src/mailman/commands/tests/test_cli_import.py
index e210889fc..5f97df294 100644
--- a/src/mailman/commands/tests/test_import.py
+++ b/src/mailman/commands/tests/test_cli_import.py
@@ -19,27 +19,23 @@
import unittest
+from click.testing import CliRunner
from mailman.app.lifecycle import create_list
-from mailman.commands.cli_import import Import21
+from mailman.commands.cli_import import import21
from mailman.testing.layers import ConfigLayer
+from mailman.utilities.importer import Import21Error
+from pickle import dump
from pkg_resources import resource_filename
+from tempfile import NamedTemporaryFile
from unittest.mock import patch
-class FakeArgs:
- listname = ['test@example.com']
- pickle_file = [
- resource_filename('mailman.testing', 'config-with-instances.pck'),
- ]
-
-
class TestImport(unittest.TestCase):
layer = ConfigLayer
def setUp(self):
- self.command = Import21()
- self.args = FakeArgs()
- self.mlist = create_list('test@example.com')
+ self._command = CliRunner()
+ self.mlist = create_list('ant@example.com')
@patch('mailman.commands.cli_import.import_config_pck')
def test_process_pickle_with_bounce_info(self, import_config_pck):
@@ -47,8 +43,34 @@ class TestImport(unittest.TestCase):
# _BounceInfo instances. We throw these away when importing to
# Mailman 3, but we have to fake the instance's classes, otherwise
# unpickling the dictionaries will fail.
+ pckfile = resource_filename(
+ 'mailman.testing', 'config-with-instances.pck')
try:
- self.command.process(self.args)
+ self._command.invoke(import21, ('ant.example.com', pckfile))
except ImportError as error:
self.fail('The pickle failed loading: {}'.format(error))
self.assertTrue(import_config_pck.called)
+
+ def test_missing_list_spec(self):
+ result = self._command.invoke(import21)
+ self.assertEqual(result.exit_code, 2, result.output)
+ self.assertEqual(
+ result.output,
+ 'Usage: import21 [OPTIONS] LISTSPEC PICKLE_FILE\n\n'
+ 'Error: Missing argument "listspec".\n')
+
+ def test_pickle_with_nondict(self):
+ with NamedTemporaryFile() as pckfile:
+ with open(pckfile.name, 'wb') as fp:
+ dump(['not', 'a', 'dict'], fp)
+ result = self._command.invoke(
+ import21, ('ant.example.com', pckfile.name))
+ self.assertIn('Ignoring non-dictionary', result.output)
+
+ def test_pickle_with_bad_language(self):
+ pckfile = resource_filename('mailman.testing', 'config.pck')
+ with patch('mailman.utilities.importer.check_language_code',
+ side_effect=Import21Error('Fake bad language code')):
+ result = self._command.invoke(
+ import21, ('ant.example.com', pckfile))
+ self.assertIn('Fake bad language code', result.output)
diff --git a/src/mailman/commands/tests/test_cli_inject.py b/src/mailman/commands/tests/test_cli_inject.py
new file mode 100644
index 000000000..890ce3a54
--- /dev/null
+++ b/src/mailman/commands/tests/test_cli_inject.py
@@ -0,0 +1,66 @@
+# Copyright (C) 2017 by the Free Software Foundation, Inc.
+#
+# This file is part of GNU Mailman.
+#
+# GNU Mailman is free software: you can redistribute it and/or modify it under
+# the terms of the GNU General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or (at your option)
+# any later version.
+#
+# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+# more details.
+#
+# You should have received a copy of the GNU General Public License along with
+# GNU Mailman. If not, see <http://www.gnu.org/licenses/>.
+
+"""Test the `inject` command."""
+
+import unittest
+
+from click.testing import CliRunner
+from io import StringIO
+from mailman.app.lifecycle import create_list
+from mailman.commands.cli_inject import inject
+from mailman.testing.layers import ConfigLayer
+
+
+class InterruptRaisingReader(StringIO):
+ def read(self, count=None):
+ # Fake enough of the API so click returns this instance unchanged.
+ if count is None:
+ raise KeyboardInterrupt
+ return b''
+
+
+class TestInject(unittest.TestCase):
+ layer = ConfigLayer
+
+ def setUp(self):
+ self._command = CliRunner()
+ create_list('ant@example.com')
+
+ def test_inject_keyboard_interrupt(self):
+ results = self._command.invoke(
+ inject, ('-f', '-', 'ant.example.com'),
+ input=InterruptRaisingReader())
+ self.assertEqual(results.exit_code, 1)
+ self.assertEqual(results.output, 'Interrupted\n')
+
+ def test_inject_no_such_list(self):
+ result = self._command.invoke(inject, ('bee.example.com',))
+ self.assertEqual(result.exit_code, 2)
+ self.assertEqual(
+ result.output,
+ 'Usage: inject [OPTIONS] LISTSPEC\n\n'
+ 'Error: No such list: bee.example.com\n')
+
+ def test_inject_no_such_queue(self):
+ result = self._command.invoke(
+ inject, ('--queue', 'bogus', 'ant.example.com'))
+ self.assertEqual(result.exit_code, 2)
+ self.assertEqual(
+ result.output,
+ 'Usage: inject [OPTIONS] LISTSPEC\n\n'
+ 'Error: No such queue: bogus\n')
diff --git a/src/mailman/commands/tests/test_lists.py b/src/mailman/commands/tests/test_cli_lists.py
index 8463b639f..6b0ff5cf3 100644
--- a/src/mailman/commands/tests/test_lists.py
+++ b/src/mailman/commands/tests/test_cli_lists.py
@@ -19,26 +19,20 @@
import unittest
-from io import StringIO
+from click.testing import CliRunner
from mailman.app.lifecycle import create_list
-from mailman.commands.cli_lists import Lists
+from mailman.commands.cli_lists import lists
from mailman.interfaces.domain import IDomainManager
from mailman.testing.layers import ConfigLayer
-from unittest.mock import patch
from zope.component import getUtility
-class FakeArgs:
- advertised = False
- names = False
- descriptions = False
- quiet = False
- domain = []
-
-
class TestLists(unittest.TestCase):
layer = ConfigLayer
+ def setUp(self):
+ self._command = CliRunner()
+
def test_lists_with_domain_option(self):
# LP: #1166911 - non-matching lists were returned.
getUtility(IDomainManager).add(
@@ -48,14 +42,8 @@ class TestLists(unittest.TestCase):
# Only this one should show up.
create_list('test3@example.net')
create_list('test4@example.com')
- command = Lists()
- args = FakeArgs()
- args.domain.append('example.net')
- output = StringIO()
- with patch('sys.stdout', output):
- command.process(args)
- lines = output.getvalue().splitlines()
- # The first line is the heading, so skip that.
- lines.pop(0)
- self.assertEqual(len(lines), 1, lines)
- self.assertEqual(lines[0], 'test3@example.net')
+ result = self._command.invoke(lists, ('--domain', 'example.net'))
+ self.assertEqual(result.exit_code, 0)
+ self.assertEqual(
+ result.output,
+ '1 matching mailing lists found:\ntest3@example.net\n')
diff --git a/src/mailman/commands/tests/test_members.py b/src/mailman/commands/tests/test_cli_members.py
index 74c7414f6..9c72c61ed 100644
--- a/src/mailman/commands/tests/test_members.py
+++ b/src/mailman/commands/tests/test_cli_members.py
@@ -17,37 +17,15 @@
"""Test the `mailman members` command."""
-import sys
import unittest
-from functools import partial
-from io import StringIO
+from click.testing import CliRunner
from mailman.app.lifecycle import create_list
-from mailman.commands.cli_members import Members
+from mailman.commands.cli_members import members
from mailman.interfaces.member import MemberRole
from mailman.testing.helpers import subscribe
from mailman.testing.layers import ConfigLayer
from tempfile import NamedTemporaryFile
-from unittest.mock import patch
-
-
-class FakeArgs:
- input_filename = None
- output_filename = None
- role = None
- regular = None
- digest = None
- nomail = None
- list = None
-
-
-class FakeParser:
- def __init__(self):
- self.message = None
-
- def error(self, message):
- self.message = message
- sys.exit(1)
class TestCLIMembers(unittest.TestCase):
@@ -55,35 +33,25 @@ class TestCLIMembers(unittest.TestCase):
def setUp(self):
self._mlist = create_list('ant@example.com')
- self.command = Members()
- self.command.parser = FakeParser()
- self.args = FakeArgs()
+ self._command = CliRunner()
def test_no_such_list(self):
- self.args.list = ['bee.example.com']
- with self.assertRaises(SystemExit):
- self.command.process(self.args)
- self.assertEqual(self.command.parser.message,
- 'No such list: bee.example.com')
-
- def test_bad_delivery_status(self):
- self.args.list = ['ant.example.com']
- self.args.nomail = 'bogus'
- with self.assertRaises(SystemExit):
- self.command.process(self.args)
- self.assertEqual(self.command.parser.message,
- 'Unknown delivery status: bogus')
+ result = self._command.invoke(members, ('bee.example.com',))
+ self.assertEqual(result.exit_code, 2)
+ self.assertEqual(
+ result.output,
+ 'Usage: members [OPTIONS] LISTSPEC\n\n'
+ 'Error: No such list: bee.example.com\n')
def test_role_administrator(self):
subscribe(self._mlist, 'Anne', role=MemberRole.owner)
subscribe(self._mlist, 'Bart', role=MemberRole.moderator)
subscribe(self._mlist, 'Cate', role=MemberRole.nonmember)
subscribe(self._mlist, 'Dave', role=MemberRole.member)
- self.args.list = ['ant.example.com']
- self.args.role = 'administrator'
with NamedTemporaryFile('w', encoding='utf-8') as outfp:
- self.args.output_filename = outfp.name
- self.command.process(self.args)
+ self._command.invoke(members, (
+ '--role', 'administrator', '-o', outfp.name,
+ 'ant.example.com'))
with open(outfp.name, 'r', encoding='utf-8') as infp:
lines = infp.readlines()
self.assertEqual(len(lines), 2)
@@ -95,11 +63,9 @@ class TestCLIMembers(unittest.TestCase):
subscribe(self._mlist, 'Bart', role=MemberRole.moderator)
subscribe(self._mlist, 'Cate', role=MemberRole.nonmember)
subscribe(self._mlist, 'Dave', role=MemberRole.member)
- self.args.list = ['ant.example.com']
- self.args.role = 'any'
with NamedTemporaryFile('w', encoding='utf-8') as outfp:
- self.args.output_filename = outfp.name
- self.command.process(self.args)
+ self._command.invoke(members, (
+ '--role', 'any', '-o', outfp.name, 'ant.example.com'))
with open(outfp.name, 'r', encoding='utf-8') as infp:
lines = infp.readlines()
self.assertEqual(len(lines), 4)
@@ -113,34 +79,34 @@ class TestCLIMembers(unittest.TestCase):
subscribe(self._mlist, 'Bart', role=MemberRole.moderator)
subscribe(self._mlist, 'Cate', role=MemberRole.nonmember)
subscribe(self._mlist, 'Dave', role=MemberRole.member)
- self.args.list = ['ant.example.com']
- self.args.role = 'moderator'
with NamedTemporaryFile('w', encoding='utf-8') as outfp:
- self.args.output_filename = outfp.name
- self.command.process(self.args)
+ self._command.invoke(members, (
+ '--role', 'moderator', '-o', outfp.name, 'ant.example.com'))
with open(outfp.name, 'r', encoding='utf-8') as infp:
lines = infp.readlines()
self.assertEqual(len(lines), 1)
self.assertEqual(lines[0], 'Bart Person <bperson@example.com>\n')
- def test_bad_role(self):
- self.args.list = ['ant.example.com']
- self.args.role = 'bogus'
- with self.assertRaises(SystemExit):
- self.command.process(self.args)
- self.assertEqual(self.command.parser.message,
- 'Unknown member role: bogus')
+ def test_role_nonmember(self):
+ subscribe(self._mlist, 'Anne', role=MemberRole.owner)
+ subscribe(self._mlist, 'Bart', role=MemberRole.moderator)
+ subscribe(self._mlist, 'Cate', role=MemberRole.nonmember)
+ subscribe(self._mlist, 'Dave', role=MemberRole.member)
+ with NamedTemporaryFile('w', encoding='utf-8') as outfp:
+ self._command.invoke(members, (
+ '--role', 'nonmember', '-o', outfp.name, 'ant.example.com'))
+ with open(outfp.name, 'r', encoding='utf-8') as infp:
+ lines = infp.readlines()
+ self.assertEqual(len(lines), 1)
+ self.assertEqual(lines[0], 'Cate Person <cperson@example.com>\n')
def test_already_subscribed_with_display_name(self):
subscribe(self._mlist, 'Anne')
- outfp = StringIO()
with NamedTemporaryFile('w', buffering=1, encoding='utf-8') as infp:
print('Anne Person <aperson@example.com>', file=infp)
- self.args.list = ['ant.example.com']
- self.args.input_filename = infp.name
- with patch('builtins.print', partial(print, file=outfp)):
- self.command.process(self.args)
+ result = self._command.invoke(members, (
+ '--add', infp.name, 'ant.example.com'))
self.assertEqual(
- outfp.getvalue(),
+ result.output,
'Already subscribed (skipping): Anne Person <aperson@example.com>\n'
)
diff --git a/src/mailman/commands/tests/test_cli_qfile.py b/src/mailman/commands/tests/test_cli_qfile.py
new file mode 100644
index 000000000..b907abcc6
--- /dev/null
+++ b/src/mailman/commands/tests/test_cli_qfile.py
@@ -0,0 +1,59 @@
+# Copyright (C) 2017 by the Free Software Foundation, Inc.
+#
+# This file is part of GNU Mailman.
+#
+# GNU Mailman is free software: you can redistribute it and/or modify it under
+# the terms of the GNU General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or (at your option)
+# any later version.
+#
+# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+# more details.
+#
+# You should have received a copy of the GNU General Public License along with
+# GNU Mailman. If not, see <http://www.gnu.org/licenses/>.
+
+"""Test the qfile command."""
+
+import unittest
+
+from click.testing import CliRunner
+from contextlib import ExitStack
+from mailman.commands.cli_qfile import qfile
+from mailman.testing.layers import ConfigLayer
+from pickle import dump
+from tempfile import NamedTemporaryFile
+from unittest.mock import patch
+
+
+class TestUnshunt(unittest.TestCase):
+ layer = ConfigLayer
+ maxDiff = None
+
+ def setUp(self):
+ self._command = CliRunner()
+
+ def test_print_str(self):
+ with NamedTemporaryFile() as tmp_qfile:
+ with open(tmp_qfile.name, 'wb') as fp:
+ dump('a simple string', fp)
+ results = self._command.invoke(qfile, (tmp_qfile.name,))
+ self.assertEqual(results.output, """\
+[----- start pickle -----]
+<----- start object 1 ----->
+a simple string
+[----- end pickle -----]
+""", results.output)
+
+ def test_interactive(self):
+ with ExitStack() as resources:
+ tmp_qfile = resources.enter_context(NamedTemporaryFile())
+ mock = resources.enter_context(patch(
+ 'mailman.commands.cli_qfile.interact'))
+ with open(tmp_qfile.name, 'wb') as fp:
+ dump('a simple string', fp)
+ self._command.invoke(qfile, (tmp_qfile.name, '-i'))
+ mock.assert_called_once_with(
+ banner="Number of objects found (see the variable 'm'): 1")
diff --git a/src/mailman/commands/tests/test_cli_shell.py b/src/mailman/commands/tests/test_cli_shell.py
new file mode 100644
index 000000000..ffcf43803
--- /dev/null
+++ b/src/mailman/commands/tests/test_cli_shell.py
@@ -0,0 +1,173 @@
+# Copyright (C) 2016-2017 by the Free Software Foundation, Inc.
+#
+# This file is part of GNU Mailman.
+#
+# GNU Mailman is free software: you can redistribute it and/or modify it under
+# the terms of the GNU General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or (at your option)
+# any later version.
+#
+# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+# more details.
+#
+# You should have received a copy of the GNU General Public License along with
+# GNU Mailman. If not, see <http://www.gnu.org/licenses/>.
+
+"""Test the withlist/shell command."""
+
+import os
+import unittest
+
+from click.testing import CliRunner
+from contextlib import ExitStack
+from mailman.app.lifecycle import create_list
+from mailman.commands.cli_withlist import shell
+from mailman.config import config
+from mailman.interfaces.usermanager import IUserManager
+from mailman.testing.helpers import configuration
+from mailman.testing.layers import ConfigLayer
+from mailman.utilities.modules import hacked_sys_modules
+from unittest.mock import MagicMock, patch
+
+try:
+ import readline # noqa: F401
+ has_readline = True
+except ImportError:
+ has_readline = False
+
+
+class TestShell(unittest.TestCase):
+ layer = ConfigLayer
+
+ def setUp(self):
+ self._command = CliRunner()
+
+ def test_namespace(self):
+ with patch('mailman.commands.cli_withlist.start_python') as mock:
+ self._command.invoke(shell, ('--interactive',))
+ self.assertEqual(mock.call_count, 1)
+ # Don't test that all names are available, just a few choice ones.
+ positional, keywords = mock.call_args
+ namespace = positional[0]
+ self.assertIn('getUtility', namespace)
+ self.assertIn('IArchiver', namespace)
+ self.assertEqual(namespace['IUserManager'], IUserManager)
+
+ @configuration('shell', banner='my banner')
+ def test_banner(self):
+ with patch('mailman.commands.cli_withlist.interact') as mock:
+ self._command.invoke(shell, ('--interactive',))
+ self.assertEqual(mock.call_count, 1)
+ positional, keywords = mock.call_args
+ self.assertEqual(keywords['banner'], 'my banner\n')
+
+ @unittest.skipUnless(has_readline, 'readline module is not available')
+ @configuration('shell', history_file='$var_dir/history.py')
+ def test_history_file(self):
+ with patch('mailman.commands.cli_withlist.interact'):
+ self._command.invoke(shell, ('--interactive',))
+ history_file = os.path.join(config.VAR_DIR, 'history.py')
+ self.assertTrue(os.path.exists(history_file))
+
+ @configuration('shell', use_ipython='yes')
+ def test_start_ipython4(self):
+ mock = MagicMock()
+ with hacked_sys_modules('IPython.terminal.embed', mock):
+ self._command.invoke(shell, ('--interactive',))
+ posargs, kws = mock.InteractiveShellEmbed.instance().mainloop.call_args
+ self.assertEqual(
+ kws['display_banner'], 'Welcome to the GNU Mailman shell\n')
+
+ @configuration('shell', use_ipython='yes')
+ def test_start_ipython1(self):
+ mock = MagicMock()
+ with hacked_sys_modules('IPython.frontend.terminal.embed', mock):
+ self._command.invoke(shell, ('--interactive',))
+ posargs, kws = mock.InteractiveShellEmbed.instance.call_args
+ self.assertEqual(
+ kws['banner1'], 'Welcome to the GNU Mailman shell\n')
+
+ @configuration('shell', use_ipython='debug')
+ def test_start_ipython_debug(self):
+ mock = MagicMock()
+ with hacked_sys_modules('IPython.terminal.embed', mock):
+ self._command.invoke(shell, ('--interactive',))
+ posargs, kws = mock.InteractiveShellEmbed.instance().mainloop.call_args
+ self.assertEqual(
+ kws['display_banner'], 'Welcome to the GNU Mailman shell\n')
+
+ @configuration('shell', use_ipython='oops')
+ def test_start_ipython_invalid(self):
+ mock = MagicMock()
+ with hacked_sys_modules('IPython.terminal.embed', mock):
+ results = self._command.invoke(shell, ('--interactive',))
+ self.assertEqual(
+ results.output,
+ 'Invalid value for [shell]use_python: oops\n')
+ # mainloop() never got called.
+ self.assertIsNone(
+ mock.InteractiveShellEmbed.instance().mainloop.call_args)
+
+ @configuration('shell', use_ipython='yes')
+ def test_start_ipython_uninstalled(self):
+ with ExitStack() as resources:
+ # Pretend iPython isn't available at all.
+ resources.enter_context(patch(
+ 'mailman.commands.cli_withlist.start_ipython1',
+ return_value=None))
+ resources.enter_context(patch(
+ 'mailman.commands.cli_withlist.start_ipython4',
+ return_value=None))
+ results = self._command.invoke(shell, ('--interactive',))
+ self.assertEqual(
+ results.output,
+ 'ipython is not available, set use_ipython to no\n')
+
+ def test_regex_without_run(self):
+ results = self._command.invoke(shell, ('-l', '^.*example.com'))
+ self.assertEqual(results.exit_code, 2)
+ self.assertEqual(
+ results.output,
+ 'Usage: shell [OPTIONS] [RUN_ARGS]...\n\n'
+ 'Error: Regular expression requires --run\n')
+
+ def test_listspec_without_run(self):
+ create_list('ant@example.com')
+ mock = MagicMock()
+ with ExitStack() as resources:
+ resources.enter_context(
+ hacked_sys_modules('IPython.terminal.embed', mock))
+ interactive_mock = resources.enter_context(patch(
+ 'mailman.commands.cli_withlist.do_interactive'))
+ self._command.invoke(shell, ('-l', 'ant.example.com'))
+ posargs, kws = interactive_mock.call_args
+ self.assertEqual(
+ posargs[1],
+ "The variable 'm' is the ant.example.com mailing list")
+
+ def test_listspec_without_run_no_such_list(self):
+ results = self._command.invoke(shell, ('-l', 'ant.example.com'))
+ self.assertEqual(results.exit_code, 2)
+ self.assertEqual(
+ results.output,
+ 'Usage: shell [OPTIONS] [RUN_ARGS]...\n\n'
+ 'Error: No such list: ant.example.com\n')
+
+ def test_run_without_listspec(self):
+ results = self._command.invoke(shell, ('--run', 'something'))
+ self.assertEqual(results.exit_code, 2)
+ self.assertEqual(
+ results.output,
+ 'Usage: shell [OPTIONS] [RUN_ARGS]...\n\n'
+ 'Error: --run requires a mailing list\n')
+
+ def test_run_bogus_listspec(self):
+ results = self._command.invoke(
+ shell, ('-l', 'bee.example.com', '--run', 'something'))
+ self.assertEqual(results.exit_code, 2)
+ self.assertEqual(
+ results.output,
+ 'Usage: shell [OPTIONS] [RUN_ARGS]...\n\n'
+ 'Error: No such list: bee.example.com\n')
diff --git a/src/mailman/commands/tests/test_cli_status.py b/src/mailman/commands/tests/test_cli_status.py
new file mode 100644
index 000000000..4e9e3080e
--- /dev/null
+++ b/src/mailman/commands/tests/test_cli_status.py
@@ -0,0 +1,64 @@
+# Copyright (C) 2017 by the Free Software Foundation, Inc.
+#
+# This file is part of GNU Mailman.
+#
+# GNU Mailman is free software: you can redistribute it and/or modify it under
+# the terms of the GNU General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or (at your option)
+# any later version.
+#
+# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+# more details.
+#
+# You should have received a copy of the GNU General Public License along with
+# GNU Mailman. If not, see <http://www.gnu.org/licenses/>.
+
+"""Test the status command."""
+
+import socket
+import unittest
+
+from click.testing import CliRunner
+from mailman.bin.master import WatcherState
+from mailman.commands.cli_status import status
+from mailman.testing.layers import ConfigLayer
+from unittest.mock import patch
+
+
+class FakeLock:
+ details = ('localhost', 9999, None)
+
+
+class TestStatus(unittest.TestCase):
+ layer = ConfigLayer
+ maxDiff = None
+
+ def setUp(self):
+ self._command = CliRunner()
+
+ def test_stale_lock(self):
+ with patch('mailman.commands.cli_status.master_state',
+ return_value=(WatcherState.stale_lock, FakeLock())):
+ results = self._command.invoke(status)
+ self.assertEqual(results.exit_code,
+ WatcherState.stale_lock.value,
+ results.output)
+ self.assertEqual(
+ results.output,
+ 'GNU Mailman is stopped (stale pid: 9999)\n',
+ results.output)
+
+ def test_unknown_state(self):
+ with patch('mailman.commands.cli_status.master_state',
+ return_value=(WatcherState.host_mismatch, FakeLock())):
+ results = self._command.invoke(status)
+ self.assertEqual(results.exit_code,
+ WatcherState.host_mismatch.value,
+ results.output)
+ self.assertEqual(
+ results.output,
+ 'GNU Mailman is in an unexpected state '
+ '(localhost != {})\n'.format(socket.getfqdn()),
+ results.output)
diff --git a/src/mailman/commands/tests/test_cli_unshunt.py b/src/mailman/commands/tests/test_cli_unshunt.py
new file mode 100644
index 000000000..70303e1b4
--- /dev/null
+++ b/src/mailman/commands/tests/test_cli_unshunt.py
@@ -0,0 +1,45 @@
+# Copyright (C) 2017 by the Free Software Foundation, Inc.
+#
+# This file is part of GNU Mailman.
+#
+# GNU Mailman is free software: you can redistribute it and/or modify it under
+# the terms of the GNU General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or (at your option)
+# any later version.
+#
+# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+# more details.
+#
+# You should have received a copy of the GNU General Public License along with
+# GNU Mailman. If not, see <http://www.gnu.org/licenses/>.
+
+"""Test the `unshunt` command."""
+
+import unittest
+
+from click.testing import CliRunner
+from mailman.commands.cli_unshunt import unshunt
+from mailman.config import config
+from mailman.email.message import Message
+from mailman.testing.layers import ConfigLayer
+from unittest.mock import patch
+
+
+class TestUnshunt(unittest.TestCase):
+ layer = ConfigLayer
+ maxDiff = None
+
+ def setUp(self):
+ self._command = CliRunner()
+ self._queue = config.switchboards['shunt']
+
+ def test_dequeue_fails(self):
+ filebase = self._queue.enqueue(Message(), {})
+ with patch.object(self._queue, 'dequeue',
+ side_effect=RuntimeError('oops!')):
+ results = self._command.invoke(unshunt)
+ self.assertEqual(
+ results.output,
+ 'Cannot unshunt message {}, skipping:\noops!\n'.format(filebase))
diff --git a/src/mailman/commands/tests/test_conf.py b/src/mailman/commands/tests/test_conf.py
deleted file mode 100644
index 94ca0a979..000000000
--- a/src/mailman/commands/tests/test_conf.py
+++ /dev/null
@@ -1,108 +0,0 @@
-# Copyright (C) 2013-2017 by the Free Software Foundation, Inc.
-#
-# This file is part of GNU Mailman.
-#
-# GNU Mailman is free software: you can redistribute it and/or modify it under
-# the terms of the GNU General Public License as published by the Free
-# Software Foundation, either version 3 of the License, or (at your option)
-# any later version.
-#
-# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT
-# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
-# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
-# more details.
-#
-# You should have received a copy of the GNU General Public License along with
-# GNU Mailman. If not, see <http://www.gnu.org/licenses/>.
-
-"""Test the conf subcommand."""
-
-import os
-import sys
-import tempfile
-import unittest
-
-from io import StringIO
-from mailman.commands.cli_conf import Conf
-from mailman.testing.layers import ConfigLayer
-from unittest import mock
-
-
-class FakeArgs:
- section = None
- key = None
- output = None
- sort = False
-
-
-class FakeParser:
- def __init__(self):
- self.message = None
-
- def error(self, message):
- self.message = message
- sys.exit(1)
-
-
-class TestConf(unittest.TestCase):
- """Test the conf subcommand."""
-
- layer = ConfigLayer
-
- def setUp(self):
- self.command = Conf()
- self.command.parser = FakeParser()
- self.args = FakeArgs()
-
- def test_cannot_access_nonexistent_section(self):
- self.args.section = 'thissectiondoesnotexist'
- self.args.key = None
- with self.assertRaises(SystemExit):
- self.command.process(self.args)
- self.assertEqual(self.command.parser.message,
- 'No such section: thissectiondoesnotexist')
-
- def test_cannot_access_nonexistent_key(self):
- self.args.section = "mailman"
- self.args.key = 'thiskeydoesnotexist'
- with self.assertRaises(SystemExit):
- self.command.process(self.args)
- self.assertEqual(self.command.parser.message,
- 'Section mailman: No such key: thiskeydoesnotexist')
-
- def test_output_to_explicit_stdout(self):
- self.args.output = '-'
- self.args.section = 'shell'
- self.args.key = 'use_ipython'
- with mock.patch('sys.stdout') as mock_object:
- self.command.process(self.args)
- mock_object.write.assert_has_calls(
- [mock.call('no'), mock.call('\n')])
-
- def test_output_to_file(self):
- self.args.section = 'shell'
- self.args.key = 'use_ipython'
- fd, filename = tempfile.mkstemp()
- try:
- self.args.output = filename
- self.command.process(self.args)
- with open(filename, 'r') as fp:
- contents = fp.read()
- finally:
- os.remove(filename)
- self.assertEqual(contents, 'no\n')
-
- def test_sort_by_section(self):
- self.args.output = '-'
- self.args.sort = True
- output = StringIO()
- with mock.patch('sys.stdout', output):
- self.command.process(self.args)
- last_line = ''
- for line in output.getvalue().splitlines():
- if not line.startswith('['):
- # This is a continuation line. --sort doesn't sort these.
- continue
- self.assertTrue(line > last_line,
- '{} !> {}'.format(line, last_line))
- last_line = line
diff --git a/src/mailman/commands/tests/test_control.py b/src/mailman/commands/tests/test_control.py
deleted file mode 100644
index 473e78363..000000000
--- a/src/mailman/commands/tests/test_control.py
+++ /dev/null
@@ -1,244 +0,0 @@
-# Copyright (C) 2011-2017 by the Free Software Foundation, Inc.
-#
-# This file is part of GNU Mailman.
-#
-# GNU Mailman is free software: you can redistribute it and/or modify it under
-# the terms of the GNU General Public License as published by the Free
-# Software Foundation, either version 3 of the License, or (at your option)
-# any later version.
-#
-# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT
-# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
-# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
-# more details.
-#
-# You should have received a copy of the GNU General Public License along with
-# GNU Mailman. If not, see <http://www.gnu.org/licenses/>.
-
-"""Test some additional corner cases for starting/stopping."""
-
-import os
-import sys
-import time
-import errno
-import shutil
-import signal
-import socket
-import unittest
-
-from contextlib import ExitStack, suppress
-from datetime import datetime, timedelta
-from mailman.commands.cli_control import Start, kill_watcher
-from mailman.config import Configuration, config
-from mailman.testing.helpers import configuration
-from mailman.testing.layers import ConfigLayer
-from tempfile import TemporaryDirectory
-
-
-SEP = '|'
-
-
-def make_config():
- # All we care about is the master process; normally it starts a bunch of
- # runners, but we don't care about any of them, so write a test
- # configuration file for the master that disables all the runners.
- new_config = 'no-runners.cfg'
- config_file = os.path.join(os.path.dirname(config.filename), new_config)
- shutil.copyfile(config.filename, config_file)
- with open(config_file, 'a') as fp:
- for runner_config in config.runner_configs:
- print('[{}]\nstart:no\n'.format(runner_config.name), file=fp)
- return config_file
-
-
-def find_master():
- # See if the master process is still running.
- until = timedelta(seconds=10) + datetime.now()
- while datetime.now() < until:
- time.sleep(0.1)
- with suppress(FileNotFoundError, ValueError, ProcessLookupError):
- with open(config.PID_FILE) as fp:
- pid = int(fp.read().strip())
- os.kill(pid, 0)
- return pid
- return None
-
-
-def kill_with_extreme_prejudice(pid=None):
- # 2016-12-03 barry: We have intermittent hangs during both local and CI
- # test suite runs where killing a runner or master process doesn't
- # terminate the process. In those cases, wait()ing on the child can
- # suspend the test process indefinitely. Locally, you have to C-c the
- # test process, but that still doesn't kill it; the process continues to
- # run in the background. If you then search for the process's pid and
- # SIGTERM it, it will usually exit, which is why I don't understand why
- # the above SIGTERM doesn't kill it sometimes. However, when run under
- # CI, the test suite will just hang until the CI runner times it out. It
- # would be better to figure out the underlying cause, because we have
- # definitely seen other situations where a runner process won't exit, but
- # for testing purposes we're just trying to clean up some resources so
- # after a brief attempt at SIGTERMing it, let's SIGKILL it and warn.
- if pid is not None:
- os.kill(pid, signal.SIGTERM)
- until = timedelta(seconds=10) + datetime.now()
- while datetime.now() < until:
- try:
- if pid is None:
- os.wait3(os.WNOHANG)
- else:
- os.waitpid(pid, os.WNOHANG)
- except ChildProcessError:
- # This basically means we went one too many times around the
- # loop. The previous iteration successfully reaped the child.
- # Because the return status of wait3() and waitpid() are different
- # in those cases, it's easier just to catch the exception for
- # either call and exit.
- return
- time.sleep(0.1)
- else:
- if pid is None:
- # There's really not much more we can do because we have no pid to
- # SIGKILL. Just report the problem and continue.
- print('WARNING: NO CHANGE IN CHILD PROCESS STATES',
- file=sys.stderr)
- return
- print('WARNING: SIGTERM DID NOT EXIT PROCESS; SIGKILLing',
- file=sys.stderr)
- if pid is not None:
- os.kill(pid, signal.SIGKILL)
- until = timedelta(seconds=10) + datetime.now()
- while datetime.now() < until:
- status = os.waitpid(pid, os.WNOHANG)
- if status == (0, 0):
- # The child was reaped.
- return
- time.sleep(0.1)
- else:
- print('WARNING: SIGKILL DID NOT EXIT PROCESS!', file=sys.stderr)
-
-
-class FakeArgs:
- force = None
- run_as_user = None
- quiet = True
- config = None
-
-
-class FakeParser:
- def __init__(self):
- self.message = None
-
- def error(self, message):
- self.message = message
- sys.exit(1)
-
-
-class TestStart(unittest.TestCase):
- """Test various starting scenarios."""
-
- layer = ConfigLayer
-
- def setUp(self):
- self.command = Start()
- self.command.parser = FakeParser()
- self.args = FakeArgs()
- self.args.config = make_config()
-
- def tearDown(self):
- try:
- with open(config.PID_FILE) as fp:
- master_pid = int(fp.read())
- except OSError as error:
- if error.errno != errno.ENOENT:
- raise
- # There is no master, so just ignore this.
- return
- kill_watcher(signal.SIGTERM)
- os.waitpid(master_pid, 0)
-
- def test_force_stale_lock(self):
- # Fake an acquisition of the master lock by another process, which
- # subsequently goes stale. Start by finding a free process id. Yes,
- # this could race, but given that we're starting with our own PID and
- # searching downward, it's less likely.
- fake_pid = os.getpid() - 1
- while fake_pid > 1:
- try:
- os.kill(fake_pid, 0)
- except OSError as error:
- if error.errno == errno.ESRCH:
- break
- fake_pid -= 1
- else:
- raise RuntimeError('Cannot find free PID')
- # Lock acquisition logic taken from flufl.lock.
- claim_file = SEP.join((
- config.LOCK_FILE,
- socket.getfqdn(),
- str(fake_pid),
- '0'))
- with open(config.LOCK_FILE, 'w') as fp:
- fp.write(claim_file)
- os.link(config.LOCK_FILE, claim_file)
- expiration_date = datetime.now() - timedelta(minutes=60)
- t = time.mktime(expiration_date.timetuple())
- os.utime(claim_file, (t, t))
- # Start without --force; no master will be running.
- with suppress(SystemExit):
- self.command.process(self.args)
- self.assertIsNone(find_master())
- self.assertIn('--force', self.command.parser.message)
- # Start again, this time with --force.
- self.args.force = True
- self.command.process(self.args)
- pid = find_master()
- self.assertIsNotNone(pid)
-
-
-class TestBinDir(unittest.TestCase):
- """Test issues related to bin_dir, e.g. issue #3"""
-
- layer = ConfigLayer
-
- def setUp(self):
- self.command = Start()
- self.command.parser = FakeParser()
- self.args = FakeArgs()
- self.args.config = make_config()
-
- def test_master_is_elsewhere(self):
- with ExitStack() as resources:
- # Patch os.fork() so that we can record the failing child process's
- # id. We need to wait on the child exiting in either case, and
- # when it fails, no master.pid will be written.
- bin_dir = resources.enter_context(TemporaryDirectory())
- old_master = os.path.join(config.BIN_DIR, 'master')
- new_master = os.path.join(bin_dir, 'master')
- shutil.move(old_master, new_master)
- resources.callback(shutil.move, new_master, old_master)
- # Starting mailman should fail because 'master' can't be found.
- # XXX This will print Errno 2 on the console because we're not
- # silencing the child process's stderr.
- self.command.process(self.args)
- # There should be no pid file.
- args_config = Configuration()
- args_config.load(self.args.config)
- self.assertFalse(os.path.exists(args_config.PID_FILE))
- kill_with_extreme_prejudice()
-
- def test_master_is_elsewhere_and_findable(self):
- with ExitStack() as resources:
- bin_dir = resources.enter_context(TemporaryDirectory())
- old_master = os.path.join(config.BIN_DIR, 'master')
- new_master = os.path.join(bin_dir, 'master')
- shutil.move(old_master, new_master)
- resources.enter_context(
- configuration('paths.testing', bin_dir=bin_dir))
- resources.callback(shutil.move, new_master, old_master)
- # Starting mailman should find master in the new bin_dir.
- self.command.process(self.args)
- # There should a pid file and the process it describes should be
- # killable. We might have to wait until the process has started.
- master_pid = find_master()
- self.assertIsNotNone(master_pid, 'master did not start')
- kill_with_extreme_prejudice(master_pid)
diff --git a/src/mailman/commands/tests/test_create.py b/src/mailman/commands/tests/test_create.py
deleted file mode 100644
index f2232eb6f..000000000
--- a/src/mailman/commands/tests/test_create.py
+++ /dev/null
@@ -1,110 +0,0 @@
-# Copyright (C) 2011-2017 by the Free Software Foundation, Inc.
-#
-# This file is part of GNU Mailman.
-#
-# GNU Mailman is free software: you can redistribute it and/or modify it under
-# the terms of the GNU General Public License as published by the Free
-# Software Foundation, either version 3 of the License, or (at your option)
-# any later version.
-#
-# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT
-# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
-# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
-# more details.
-#
-# You should have received a copy of the GNU General Public License along with
-# GNU Mailman. If not, see <http://www.gnu.org/licenses/>.
-
-"""Test the `mailman create` subcommand."""
-
-import sys
-import unittest
-
-from argparse import ArgumentParser
-from contextlib import suppress
-from mailman.app.lifecycle import create_list
-from mailman.commands.cli_lists import Create
-from mailman.testing.layers import ConfigLayer
-
-
-class FakeArgs:
- language = None
- owners = []
- quiet = False
- domain = None
- listname = None
- notify = False
-
-
-class FakeParser:
- def __init__(self):
- self.message = None
-
- def error(self, message):
- self.message = message
- sys.exit(1)
-
-
-class TestCreate(unittest.TestCase):
- layer = ConfigLayer
-
- def setUp(self):
- self.command = Create()
- self.command.parser = FakeParser()
- self.args = FakeArgs()
-
- def test_cannot_create_duplicate_list(self):
- # Cannot create a mailing list if it already exists.
- create_list('test@example.com')
- self.args.listname = ['test@example.com']
- with suppress(SystemExit):
- self.command.process(self.args)
- self.assertEqual(self.command.parser.message,
- 'List already exists: test@example.com')
-
- def test_invalid_posting_address(self):
- # Cannot create a mailing list with an invalid posting address.
- self.args.listname = ['foo']
- with suppress(SystemExit):
- self.command.process(self.args)
- self.assertEqual(self.command.parser.message,
- 'Illegal list name: foo')
-
- def test_invalid_owner_addresses(self):
- # Cannot create a list with invalid owner addresses. LP: #778687
- self.args.listname = ['test@example.com']
- self.args.owners = ['main=True']
- with suppress(SystemExit):
- self.command.process(self.args)
- self.assertEqual(self.command.parser.message,
- 'Illegal owner addresses: main=True')
-
- def test_without_domain_option(self):
- # The domain will be created if no domain options are specified.
- parser = ArgumentParser()
- self.command.add(FakeParser(), parser)
- args = parser.parse_args('test@example.org'.split())
- self.assertTrue(args.domain)
-
- def test_with_domain_option(self):
- # The domain will be created if -d is given explicitly.
- parser = ArgumentParser()
- self.command.add(FakeParser(), parser)
- args = parser.parse_args('-d test@example.org'.split())
- self.assertTrue(args.domain)
-
- def test_with_nodomain_option(self):
- # The domain will not be created if --no-domain is given.
- parser = ArgumentParser()
- self.command.add(FakeParser(), parser)
- args = parser.parse_args('-D test@example.net'.split())
- self.assertFalse(args.domain)
-
- def test_error_when_not_creating_domain(self):
- self.args.domain = False
- self.args.listname = ['test@example.org']
- with self.assertRaises(SystemExit) as cm:
- self.command.process(self.args)
- self.assertEqual(cm.exception.code, 1)
- self.assertEqual(self.command.parser.message,
- 'Undefined domain: example.org')
diff --git a/src/mailman/commands/tests/test_confirm.py b/src/mailman/commands/tests/test_eml_confirm.py
index 1ef392b76..1ef392b76 100644
--- a/src/mailman/commands/tests/test_confirm.py
+++ b/src/mailman/commands/tests/test_eml_confirm.py
diff --git a/src/mailman/commands/tests/test_help.py b/src/mailman/commands/tests/test_eml_help.py
index 3090de9cc..3090de9cc 100644
--- a/src/mailman/commands/tests/test_help.py
+++ b/src/mailman/commands/tests/test_eml_help.py
diff --git a/src/mailman/commands/tests/test_membership.py b/src/mailman/commands/tests/test_eml_membership.py
index 6cf4802c6..6cf4802c6 100644
--- a/src/mailman/commands/tests/test_membership.py
+++ b/src/mailman/commands/tests/test_eml_membership.py
diff --git a/src/mailman/commands/tests/test_shell.py b/src/mailman/commands/tests/test_shell.py
deleted file mode 100644
index 2f7998f9f..000000000
--- a/src/mailman/commands/tests/test_shell.py
+++ /dev/null
@@ -1,81 +0,0 @@
-# Copyright (C) 2016-2017 by the Free Software Foundation, Inc.
-#
-# This file is part of GNU Mailman.
-#
-# GNU Mailman is free software: you can redistribute it and/or modify it under
-# the terms of the GNU General Public License as published by the Free
-# Software Foundation, either version 3 of the License, or (at your option)
-# any later version.
-#
-# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT
-# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
-# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
-# more details.
-#
-# You should have received a copy of the GNU General Public License along with
-# GNU Mailman. If not, see <http://www.gnu.org/licenses/>.
-
-"""Test the withlist/shell command."""
-
-import os
-import unittest
-
-from mailman.commands.cli_withlist import Withlist
-from mailman.config import config
-from mailman.interfaces.usermanager import IUserManager
-from mailman.testing.helpers import configuration
-from mailman.testing.layers import ConfigLayer
-from unittest.mock import patch
-
-try:
- import readline # noqa: F401
- has_readline = True
-except ImportError:
- has_readline = False
-
-
-class FakeArgs:
- interactive = None
- run = None
- details = False
- listname = None
-
-
-class TestShell(unittest.TestCase):
- layer = ConfigLayer
-
- def setUp(self):
- self._shell = Withlist()
-
- def test_namespace(self):
- args = FakeArgs()
- args.interactive = True
- with patch.object(self._shell, '_start_python') as mock:
- self._shell.process(args)
- self.assertEqual(mock.call_count, 1)
- # Don't test that all names are available, just a few choice ones.
- positional, keywords = mock.call_args
- namespace = positional[0]
- self.assertIn('getUtility', namespace)
- self.assertIn('IArchiver', namespace)
- self.assertEqual(namespace['IUserManager'], IUserManager)
-
- @configuration('shell', banner='my banner')
- def test_banner(self):
- args = FakeArgs()
- args.interactive = True
- with patch('mailman.commands.cli_withlist.interact') as mock:
- self._shell.process(args)
- self.assertEqual(mock.call_count, 1)
- positional, keywords = mock.call_args
- self.assertEqual(keywords['banner'], 'my banner\n')
-
- @unittest.skipUnless(has_readline, 'readline module is not available')
- @configuration('shell', history_file='$var_dir/history.py')
- def test_history_file(self):
- args = FakeArgs()
- args.interactive = True
- with patch('mailman.commands.cli_withlist.interact'):
- self._shell.process(args)
- history_file = os.path.join(config.VAR_DIR, 'history.py')
- self.assertTrue(os.path.exists(history_file))