# Copyright (C) 2012-2017 by the Free Software Foundation, Inc. # # This file is part of GNU Mailman. # # GNU Mailman is free software: you can redistribute it and/or modify it under # the terms of the GNU General Public License as published by the Free # Software Foundation, either version 3 of the License, or (at your option) # any later version. # # GNU Mailman is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for # more details. # # You should have received a copy of the GNU General Public License along with # GNU Mailman. If not, see . """Test the various pending requests interfaces.""" import unittest from contextlib import contextmanager from itertools import count from mailman.app.lifecycle import create_list from mailman.app.moderator import hold_message from mailman.config import config from mailman.interfaces.requests import IListRequests, RequestType from mailman.model.requests import _Request from mailman.testing.helpers import specialized_message_from_string as mfs from mailman.testing.layers import ConfigLayer from sqlalchemy.event import listen, remove @contextmanager def before_flush(id_hacker): listen(config.db.store, 'before_flush', id_hacker) try: yield finally: remove(config.db.store, 'before_flush', id_hacker) class TestRequests(unittest.TestCase): layer = ConfigLayer def setUp(self): self._mlist = create_list('ant@example.com') self._requests_db = IListRequests(self._mlist) self._msg = mfs("""\ From: anne@example.com To: ant@example.com Subject: Something Message-ID: Something else. """) def test_get_request_with_type(self): # get_request() takes an optional request type. request_id = hold_message(self._mlist, self._msg) # Submit a request with a non-matching type. This should return None # as if there were no matches. response = self._requests_db.get_request( request_id, RequestType.subscription) self.assertEqual(response, None) # Submit the same request with a matching type. key, data = self._requests_db.get_request( request_id, RequestType.held_message) self.assertEqual(key, '') # It should also succeed with no optional request type given. key, data = self._requests_db.get_request(request_id) self.assertEqual(key, '') def test_hold_with_bogus_type(self): # Calling hold_request() with a bogus request type is an error. with self.assertRaises(TypeError) as cm: self._requests_db.hold_request(5, 'foo') self.assertEqual(cm.exception.args[0], 5) def test_delete_missing_request(self): # Trying to delete a missing request is an error. with self.assertRaises(KeyError) as cm: self._requests_db.delete_request(801) self.assertEqual(cm.exception.args[0], 801) def test_only_return_this_lists_requests(self): # Issue #161: get_requests() returns requests that are not specific to # the mailing list in question. request_id = hold_message(self._mlist, self._msg) bee = create_list('bee@example.com') self.assertIsNone(IListRequests(bee).get_request(request_id)) def test_request_order(self): # Requests must be sorted in creation order. # # This test only "works" for PostgreSQL, in the sense that if you # remove the fix in ../requests.py, it will still pass in SQLite. # Apparently SQLite auto-sorts results by ID but PostgreSQL autosorts # by insertion time. It's still worth keeping the test to prevent # regressions. # # We modify the auto-incremented ids by listening to SQLAlchemy's # flush event, and hacking all the _Request object id's to the next # value in a descending counter. request_ids = [] counter = count(200, -1) def id_hacker(session, flush_context, instances): # noqa: E306 for instance in session.new: if isinstance(instance, _Request): instance.id = next(counter) with before_flush(id_hacker): for index in range(10): msg = mfs(self._msg.as_string()) msg.replace_header('Message-ID', ''.format(index)) request_ids.append(hold_message(self._mlist, msg)) config.db.store.flush() # Make sure that our ID are not already sorted. self.assertNotEqual(request_ids, sorted(request_ids)) # Get requests and check their order. requests = self._requests_db.of_type(RequestType.held_message) self.assertEqual([r.id for r in requests], sorted(request_ids))