summaryrefslogtreecommitdiff
path: root/src/mailman/database/tests/test_migrations.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/mailman/database/tests/test_migrations.py')
-rw-r--r--src/mailman/database/tests/test_migrations.py506
1 files changed, 0 insertions, 506 deletions
diff --git a/src/mailman/database/tests/test_migrations.py b/src/mailman/database/tests/test_migrations.py
deleted file mode 100644
index 9619b80a4..000000000
--- a/src/mailman/database/tests/test_migrations.py
+++ /dev/null
@@ -1,506 +0,0 @@
-# Copyright (C) 2012-2014 by the Free Software Foundation, Inc.
-#
-# This file is part of GNU Mailman.
-#
-# GNU Mailman is free software: you can redistribute it and/or modify it under
-# the terms of the GNU General Public License as published by the Free
-# Software Foundation, either version 3 of the License, or (at your option)
-# any later version.
-#
-# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT
-# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
-# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
-# more details.
-#
-# You should have received a copy of the GNU General Public License along with
-# GNU Mailman. If not, see <http://www.gnu.org/licenses/>.
-
-"""Test schema migrations."""
-
-from __future__ import absolute_import, print_function, unicode_literals
-
-__metaclass__ = type
-__all__ = [
- 'TestMigration20120407MigratedData',
- 'TestMigration20120407Schema',
- 'TestMigration20120407UnchangedData',
- 'TestMigration20121015MigratedData',
- 'TestMigration20121015Schema',
- 'TestMigration20130406MigratedData',
- 'TestMigration20130406Schema',
- ]
-
-
-import unittest
-
-from datetime import datetime
-from operator import attrgetter
-from pkg_resources import resource_string
-from sqlite3 import OperationalError
-from storm.exceptions import DatabaseError
-from zope.component import getUtility
-
-from mailman.interfaces.database import IDatabaseFactory
-from mailman.interfaces.domain import IDomainManager
-from mailman.interfaces.archiver import ArchivePolicy
-from mailman.interfaces.bounce import BounceContext
-from mailman.interfaces.listmanager import IListManager
-from mailman.interfaces.mailinglist import IAcceptableAliasSet
-from mailman.interfaces.nntp import NewsgroupModeration
-from mailman.interfaces.subscriptions import ISubscriptionService
-from mailman.model.bans import Ban
-from mailman.model.bounce import BounceEvent
-from mailman.testing.helpers import temporary_db
-from mailman.testing.layers import ConfigLayer
-
-
-
-class MigrationTestBase(unittest.TestCase):
- """Test database migrations."""
-
- layer = ConfigLayer
-
- def setUp(self):
- self._database = getUtility(IDatabaseFactory, 'temporary').create()
-
- def tearDown(self):
- self._database._cleanup()
-
- def _table_missing_present(self, migrations, missing, present):
- """The appropriate migrations leave some tables missing and present.
-
- :param migrations: Sequence of migrations to load.
- :param missing: Tables which should be missing.
- :param present: Tables which should be present.
- """
- for migration in migrations:
- self._database.load_migrations(migration)
- self._database.store.commit()
- for table in missing:
- self.assertRaises(OperationalError,
- self._database.store.execute,
- 'select * from {};'.format(table))
- for table in present:
- self._database.store.execute('select * from {};'.format(table))
-
- def _missing_present(self, table, migrations, missing, present):
- """The appropriate migrations leave columns missing and present.
-
- :param table: The table to test columns from.
- :param migrations: Sequence of migrations to load.
- :param missing: Set of columns which should be missing after the
- migrations are loaded.
- :param present: Set of columns which should be present after the
- migrations are loaded.
- """
- for migration in migrations:
- self._database.load_migrations(migration)
- self._database.store.commit()
- for column in missing:
- self.assertRaises(DatabaseError,
- self._database.store.execute,
- 'select {0} from {1};'.format(column, table))
- self._database.store.rollback()
- for column in present:
- # This should not produce an exception. Is there some better test
- # that we can perform?
- self._database.store.execute(
- 'select {0} from {1};'.format(column, table))
-
-
-
-class TestMigration20120407Schema(MigrationTestBase):
- """Test column migrations."""
-
- def test_pre_upgrade_columns_migration(self):
- # Test that before the migration, the old table columns are present
- # and the new database columns are not.
- self._missing_present('mailinglist',
- ['20120406999999'],
- # New columns are missing.
- ('allow_list_posts',
- 'archive_policy',
- 'list_id',
- 'nntp_prefix_subject_too'),
- # Old columns are present.
- ('archive',
- 'archive_private',
- 'archive_volume_frequency',
- 'generic_nonmember_action',
- 'include_list_post_header',
- 'news_moderation',
- 'news_prefix_subject_too',
- 'nntp_host'))
- self._missing_present('member',
- ['20120406999999'],
- ('list_id',),
- ('mailing_list',))
-
- def test_post_upgrade_columns_migration(self):
- # Test that after the migration, the old table columns are missing
- # and the new database columns are present.
- self._missing_present('mailinglist',
- ['20120406999999',
- '20120407000000'],
- # The old columns are missing.
- ('archive',
- 'archive_private',
- 'archive_volume_frequency',
- 'generic_nonmember_action',
- 'include_list_post_header',
- 'news_moderation',
- 'news_prefix_subject_too',
- 'nntp_host'),
- # The new columns are present.
- ('allow_list_posts',
- 'archive_policy',
- 'list_id',
- 'nntp_prefix_subject_too'))
- self._missing_present('member',
- ['20120406999999',
- '20120407000000'],
- ('mailing_list',),
- ('list_id',))
-
-
-
-class TestMigration20120407UnchangedData(MigrationTestBase):
- """Test non-migrated data."""
-
- def setUp(self):
- MigrationTestBase.setUp(self)
- # Load all the migrations to just before the one we're testing.
- self._database.load_migrations('20120406999999')
- # Load the previous schema's sample data.
- sample_data = resource_string(
- 'mailman.database.tests.data',
- 'migration_{0}_1.sql'.format(self._database.TAG))
- self._database.load_sql(self._database.store, sample_data)
- # XXX 2012-12-28: We have to load the last migration defined in the
- # system, otherwise the ORM model will not match the SQL table
- # definitions and we'll get OperationalErrors from SQLite.
- self._database.load_migrations('20121015000000')
-
- def test_migration_domains(self):
- # Test that the domains table, which isn't touched, doesn't change.
- with temporary_db(self._database):
- # Check that the domains survived the migration. This table
- # was not touched so it should be fine.
- domains = list(getUtility(IDomainManager))
- self.assertEqual(len(domains), 1)
- self.assertEqual(domains[0].mail_host, 'example.com')
-
- def test_migration_mailing_lists(self):
- # Test that the mailing lists survive migration.
- with temporary_db(self._database):
- # There should be exactly one mailing list defined.
- mlists = list(getUtility(IListManager).mailing_lists)
- self.assertEqual(len(mlists), 1)
- self.assertEqual(mlists[0].fqdn_listname, 'test@example.com')
-
- def test_migration_acceptable_aliases(self):
- # Test that the mailing list's acceptable aliases survive migration.
- # This proves that foreign key references are migrated properly.
- with temporary_db(self._database):
- mlist = getUtility(IListManager).get('test@example.com')
- aliases_set = IAcceptableAliasSet(mlist)
- self.assertEqual(set(aliases_set.aliases),
- set(['foo@example.com', 'bar@example.com']))
-
- def test_migration_members(self):
- # Test that the members of a mailing list all survive migration.
- with temporary_db(self._database):
- mlist = getUtility(IListManager).get('test@example.com')
- # Test that all the members we expect are still there. Start with
- # the two list delivery members.
- addresses = set(address.email
- for address in mlist.members.addresses)
- self.assertEqual(addresses,
- set(['anne@example.com', 'bart@example.com']))
- # There is one owner.
- owners = set(address.email for address in mlist.owners.addresses)
- self.assertEqual(len(owners), 1)
- self.assertEqual(owners.pop(), 'anne@example.com')
- # There is one moderator.
- moderators = set(address.email
- for address in mlist.moderators.addresses)
- self.assertEqual(len(moderators), 1)
- self.assertEqual(moderators.pop(), 'bart@example.com')
-
-
-
-class TestMigration20120407MigratedData(MigrationTestBase):
- """Test affected migration data."""
-
- def setUp(self):
- MigrationTestBase.setUp(self)
- # Load all the migrations to just before the one we're testing.
- self._database.load_migrations('20120406999999')
- # Load the previous schema's sample data.
- sample_data = resource_string(
- 'mailman.database.tests.data',
- 'migration_{0}_1.sql'.format(self._database.TAG))
- self._database.load_sql(self._database.store, sample_data)
-
- def _upgrade(self):
- # XXX 2012-12-28: We have to load the last migration defined in the
- # system, otherwise the ORM model will not match the SQL table
- # definitions and we'll get OperationalErrors from SQLite.
- self._database.load_migrations('20121015000000')
-
- def test_migration_archive_policy_never_0(self):
- # Test that the new archive_policy value is updated correctly. In the
- # case of old column archive=0, the archive_private column is
- # ignored. This test sets it to 0 to ensure it's ignored.
- self._database.store.execute(
- 'UPDATE mailinglist SET archive = {0}, archive_private = {0} '
- 'WHERE id = 1;'.format(self._database.FALSE))
- # Complete the migration
- self._upgrade()
- with temporary_db(self._database):
- mlist = getUtility(IListManager).get('test@example.com')
- self.assertEqual(mlist.archive_policy, ArchivePolicy.never)
-
- def test_migration_archive_policy_never_1(self):
- # Test that the new archive_policy value is updated correctly. In the
- # case of old column archive=0, the archive_private column is
- # ignored. This test sets it to 1 to ensure it's ignored.
- self._database.store.execute(
- 'UPDATE mailinglist SET archive = {0}, archive_private = {1} '
- 'WHERE id = 1;'.format(self._database.FALSE,
- self._database.TRUE))
- # Complete the migration
- self._upgrade()
- with temporary_db(self._database):
- mlist = getUtility(IListManager).get('test@example.com')
- self.assertEqual(mlist.archive_policy, ArchivePolicy.never)
-
- def test_archive_policy_private(self):
- # Test that the new archive_policy value is updated correctly for
- # private archives.
- self._database.store.execute(
- 'UPDATE mailinglist SET archive = {0}, archive_private = {0} '
- 'WHERE id = 1;'.format(self._database.TRUE))
- # Complete the migration
- self._upgrade()
- with temporary_db(self._database):
- mlist = getUtility(IListManager).get('test@example.com')
- self.assertEqual(mlist.archive_policy, ArchivePolicy.private)
-
- def test_archive_policy_public(self):
- # Test that the new archive_policy value is updated correctly for
- # public archives.
- self._database.store.execute(
- 'UPDATE mailinglist SET archive = {1}, archive_private = {0} '
- 'WHERE id = 1;'.format(self._database.FALSE,
- self._database.TRUE))
- # Complete the migration
- self._upgrade()
- with temporary_db(self._database):
- mlist = getUtility(IListManager).get('test@example.com')
- self.assertEqual(mlist.archive_policy, ArchivePolicy.public)
-
- def test_list_id(self):
- # Test that the mailinglist table gets a list_id column.
- self._upgrade()
- with temporary_db(self._database):
- mlist = getUtility(IListManager).get('test@example.com')
- self.assertEqual(mlist.list_id, 'test.example.com')
-
- def test_list_id_member(self):
- # Test that the member table's mailing_list column becomes list_id.
- self._upgrade()
- with temporary_db(self._database):
- service = getUtility(ISubscriptionService)
- members = list(service.find_members(list_id='test.example.com'))
- self.assertEqual(len(members), 4)
-
- def test_news_moderation_none(self):
- # Test that news_moderation becomes newsgroup_moderation.
- self._database.store.execute(
- 'UPDATE mailinglist SET news_moderation = 0 '
- 'WHERE id = 1;')
- self._upgrade()
- with temporary_db(self._database):
- mlist = getUtility(IListManager).get('test@example.com')
- self.assertEqual(mlist.newsgroup_moderation,
- NewsgroupModeration.none)
-
- def test_news_moderation_open_moderated(self):
- # Test that news_moderation becomes newsgroup_moderation.
- self._database.store.execute(
- 'UPDATE mailinglist SET news_moderation = 1 '
- 'WHERE id = 1;')
- self._upgrade()
- with temporary_db(self._database):
- mlist = getUtility(IListManager).get('test@example.com')
- self.assertEqual(mlist.newsgroup_moderation,
- NewsgroupModeration.open_moderated)
-
- def test_news_moderation_moderated(self):
- # Test that news_moderation becomes newsgroup_moderation.
- self._database.store.execute(
- 'UPDATE mailinglist SET news_moderation = 2 '
- 'WHERE id = 1;')
- self._upgrade()
- with temporary_db(self._database):
- mlist = getUtility(IListManager).get('test@example.com')
- self.assertEqual(mlist.newsgroup_moderation,
- NewsgroupModeration.moderated)
-
- def test_nntp_prefix_subject_too_false(self):
- # Test that news_prefix_subject_too becomes nntp_prefix_subject_too.
- self._database.store.execute(
- 'UPDATE mailinglist SET news_prefix_subject_too = {0} '
- 'WHERE id = 1;'.format(self._database.FALSE))
- self._upgrade()
- with temporary_db(self._database):
- mlist = getUtility(IListManager).get('test@example.com')
- self.assertFalse(mlist.nntp_prefix_subject_too)
-
- def test_nntp_prefix_subject_too_true(self):
- # Test that news_prefix_subject_too becomes nntp_prefix_subject_too.
- self._database.store.execute(
- 'UPDATE mailinglist SET news_prefix_subject_too = {0} '
- 'WHERE id = 1;'.format(self._database.TRUE))
- self._upgrade()
- with temporary_db(self._database):
- mlist = getUtility(IListManager).get('test@example.com')
- self.assertTrue(mlist.nntp_prefix_subject_too)
-
- def test_allow_list_posts_false(self):
- # Test that include_list_post_header -> allow_list_posts.
- self._database.store.execute(
- 'UPDATE mailinglist SET include_list_post_header = {0} '
- 'WHERE id = 1;'.format(self._database.FALSE))
- self._upgrade()
- with temporary_db(self._database):
- mlist = getUtility(IListManager).get('test@example.com')
- self.assertFalse(mlist.allow_list_posts)
-
- def test_allow_list_posts_true(self):
- # Test that include_list_post_header -> allow_list_posts.
- self._database.store.execute(
- 'UPDATE mailinglist SET include_list_post_header = {0} '
- 'WHERE id = 1;'.format(self._database.TRUE))
- self._upgrade()
- with temporary_db(self._database):
- mlist = getUtility(IListManager).get('test@example.com')
- self.assertTrue(mlist.allow_list_posts)
-
-
-
-class TestMigration20121015Schema(MigrationTestBase):
- """Test column migrations."""
-
- def test_pre_upgrade_column_migrations(self):
- self._missing_present('ban',
- ['20121014999999'],
- ('list_id',),
- ('mailing_list',))
- self._missing_present('mailinglist',
- ['20121014999999'],
- (),
- ('new_member_options', 'send_reminders',
- 'subscribe_policy', 'unsubscribe_policy',
- 'subscribe_auto_approval', 'private_roster',
- 'admin_member_chunksize'),
- )
-
- def test_post_upgrade_column_migrations(self):
- self._missing_present('ban',
- ['20121014999999',
- '20121015000000'],
- ('mailing_list',),
- ('list_id',))
- self._missing_present('mailinglist',
- ['20121014999999',
- '20121015000000'],
- ('new_member_options', 'send_reminders',
- 'subscribe_policy', 'unsubscribe_policy',
- 'subscribe_auto_approval', 'private_roster',
- 'admin_member_chunksize'),
- ())
-
-
-
-class TestMigration20121015MigratedData(MigrationTestBase):
- """Test non-migrated data."""
-
- def test_migration_bans(self):
- # Load all the migrations to just before the one we're testing.
- self._database.load_migrations('20121014999999')
- # Insert a list-specific ban.
- self._database.store.execute("""
- INSERT INTO ban VALUES (
- 1, 'anne@example.com', 'test@example.com');
- """)
- # Insert a global ban.
- self._database.store.execute("""
- INSERT INTO ban VALUES (
- 2, 'bart@example.com', NULL);
- """)
- # Update to the current migration we're testing.
- self._database.load_migrations('20121015000000')
- # Now both the local and global bans should still be present.
- bans = sorted(self._database.store.find(Ban),
- key=attrgetter('email'))
- self.assertEqual(bans[0].email, 'anne@example.com')
- self.assertEqual(bans[0].list_id, 'test.example.com')
- self.assertEqual(bans[1].email, 'bart@example.com')
- self.assertEqual(bans[1].list_id, None)
-
-
-
-class TestMigration20130406Schema(MigrationTestBase):
- """Test column migrations."""
-
- def test_pre_upgrade_column_migrations(self):
- self._missing_present('bounceevent',
- ['20130405999999'],
- ('list_id',),
- ('list_name',))
-
- def test_post_upgrade_column_migrations(self):
- self._missing_present('bounceevent',
- ['20130405999999',
- '20130406000000'],
- ('list_name',),
- ('list_id',))
-
- def test_pre_listarchiver_table(self):
- self._table_missing_present(['20130405999999'], ('listarchiver',), ())
-
- def test_post_listarchiver_table(self):
- self._table_missing_present(['20130405999999',
- '20130406000000'],
- (),
- ('listarchiver',))
-
-
-
-class TestMigration20130406MigratedData(MigrationTestBase):
- """Test migrated data."""
-
- def test_migration_bounceevent(self):
- # Load all migrations to just before the one we're testing.
- self._database.load_migrations('20130405999999')
- # Insert a bounce event.
- self._database.store.execute("""
- INSERT INTO bounceevent VALUES (
- 1, 'test@example.com', 'anne@example.com',
- '2013-04-06 21:12:00', '<abc@example.com>',
- 1, 0);
- """)
- # Update to the current migration we're testing
- self._database.load_migrations('20130406000000')
- # The bounce event should exist, but with a list-id instead of a fqdn
- # list name.
- events = list(self._database.store.find(BounceEvent))
- self.assertEqual(len(events), 1)
- self.assertEqual(events[0].list_id, 'test.example.com')
- self.assertEqual(events[0].email, 'anne@example.com')
- self.assertEqual(events[0].timestamp, datetime(2013, 4, 6, 21, 12))
- self.assertEqual(events[0].message_id, '<abc@example.com>')
- self.assertEqual(events[0].context, BounceContext.normal)
- self.assertFalse(events[0].processed)