diff options
| author | Barry Warsaw | 2012-07-23 10:40:53 -0400 |
|---|---|---|
| committer | Barry Warsaw | 2012-07-23 10:40:53 -0400 |
| commit | 5598b9eea196e4085aa91aaf8a0cacaffa200355 (patch) | |
| tree | d3685e9c193705ecee93bc573af960daff874e9e /src/mailman/database/tests/test_migrations.py | |
| parent | b2e4c6502c5ff4cdf9488be17556a6d39bbbde6b (diff) | |
| download | mailman-5598b9eea196e4085aa91aaf8a0cacaffa200355.tar.gz mailman-5598b9eea196e4085aa91aaf8a0cacaffa200355.tar.zst mailman-5598b9eea196e4085aa91aaf8a0cacaffa200355.zip | |
Diffstat (limited to 'src/mailman/database/tests/test_migrations.py')
| -rw-r--r-- | src/mailman/database/tests/test_migrations.py | 269 |
1 files changed, 199 insertions, 70 deletions
diff --git a/src/mailman/database/tests/test_migrations.py b/src/mailman/database/tests/test_migrations.py index c772a63d5..0e6edae47 100644 --- a/src/mailman/database/tests/test_migrations.py +++ b/src/mailman/database/tests/test_migrations.py @@ -21,7 +21,9 @@ from __future__ import absolute_import, print_function, unicode_literals __metaclass__ = type __all__ = [ - 'TestMigration20120407', + 'TestMigration20120407ArchiveData', + 'TestMigration20120407Data', + 'TestMigration20120407Schema', ] @@ -31,11 +33,12 @@ import sqlite3 import tempfile import unittest -from pkg_resources import resource_filename +from pkg_resources import resource_string from zope.component import getUtility from mailman.config import config from mailman.interfaces.domain import IDomainManager +from mailman.interfaces.archiver import ArchivePolicy from mailman.interfaces.listmanager import IListManager from mailman.interfaces.mailinglist import IAcceptableAliasSet from mailman.testing.helpers import configuration, temporary_db @@ -44,7 +47,7 @@ from mailman.utilities.modules import call_name -class TestMigration20120407(unittest.TestCase): +class TestMigration20120407Schema(unittest.TestCase): """Test the dated migration (LP: #971013) Circa: 3.0b1 -> 3.0b2 @@ -62,26 +65,25 @@ class TestMigration20120407(unittest.TestCase): layer = ConfigLayer def setUp(self): - self._tempdir = tempfile.mkdtemp() + database_class_name = config.database['class'] + database_class = call_name(database_class_name) + self._temporary = database_class._make_temporary() + self._database = self._temporary.database def tearDown(self): - shutil.rmtree(self._tempdir) + self._temporary.cleanup() def test_sqlite_base(self): # Test that before the migration, the old table columns are present # and the new database columns are not. - url = 'sqlite:///' + os.path.join(self._tempdir, 'mailman.db') - database_class = config.database['class'] - database = call_name(database_class) - with configuration('database', url=url): - database.initialize() - # Load all the database SQL to just before ours. - database.load_migrations('20120406999999') + # + # Load all the migrations to just before the one we're testing. + self._database.load_migrations('20120406999999') # Verify that the database has not yet been migrated. for missing in ('archive_policy', 'nntp_prefix_subject_too'): self.assertRaises(sqlite3.OperationalError, - database.store.execute, + self._database.store.execute, 'select {0} from mailinglist;'.format(missing)) for present in ('archive', 'archive_private', @@ -91,27 +93,22 @@ class TestMigration20120407(unittest.TestCase): 'nntp_host'): # This should not produce an exception. Is there some better test # that we can perform? - database.store.execute( + self._database.store.execute( 'select {0} from mailinglist;'.format(present)) def test_sqlite_migration(self): # Test that after the migration, the old table columns are missing # and the new database columns are present. - url = 'sqlite:///' + os.path.join(self._tempdir, 'mailman.db') - database_class = config.database['class'] - database = call_name(database_class) - with configuration('database', url=url): - database.initialize() - # Load all the database SQL to just before ours. - database.load_migrations('20120406999999') - # Load all migrations, up to and including this one. - database.load_migrations('20120407000000') + # + # Load all the migrations up to and including the one we're testing. + self._database.load_migrations('20120406999999') + self._database.load_migrations('20120407000000') # Verify that the database has been migrated. for present in ('archive_policy', 'nntp_prefix_subject_too'): # This should not produce an exception. Is there some better test # that we can perform? - database.store.execute( + self._database.store.execute( 'select {0} from mailinglist;'.format(present)) for missing in ('archive', 'archive_private', @@ -120,51 +117,183 @@ class TestMigration20120407(unittest.TestCase): 'news_prefix_subject_too', 'nntp_host'): self.assertRaises(sqlite3.OperationalError, - database.store.execute, + self._database.store.execute, 'select {0} from mailinglist;'.format(missing)) - def test_data_after_migration(self): - # Ensure that the existing data and foreign key references are - # preserved across a migration. Unfortunately, this requires sample - # data, which kind of sucks. - dst = os.path.join(self._tempdir, 'mailman.db') - src = resource_filename('mailman.database.tests.data', 'mailman_01.db') - shutil.copyfile(src, dst) - url = 'sqlite:///' + dst - database_class = config.database['class'] - database = call_name(database_class) - with configuration('database', url=url): - # Initialize the database and perform the migrations. - database.initialize() - database.load_migrations('20120407000000') - with temporary_db(database): - # Check that the domains survived the migration. This table - # was not touched so it should be fine. - domains = list(getUtility(IDomainManager)) - self.assertEqual(len(domains), 1) - self.assertEqual(domains[0].mail_host, 'example.com') - # There should be exactly one mailing list defined. - mlists = list(getUtility(IListManager).mailing_lists) - self.assertEqual(len(mlists), 1) - # Get the mailing list object and check its acceptable - # aliases. This tests that foreign keys continue to work. - mlist = mlists[0] - aliases_set = IAcceptableAliasSet(mlist) - self.assertEqual(set(aliases_set.aliases), - set(['foo@example.com', 'bar@example.com'])) - # Test that all the members we expect are still there. Start - # with the two list delivery members. - addresses = set(address.email - for address in mlist.members.addresses) - self.assertEqual(addresses, set(['anne@example.com', - 'bart@example.com'])) - # There is one owner. - owners = set(address.email - for address in mlist.owners.addresses) - self.assertEqual(len(owners), 1) - self.assertEqual(owners.pop(), 'anne@example.com') - # There is one moderator. - moderators = set(address.email - for address in mlist.moderators.addresses) - self.assertEqual(len(moderators), 1) - self.assertEqual(moderators.pop(), 'bart@example.com') + + +class TestMigration20120407Data(unittest.TestCase): + """Test the dated migration (LP: #971013) + + Circa: 3.0b1 -> 3.0b2 + + table mailinglist: + * news_moderation -> newsgroup_moderation + * news_prefix_subject_too -> nntp_prefix_subject_too + * ADD archive_policy + * REMOVE archive + * REMOVE archive_private + * REMOVE archive_volume_frequency + * REMOVE nntp_host + """ + + layer = ConfigLayer + + def setUp(self): + database_class_name = config.database['class'] + database_class = call_name(database_class_name) + self._temporary = database_class._make_temporary() + self._database = self._temporary.database + # Load all the migrations to just before the one we're testing. + self._database.load_migrations('20120406999999') + # Load the previous schema's sample data. + sample_data = resource_string( + 'mailman.database.tests.data', 'migration_test_1.sql') + self._database.load_sql(self._database.store, sample_data) + # Update to the current migration we're testing. + self._database.load_migrations('20120407000000') + + def tearDown(self): + self._temporary.cleanup() + + def test_migration_domains(self): + # Test that the domains table, which isn't touched, doesn't change. + with temporary_db(self._database): + # Check that the domains survived the migration. This table + # was not touched so it should be fine. + domains = list(getUtility(IDomainManager)) + self.assertEqual(len(domains), 1) + self.assertEqual(domains[0].mail_host, 'example.com') + + def test_migration_mailing_lists(self): + # Test that the mailing lists survive migration. + with temporary_db(self._database): + # There should be exactly one mailing list defined. + mlists = list(getUtility(IListManager).mailing_lists) + self.assertEqual(len(mlists), 1) + self.assertEqual(mlists[0].fqdn_listname, 'test@example.com') + + def test_migration_acceptable_aliases(self): + # Test that the mailing list's acceptable aliases survive migration. + # This proves that foreign key references are migrated properly. + with temporary_db(self._database): + mlist = getUtility(IListManager).get('test@example.com') + aliases_set = IAcceptableAliasSet(mlist) + self.assertEqual(set(aliases_set.aliases), + set(['foo@example.com', 'bar@example.com'])) + + def test_migration_members(self): + # Test that the members of a mailing list all survive migration. + with temporary_db(self._database): + mlist = getUtility(IListManager).get('test@example.com') + # Test that all the members we expect are still there. Start with + # the two list delivery members. + addresses = set(address.email + for address in mlist.members.addresses) + self.assertEqual(addresses, + set(['anne@example.com', 'bart@example.com'])) + # There is one owner. + owners = set(address.email for address in mlist.owners.addresses) + self.assertEqual(len(owners), 1) + self.assertEqual(owners.pop(), 'anne@example.com') + # There is one moderator. + moderators = set(address.email + for address in mlist.moderators.addresses) + self.assertEqual(len(moderators), 1) + self.assertEqual(moderators.pop(), 'bart@example.com') + + + +class TestMigration20120407ArchiveData(unittest.TestCase): + """Test the dated migration (LP: #971013) + + Circa: 3.0b1 -> 3.0b2 + + table mailinglist: + * news_moderation -> newsgroup_moderation + * news_prefix_subject_too -> nntp_prefix_subject_too + * ADD archive_policy + * REMOVE archive + * REMOVE archive_private + * REMOVE archive_volume_frequency + * REMOVE nntp_host + """ + + layer = ConfigLayer + + def setUp(self): + database_class_name = config.database['class'] + database_class = call_name(database_class_name) + self._temporary = database_class._make_temporary() + self._database = self._temporary.database + # Load all the migrations to just before the one we're testing. + self._database.load_migrations('20120406999999') + # Load the previous schema's sample data. + sample_data = resource_string( + 'mailman.database.tests.data', 'migration_test_1.sql') + self._database.load_sql(self._database.store, sample_data) + + def _upgrade(self): + # Update to the current migration we're testing. + self._database.load_migrations('20120407000000') + + def tearDown(self): + self._temporary.cleanup() + + def test_migration_archive_policy_never_0(self): + # Test that the new archive_policy value is updated correctly. In the + # case of old column archive=0, the archive_private column is + # ignored. This test sets it to 0 to ensure it's ignored. + with configuration('database', url=self._url): + # Set the old archive values. + self._database.store.execute( + 'UPDATE mailinglist SET archive = 0, archive_private = 0 ' + 'WHERE id = 1;') + # Complete the migration + self._upgrade() + with temporary_db(self._database): + mlist = getUtility(IListManager).get('test@example.com') + self.assertEqual(mlist.archive_policy, ArchivePolicy.never) + + def test_migration_archive_policy_never_1(self): + # Test that the new archive_policy value is updated correctly. In the + # case of old column archive=0, the archive_private column is + # ignored. This test sets it to 1 to ensure it's ignored. + with configuration('database', url=self._url): + # Set the old archive values. + self._database.store.execute( + 'UPDATE mailinglist SET archive = 0, archive_private = 1 ' + 'WHERE id = 1;') + # Complete the migration + self._upgrade() + with temporary_db(self._database): + mlist = getUtility(IListManager).get('test@example.com') + self.assertEqual(mlist.archive_policy, ArchivePolicy.never) + + def test_archive_policy_private(self): + # Test that the new archive_policy value is updated correctly for + # private archives. + with configuration('database', url=self._url): + # Set the old archive values. + self._database.store.execute( + 'UPDATE mailinglist SET archive = 1, archive_private = 1 ' + 'WHERE id = 1;') + # Complete the migration + self._upgrade() + with temporary_db(self._database): + mlist = getUtility(IListManager).get('test@example.com') + self.assertEqual(mlist.archive_policy, ArchivePolicy.private) + + def test_archive_policy_public(self): + # Test that the new archive_policy value is updated correctly for + # public archives. + with configuration('database', url=self._url): + # Set the old archive values. + self._database.store.execute( + 'UPDATE mailinglist SET archive = 1, archive_private = 0 ' + 'WHERE id = 1;') + # Complete the migration + self._upgrade() + with temporary_db(self._database): + mlist = getUtility(IListManager).get('test@example.com') + self.assertEqual(mlist.archive_policy, ArchivePolicy.public) |
