diff options
Diffstat (limited to 'src/mailman/database/tests/test_migrations.py')
| -rw-r--r-- | src/mailman/database/tests/test_migrations.py | 335 |
1 files changed, 335 insertions, 0 deletions
diff --git a/src/mailman/database/tests/test_migrations.py b/src/mailman/database/tests/test_migrations.py new file mode 100644 index 000000000..6ad648623 --- /dev/null +++ b/src/mailman/database/tests/test_migrations.py @@ -0,0 +1,335 @@ +# Copyright (C) 2012 by the Free Software Foundation, Inc. +# +# This file is part of GNU Mailman. +# +# GNU Mailman is free software: you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) +# any later version. +# +# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. +# +# You should have received a copy of the GNU General Public License along with +# GNU Mailman. If not, see <http://www.gnu.org/licenses/>. + +"""Test schema migrations.""" + +from __future__ import absolute_import, print_function, unicode_literals + +__metaclass__ = type +__all__ = [ + 'TestMigration20120407MigratedData', + 'TestMigration20120407Schema', + 'TestMigration20120407UnchangedData', + ] + + +import unittest + +from pkg_resources import resource_string +from storm.exceptions import DatabaseError +from zope.component import getUtility + +from mailman.interfaces.database import IDatabaseFactory +from mailman.interfaces.domain import IDomainManager +from mailman.interfaces.archiver import ArchivePolicy +from mailman.interfaces.listmanager import IListManager +from mailman.interfaces.mailinglist import IAcceptableAliasSet +from mailman.interfaces.nntp import NewsgroupModeration +from mailman.testing.helpers import temporary_db +from mailman.testing.layers import ConfigLayer + + + +class MigrationTestBase(unittest.TestCase): + """Test the dated migration (LP: #971013) + + Circa: 3.0b1 -> 3.0b2 + + table mailinglist: + * news_moderation -> newsgroup_moderation + * news_prefix_subject_too -> nntp_prefix_subject_too + * include_list_post_header -> allow_list_posts + * ADD archive_policy + * REMOVE archive + * REMOVE archive_private + * REMOVE archive_volume_frequency + * REMOVE nntp_host + """ + + layer = ConfigLayer + + def setUp(self): + self._database = getUtility(IDatabaseFactory, 'temporary').create() + + def tearDown(self): + self._database._cleanup() + + + +class TestMigration20120407Schema(MigrationTestBase): + """Test column migrations.""" + + def test_pre_upgrade_columns_migration(self): + # Test that before the migration, the old table columns are present + # and the new database columns are not. + # + # Load all the migrations to just before the one we're testing. + self._database.load_migrations('20120406999999') + self._database.store.commit() + # Verify that the database has not yet been migrated. + for missing in ('allow_list_posts', + 'archive_policy', + 'nntp_prefix_subject_too'): + self.assertRaises(DatabaseError, + self._database.store.execute, + 'select {0} from mailinglist;'.format(missing)) + self._database.store.rollback() + for present in ('archive', + 'archive_private', + 'archive_volume_frequency', + 'include_list_post_header', + 'news_moderation', + 'news_prefix_subject_too', + 'nntp_host'): + # This should not produce an exception. Is there some better test + # that we can perform? + self._database.store.execute( + 'select {0} from mailinglist;'.format(present)) + + def test_post_upgrade_columns_migration(self): + # Test that after the migration, the old table columns are missing + # and the new database columns are present. + # + # Load all the migrations up to and including the one we're testing. + self._database.load_migrations('20120406999999') + self._database.load_migrations('20120407000000') + # Verify that the database has been migrated. + for present in ('allow_list_posts', + 'archive_policy', + 'nntp_prefix_subject_too'): + # This should not produce an exception. Is there some better test + # that we can perform? + self._database.store.execute( + 'select {0} from mailinglist;'.format(present)) + for missing in ('archive', + 'archive_private', + 'archive_volume_frequency', + 'include_list_post_header', + 'news_moderation', + 'news_prefix_subject_too', + 'nntp_host'): + self.assertRaises(DatabaseError, + self._database.store.execute, + 'select {0} from mailinglist;'.format(missing)) + self._database.store.rollback() + + + +class TestMigration20120407UnchangedData(MigrationTestBase): + """Test non-migrated data.""" + + def setUp(self): + MigrationTestBase.setUp(self) + # Load all the migrations to just before the one we're testing. + self._database.load_migrations('20120406999999') + # Load the previous schema's sample data. + sample_data = resource_string( + 'mailman.database.tests.data', + 'migration_{0}_1.sql'.format(self._database.TAG)) + self._database.load_sql(self._database.store, sample_data) + # Update to the current migration we're testing. + self._database.load_migrations('20120407000000') + + def test_migration_domains(self): + # Test that the domains table, which isn't touched, doesn't change. + with temporary_db(self._database): + # Check that the domains survived the migration. This table + # was not touched so it should be fine. + domains = list(getUtility(IDomainManager)) + self.assertEqual(len(domains), 1) + self.assertEqual(domains[0].mail_host, 'example.com') + + def test_migration_mailing_lists(self): + # Test that the mailing lists survive migration. + with temporary_db(self._database): + # There should be exactly one mailing list defined. + mlists = list(getUtility(IListManager).mailing_lists) + self.assertEqual(len(mlists), 1) + self.assertEqual(mlists[0].fqdn_listname, 'test@example.com') + + def test_migration_acceptable_aliases(self): + # Test that the mailing list's acceptable aliases survive migration. + # This proves that foreign key references are migrated properly. + with temporary_db(self._database): + mlist = getUtility(IListManager).get('test@example.com') + aliases_set = IAcceptableAliasSet(mlist) + self.assertEqual(set(aliases_set.aliases), + set(['foo@example.com', 'bar@example.com'])) + + def test_migration_members(self): + # Test that the members of a mailing list all survive migration. + with temporary_db(self._database): + mlist = getUtility(IListManager).get('test@example.com') + # Test that all the members we expect are still there. Start with + # the two list delivery members. + addresses = set(address.email + for address in mlist.members.addresses) + self.assertEqual(addresses, + set(['anne@example.com', 'bart@example.com'])) + # There is one owner. + owners = set(address.email for address in mlist.owners.addresses) + self.assertEqual(len(owners), 1) + self.assertEqual(owners.pop(), 'anne@example.com') + # There is one moderator. + moderators = set(address.email + for address in mlist.moderators.addresses) + self.assertEqual(len(moderators), 1) + self.assertEqual(moderators.pop(), 'bart@example.com') + + + +class TestMigration20120407MigratedData(MigrationTestBase): + """Test affected migration data.""" + + def setUp(self): + MigrationTestBase.setUp(self) + # Load all the migrations to just before the one we're testing. + self._database.load_migrations('20120406999999') + # Load the previous schema's sample data. + sample_data = resource_string( + 'mailman.database.tests.data', + 'migration_{0}_1.sql'.format(self._database.TAG)) + self._database.load_sql(self._database.store, sample_data) + + def _upgrade(self): + # Update to the current migration we're testing. + self._database.load_migrations('20120407000000') + + def test_migration_archive_policy_never_0(self): + # Test that the new archive_policy value is updated correctly. In the + # case of old column archive=0, the archive_private column is + # ignored. This test sets it to 0 to ensure it's ignored. + self._database.store.execute( + 'UPDATE mailinglist SET archive = {0}, archive_private = {0} ' + 'WHERE id = 1;'.format(self._database.FALSE)) + # Complete the migration + self._upgrade() + with temporary_db(self._database): + mlist = getUtility(IListManager).get('test@example.com') + self.assertEqual(mlist.archive_policy, ArchivePolicy.never) + + def test_migration_archive_policy_never_1(self): + # Test that the new archive_policy value is updated correctly. In the + # case of old column archive=0, the archive_private column is + # ignored. This test sets it to 1 to ensure it's ignored. + self._database.store.execute( + 'UPDATE mailinglist SET archive = {0}, archive_private = {1} ' + 'WHERE id = 1;'.format(self._database.FALSE, + self._database.TRUE)) + # Complete the migration + self._upgrade() + with temporary_db(self._database): + mlist = getUtility(IListManager).get('test@example.com') + self.assertEqual(mlist.archive_policy, ArchivePolicy.never) + + def test_archive_policy_private(self): + # Test that the new archive_policy value is updated correctly for + # private archives. + self._database.store.execute( + 'UPDATE mailinglist SET archive = {0}, archive_private = {0} ' + 'WHERE id = 1;'.format(self._database.TRUE)) + # Complete the migration + self._upgrade() + with temporary_db(self._database): + mlist = getUtility(IListManager).get('test@example.com') + self.assertEqual(mlist.archive_policy, ArchivePolicy.private) + + def test_archive_policy_public(self): + # Test that the new archive_policy value is updated correctly for + # public archives. + self._database.store.execute( + 'UPDATE mailinglist SET archive = {1}, archive_private = {0} ' + 'WHERE id = 1;'.format(self._database.FALSE, + self._database.TRUE)) + # Complete the migration + self._upgrade() + with temporary_db(self._database): + mlist = getUtility(IListManager).get('test@example.com') + self.assertEqual(mlist.archive_policy, ArchivePolicy.public) + + def test_news_moderation_none(self): + # Test that news_moderation becomes newsgroup_moderation. + self._database.store.execute( + 'UPDATE mailinglist SET news_moderation = 0 ' + 'WHERE id = 1;') + self._upgrade() + with temporary_db(self._database): + mlist = getUtility(IListManager).get('test@example.com') + self.assertEqual(mlist.newsgroup_moderation, + NewsgroupModeration.none) + + def test_news_moderation_open_moderated(self): + # Test that news_moderation becomes newsgroup_moderation. + self._database.store.execute( + 'UPDATE mailinglist SET news_moderation = 1 ' + 'WHERE id = 1;') + self._upgrade() + with temporary_db(self._database): + mlist = getUtility(IListManager).get('test@example.com') + self.assertEqual(mlist.newsgroup_moderation, + NewsgroupModeration.open_moderated) + + def test_news_moderation_moderated(self): + # Test that news_moderation becomes newsgroup_moderation. + self._database.store.execute( + 'UPDATE mailinglist SET news_moderation = 2 ' + 'WHERE id = 1;') + self._upgrade() + with temporary_db(self._database): + mlist = getUtility(IListManager).get('test@example.com') + self.assertEqual(mlist.newsgroup_moderation, + NewsgroupModeration.moderated) + + def test_nntp_prefix_subject_too_false(self): + # Test that news_prefix_subject_too becomes nntp_prefix_subject_too. + self._database.store.execute( + 'UPDATE mailinglist SET news_prefix_subject_too = {0} ' + 'WHERE id = 1;'.format(self._database.FALSE)) + self._upgrade() + with temporary_db(self._database): + mlist = getUtility(IListManager).get('test@example.com') + self.assertFalse(mlist.nntp_prefix_subject_too) + + def test_nntp_prefix_subject_too_true(self): + # Test that news_prefix_subject_too becomes nntp_prefix_subject_too. + self._database.store.execute( + 'UPDATE mailinglist SET news_prefix_subject_too = {0} ' + 'WHERE id = 1;'.format(self._database.TRUE)) + self._upgrade() + with temporary_db(self._database): + mlist = getUtility(IListManager).get('test@example.com') + self.assertTrue(mlist.nntp_prefix_subject_too) + + def test_allow_list_posts_false(self): + # Test that include_list_post_header -> allow_list_posts. + self._database.store.execute( + 'UPDATE mailinglist SET include_list_post_header = {0} ' + 'WHERE id = 1;'.format(self._database.FALSE)) + self._upgrade() + with temporary_db(self._database): + mlist = getUtility(IListManager).get('test@example.com') + self.assertFalse(mlist.allow_list_posts) + + def test_allow_list_posts_true(self): + # Test that include_list_post_header -> allow_list_posts. + self._database.store.execute( + 'UPDATE mailinglist SET include_list_post_header = {0} ' + 'WHERE id = 1;'.format(self._database.TRUE)) + self._upgrade() + with temporary_db(self._database): + mlist = getUtility(IListManager).get('test@example.com') + self.assertTrue(mlist.allow_list_posts) |
