# Copyright (C) 2012-2017 by the Free Software Foundation, Inc. # # This file is part of GNU Mailman. # # GNU Mailman is free software: you can redistribute it and/or modify it under # the terms of the GNU General Public License as published by the Free # Software Foundation, either version 3 of the License, or (at your option) # any later version. # # GNU Mailman is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for # more details. # # You should have received a copy of the GNU General Public License along with # GNU Mailman. If not, see . """Test the NNTP runner and related utilities.""" import socket import nntplib import unittest from mailman.app.lifecycle import create_list from mailman.config import config from mailman.interfaces.nntp import NewsgroupModeration from mailman.runners import nntp from mailman.testing.helpers import ( LogFileMark, configuration, get_queue_messages, make_testable_runner, specialized_message_from_string as mfs) from mailman.testing.layers import ConfigLayer from unittest import mock class TestPrepareMessage(unittest.TestCase): """Test message preparation.""" layer = ConfigLayer def setUp(self): self._mlist = create_list('test@example.com') self._mlist.linked_newsgroup = 'example.test' self._msg = mfs("""\ From: anne@example.com To: test@example.com Subject: A newsgroup posting Message-ID: Testing """) def test_moderated_approved_header(self): # When the mailing list is moderated , the message will get an # Approved header, which NNTP software uses to forward to the # newsgroup. The message would not have gotten to the mailing list if # it wasn't already approved. self._mlist.newsgroup_moderation = NewsgroupModeration.moderated nntp.prepare_message(self._mlist, self._msg, {}) self.assertEqual(self._msg['approved'], 'test@example.com') def test_open_moderated_approved_header(self): # When the mailing list is moderated using an open posting policy, the # message will get an Approved header, which NNTP software uses to # forward to the newsgroup. The message would not have gotten to the # mailing list if it wasn't already approved. self._mlist.newsgroup_moderation = NewsgroupModeration.open_moderated nntp.prepare_message(self._mlist, self._msg, {}) self.assertEqual(self._msg['approved'], 'test@example.com') def test_moderation_removes_previous_approved_header(self): # Any existing Approved header is removed from moderated messages. self._msg['Approved'] = 'a bogus approval' self._mlist.newsgroup_moderation = NewsgroupModeration.moderated nntp.prepare_message(self._mlist, self._msg, {}) headers = self._msg.get_all('approved') self.assertEqual(len(headers), 1) self.assertEqual(headers[0], 'test@example.com') def test_open_moderation_removes_previous_approved_header(self): # Any existing Approved header is removed from moderated messages. self._msg['Approved'] = 'a bogus approval' self._mlist.newsgroup_moderation = NewsgroupModeration.open_moderated nntp.prepare_message(self._mlist, self._msg, {}) headers = self._msg.get_all('approved') self.assertEqual(len(headers), 1) self.assertEqual(headers[0], 'test@example.com') def test_stripped_subject(self): # The cook-headers handler adds the original and/or stripped (of the # prefix) subject to the metadata. Assume that handler's been run; # check the Subject header. self._mlist.nntp_prefix_subject_too = False del self._msg['subject'] self._msg['subject'] = 'Re: Your test' msgdata = dict(stripped_subject='Your test') nntp.prepare_message(self._mlist, self._msg, msgdata) headers = self._msg.get_all('subject') self.assertEqual(len(headers), 1) self.assertEqual(headers[0], 'Your test') def test_original_subject(self): # The cook-headers handler adds the original and/or stripped (of the # prefix) subject to the metadata. Assume that handler's been run; # check the Subject header. self._mlist.nntp_prefix_subject_too = False del self._msg['subject'] self._msg['subject'] = 'Re: Your test' msgdata = dict(original_subject='Your test') nntp.prepare_message(self._mlist, self._msg, msgdata) headers = self._msg.get_all('subject') self.assertEqual(len(headers), 1) self.assertEqual(headers[0], 'Your test') def test_stripped_subject_prefix_okay(self): # The cook-headers handler adds the original and/or stripped (of the # prefix) subject to the metadata. Assume that handler's been run; # check the Subject header. self._mlist.nntp_prefix_subject_too = True del self._msg['subject'] self._msg['subject'] = 'Re: Your test' msgdata = dict(stripped_subject='Your test') nntp.prepare_message(self._mlist, self._msg, msgdata) headers = self._msg.get_all('subject') self.assertEqual(len(headers), 1) self.assertEqual(headers[0], 'Re: Your test') def test_original_subject_prefix_okay(self): # The cook-headers handler adds the original and/or stripped (of the # prefix) subject to the metadata. Assume that handler's been run; # check the Subject header. self._mlist.nntp_prefix_subject_too = True del self._msg['subject'] self._msg['subject'] = 'Re: Your test' msgdata = dict(original_subject='Your test') nntp.prepare_message(self._mlist, self._msg, msgdata) headers = self._msg.get_all('subject') self.assertEqual(len(headers), 1) self.assertEqual(headers[0], 'Re: Your test') def test_add_newsgroups_header(self): # Prepared messages get a Newsgroups header. msgdata = dict(original_subject='Your test') nntp.prepare_message(self._mlist, self._msg, msgdata) self.assertEqual(self._msg['newsgroups'], 'example.test') def test_add_newsgroups_header_to_existing(self): # If the message already has a Newsgroups header, the linked newsgroup # gets appended to that value, using comma-space separated lists. self._msg['Newsgroups'] = 'foo.test, bar.test' msgdata = dict(original_subject='Your test') nntp.prepare_message(self._mlist, self._msg, msgdata) headers = self._msg.get_all('newsgroups') self.assertEqual(len(headers), 1) self.assertEqual(headers[0], 'foo.test, bar.test, example.test') def test_add_lines_header(self): # A Lines: header seems useful. nntp.prepare_message(self._mlist, self._msg, {}) self.assertEqual(self._msg['lines'], '1') def test_the_message_has_been_prepared(self): # A key gets added to the metadata so that a retry won't try to # re-apply all the preparations. msgdata = {} nntp.prepare_message(self._mlist, self._msg, msgdata) self.assertTrue(msgdata.get('prepped')) @configuration('nntp', remove_headers='x-complaints-to') def test_remove_headers(self): # During preparation, headers which cause problems with certain NNTP # servers such as INN get removed. self._msg['X-Complaints-To'] = 'arguments@example.com' nntp.prepare_message(self._mlist, self._msg, {}) self.assertEqual(self._msg['x-complaints-to'], None) @configuration('nntp', rewrite_duplicate_headers=""" To X-Original-To X-Fake X-Original-Fake """) def test_rewrite_headers(self): # Some NNTP servers are very strict about duplicate headers. What we # can do is look at some headers and if they is more than one of that # header in the message, all the headers are deleted except the first # one, and then the other values are moved to the destination header. # # In this example, we'll create multiple To headers, which will all # get moved to X-Original-To. However, because there will only be one # X-Fake header, it doesn't get rewritten. self._msg['To'] = 'test@example.org' self._msg['To'] = 'test@example.net' self._msg['X-Fake'] = 'ignore me' self.assertEqual(len(self._msg.get_all('to')), 3) self.assertEqual(len(self._msg.get_all('x-fake')), 1) nntp.prepare_message(self._mlist, self._msg, {}) tos = self._msg.get_all('to') self.assertEqual(len(tos), 1) self.assertEqual(tos[0], 'test@example.com') original_tos = self._msg.get_all('x-original-to') self.assertEqual(len(original_tos), 2) self.assertEqual(original_tos, ['test@example.org', 'test@example.net']) fakes = self._msg.get_all('x-fake') self.assertEqual(len(fakes), 1) self.assertEqual(fakes[0], 'ignore me') self.assertEqual(self._msg.get_all('x-original-fake'), None) @configuration('nntp', rewrite_duplicate_headers=""" To X-Original-To X-Fake """) def test_odd_duplicates(self): # This is just a corner case, where there is an odd number of rewrite # headers. In that case, the odd-one-out does not get rewritten. self._msg['x-fake'] = 'one' self._msg['x-fake'] = 'two' self._msg['x-fake'] = 'three' self.assertEqual(len(self._msg.get_all('x-fake')), 3) nntp.prepare_message(self._mlist, self._msg, {}) fakes = self._msg.get_all('x-fake') self.assertEqual(len(fakes), 3) self.assertEqual(fakes, ['one', 'two', 'three']) class TestNNTPRunner(unittest.TestCase): """The NNTP runner hands messages off to the NNTP server.""" layer = ConfigLayer def setUp(self): self._mlist = create_list('test@example.com') self._mlist.linked_newsgroup = 'example.test' self._msg = mfs("""\ From: anne@example.com To: test@example.com Subject: A newsgroup posting Message-ID: Testing """) self._runner = make_testable_runner(nntp.NNTPRunner, 'nntp') self._nntpq = config.switchboards['nntp'] @mock.patch('nntplib.NNTP') def test_connect(self, class_mock): # Test connection to the NNTP server with default values. self._nntpq.enqueue(self._msg, {}, listid='test.example.com') self._runner.run() class_mock.assert_called_once_with( '', 119, user='', password='', readermode=True) @configuration('nntp', user='alpha', password='beta', host='nntp.example.com', port='2112') @mock.patch('nntplib.NNTP') def test_connect_with_configuration(self, class_mock): # Test connection to the NNTP server with specific values. self._nntpq.enqueue(self._msg, {}, listid='test.example.com') self._runner.run() class_mock.assert_called_once_with( 'nntp.example.com', 2112, user='alpha', password='beta', readermode=True) @mock.patch('nntplib.NNTP') def test_post(self, class_mock): # Test that the message is posted to the NNTP server. self._nntpq.enqueue(self._msg, {}, listid='test.example.com') self._runner.run() # Get the mocked instance, which was used in the runner. conn_mock = class_mock() # The connection object's post() method was called once with a # file-like object containing the message's bytes. Read those bytes # and make some simple checks that the message is what we expected. args = conn_mock.post.call_args # One positional argument. self.assertEqual(len(args[0]), 1) # No keyword arguments. self.assertEqual(len(args[1]), 0) msg = mfs(args[0][0].read()) self.assertEqual(msg['subject'], 'A newsgroup posting') @mock.patch('nntplib.NNTP') def test_connection_got_quit(self, class_mock): # The NNTP connection gets closed after a successful post. # Test that the message is posted to the NNTP server. self._nntpq.enqueue(self._msg, {}, listid='test.example.com') self._runner.run() # Get the mocked instance, which was used in the runner. conn_mock = class_mock() # The connection object's post() method was called once with a # file-like object containing the message's bytes. Read those bytes # and make some simple checks that the message is what we expected. conn_mock.quit.assert_called_once_with() @mock.patch('nntplib.NNTP', side_effect=nntplib.NNTPTemporaryError) def test_connect_with_nntplib_failure(self, class_mock): self._nntpq.enqueue(self._msg, {}, listid='test.example.com') mark = LogFileMark('mailman.error') self._runner.run() log_message = mark.readline()[:-1] self.assertTrue( log_message.endswith('NNTP error for test@example.com'), log_message) @mock.patch('nntplib.NNTP', side_effect=socket.error) def test_connect_with_socket_failure(self, class_mock): self._nntpq.enqueue(self._msg, {}, listid='test.example.com') mark = LogFileMark('mailman.error') self._runner.run() log_message = mark.readline()[:-1] self.assertTrue(log_message.endswith( 'NNTP socket error for test@example.com')) @mock.patch('nntplib.NNTP', side_effect=RuntimeError) def test_connect_with_other_failure(self, class_mock): # In this failure mode, the message stays queued, so we can only run # the nntp runner once. def once(runner): # I.e. stop immediately, since the queue will not be empty. return True runner = make_testable_runner(nntp.NNTPRunner, 'nntp', predicate=once) self._nntpq.enqueue(self._msg, {}, listid='test.example.com') mark = LogFileMark('mailman.error') runner.run() log_message = mark.readline()[:-1] self.assertTrue(log_message.endswith( 'NNTP unexpected exception for test@example.com')) items = get_queue_messages('nntp', expected_count=1) self.assertEqual(items[0].msgdata['listid'], 'test.example.com') self.assertEqual(items[0].msg['subject'], 'A newsgroup posting') @mock.patch('nntplib.NNTP', side_effect=nntplib.NNTPTemporaryError) def test_connection_never_gets_quit_after_failures(self, class_mock): # The NNTP connection doesn't get closed after a unsuccessful # connection, since there's nothing to close. self._nntpq.enqueue(self._msg, {}, listid='test.example.com') self._runner.run() # Get the mocked instance, which was used in the runner. Turn off the # exception raising side effect first though! class_mock.side_effect = None conn_mock = class_mock() # The connection object's post() method was called once with a # file-like object containing the message's bytes. Read those bytes # and make some simple checks that the message is what we expected. self.assertEqual(conn_mock.quit.call_count, 0) @mock.patch('nntplib.NNTP') def test_connection_got_quit_after_post_failure(self, class_mock): # The NNTP connection does get closed after a unsuccessful post. # Add a side-effect to the instance mock's .post() method. conn_mock = class_mock() conn_mock.post.side_effect = nntplib.NNTPTemporaryError self._nntpq.enqueue(self._msg, {}, listid='test.example.com') self._runner.run() # The connection object's post() method was called once with a # file-like object containing the message's bytes. Read those bytes # and make some simple checks that the message is what we expected. conn_mock.quit.assert_called_once_with()