diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/mailman/app/templates.py | 2 | ||||
| -rw-r--r-- | src/mailman/app/tests/test_templates.py | 11 | ||||
| -rw-r--r-- | src/mailman/commands/cli_import.py | 8 | ||||
| -rw-r--r-- | src/mailman/handlers/decorate.py | 9 | ||||
| -rw-r--r-- | src/mailman/model/listmanager.py | 2 | ||||
| -rw-r--r-- | src/mailman/model/tests/test_listmanager.py | 15 | ||||
| -rw-r--r-- | src/mailman/utilities/importer.py | 336 | ||||
| -rw-r--r-- | src/mailman/utilities/tests/test_import.py | 752 |
8 files changed, 1119 insertions, 16 deletions
diff --git a/src/mailman/app/templates.py b/src/mailman/app/templates.py index 707e7c256..f6b26811c 100644 --- a/src/mailman/app/templates.py +++ b/src/mailman/app/templates.py @@ -103,4 +103,4 @@ class TemplateLoader: def get(self, uri): """See `ITemplateLoader`.""" with closing(urllib2.urlopen(uri)) as fp: - return fp.read() + return unicode(fp.read(), "utf-8") diff --git a/src/mailman/app/tests/test_templates.py b/src/mailman/app/tests/test_templates.py index 77a0eb381..634d00907 100644 --- a/src/mailman/app/tests/test_templates.py +++ b/src/mailman/app/tests/test_templates.py @@ -126,3 +126,14 @@ class TestTemplateLoader(unittest.TestCase): with self.assertRaises(urllib2.URLError) as cm: self._loader.get('mailman:///missing@example.com/en/foo/demo.txt') self.assertEqual(cm.exception.reason, 'No such file') + + def test_non_ascii(self): + # mailman://demo.txt with non-ascii content + test_text = b'\xe4\xb8\xad' + path = os.path.join(self.var_dir, 'templates', 'site', 'it') + os.makedirs(path) + with open(os.path.join(path, 'demo.txt'), 'w') as fp: + print(test_text, end='', file=fp) + content = self._loader.get('mailman:///it/demo.txt') + self.assertTrue(isinstance(content, unicode)) + self.assertEqual(content, test_text.decode("utf-8")) diff --git a/src/mailman/commands/cli_import.py b/src/mailman/commands/cli_import.py index c8145429d..880457334 100644 --- a/src/mailman/commands/cli_import.py +++ b/src/mailman/commands/cli_import.py @@ -35,7 +35,7 @@ from mailman.core.i18n import _ from mailman.database.transaction import transactional from mailman.interfaces.command import ICLISubCommand from mailman.interfaces.listmanager import IListManager -from mailman.utilities.importer import import_config_pck +from mailman.utilities.importer import import_config_pck, Import21Error @@ -93,4 +93,8 @@ class Import21: print(_('Ignoring non-dictionary: {0!r}').format( config_dict), file=sys.stderr) continue - import_config_pck(mlist, config_dict) + try: + import_config_pck(mlist, config_dict) + except Import21Error, e: + print(e, file=sys.stderr) + sys.exit(1) diff --git a/src/mailman/handlers/decorate.py b/src/mailman/handlers/decorate.py index 945c50bd5..815a24569 100644 --- a/src/mailman/handlers/decorate.py +++ b/src/mailman/handlers/decorate.py @@ -201,8 +201,8 @@ def process(mlist, msg, msgdata): def decorate(mlist, uri, extradict=None): - """Expand the decoration template.""" - if uri is None: + """Expand the decoration template from its URI.""" + if uri is None or uri == '': return '' # Get the decorator template. loader = getUtility(ITemplateLoader) @@ -211,6 +211,11 @@ def decorate(mlist, uri, extradict=None): language=mlist.preferred_language.code, )) template = loader.get(template_uri) + return decorate_template(mlist, template, extradict) + + +def decorate_template(mlist, template, extradict=None): + """Expand the decoration template.""" # Create a dictionary which includes the default set of interpolation # variables allowed in headers and footers. These will be augmented by # any key/value pairs in the extradict. diff --git a/src/mailman/model/listmanager.py b/src/mailman/model/listmanager.py index 5e260a6cd..f1c2941e0 100644 --- a/src/mailman/model/listmanager.py +++ b/src/mailman/model/listmanager.py @@ -34,6 +34,7 @@ from mailman.interfaces.listmanager import ( IListManager, ListAlreadyExistsError, ListCreatedEvent, ListCreatingEvent, ListDeletedEvent, ListDeletingEvent) from mailman.model.mailinglist import MailingList +from mailman.model.mime import ContentFilter from mailman.utilities.datetime import now @@ -79,6 +80,7 @@ class ListManager: """See `IListManager`.""" fqdn_listname = mlist.fqdn_listname notify(ListDeletingEvent(mlist)) + store.find(ContentFilter, ContentFilter.mailing_list == mlist).remove() store.remove(mlist) notify(ListDeletedEvent(fqdn_listname)) diff --git a/src/mailman/model/tests/test_listmanager.py b/src/mailman/model/tests/test_listmanager.py index 152d96b9f..32f16993f 100644 --- a/src/mailman/model/tests/test_listmanager.py +++ b/src/mailman/model/tests/test_listmanager.py @@ -27,6 +27,7 @@ __all__ = [ import unittest from zope.component import getUtility +from storm.locals import Store from mailman.app.lifecycle import create_list from mailman.app.moderator import hold_message @@ -37,6 +38,7 @@ from mailman.interfaces.messages import IMessageStore from mailman.interfaces.requests import IListRequests from mailman.interfaces.subscriptions import ISubscriptionService from mailman.interfaces.usermanager import IUserManager +from mailman.model.mime import ContentFilter from mailman.testing.helpers import ( event_subscribers, specialized_message_from_string) from mailman.testing.layers import ConfigLayer @@ -126,6 +128,19 @@ Message-ID: <argon> saved_message = getUtility(IMessageStore).get_message_by_id('<argon>') self.assertEqual(saved_message.as_string(), msg.as_string()) + def test_content_filters_are_deleted_when_mailing_list_is_deleted(self): + # When a mailing list with content filters is deleted, the filters must + # be deleted fist or an IntegrityError will be raised + filter_names = ("filter_types", "pass_types", + "filter_extensions", "pass_extensions") + for fname in filter_names: + setattr(self._ant, fname, ["test-filter-1", "test-filter-2"]) + getUtility(IListManager).delete(self._ant) + store = Store.of(self._ant) + filters = store.find(ContentFilter, + ContentFilter.mailing_list == self._ant) + self.assertEqual(filters.count(), 0) + class TestListCreation(unittest.TestCase): diff --git a/src/mailman/utilities/importer.py b/src/mailman/utilities/importer.py index 21a1e2f09..333cbd1bd 100644 --- a/src/mailman/utilities/importer.py +++ b/src/mailman/utilities/importer.py @@ -22,45 +22,183 @@ from __future__ import absolute_import, print_function, unicode_literals __metaclass__ = type __all__ = [ 'import_config_pck', + 'ImportError', ] import sys import datetime +import os +from urllib2 import URLError -from mailman.interfaces.action import FilterAction -from mailman.interfaces.archiver import ArchivePolicy +from mailman.config import config +from mailman.core.errors import MailmanError +from mailman.interfaces.action import FilterAction, Action from mailman.interfaces.autorespond import ResponseAction from mailman.interfaces.digests import DigestFrequency from mailman.interfaces.mailinglist import Personalization, ReplyToMunging from mailman.interfaces.nntp import NewsgroupModeration +from mailman.interfaces.archiver import ArchivePolicy +from mailman.interfaces.bans import IBanManager +from mailman.interfaces.mailinglist import IAcceptableAliasSet +from mailman.interfaces.bounce import UnrecognizedBounceDisposition +from mailman.interfaces.usermanager import IUserManager +from mailman.interfaces.member import DeliveryMode, DeliveryStatus, MemberRole +from mailman.interfaces.languages import ILanguageManager +from mailman.handlers.decorate import decorate, decorate_template +from mailman.utilities.i18n import search +from zope.component import getUtility + + + +class Import21Error(MailmanError): + pass + +def str_to_unicode(value): + # Convert a string to unicode when the encoding is not declared + if isinstance(value, unicode): + return value + for encoding in ("ascii", "utf-8"): + try: + return unicode(value, encoding) + except UnicodeDecodeError, e: + continue + # we did our best, use replace + return unicode(value, 'ascii', 'replace') def seconds_to_delta(value): return datetime.timedelta(seconds=value) + +def days_to_delta(value): + return datetime.timedelta(days=value) + + +def list_members_to_unicode(value): + return [ unicode(item) for item in value ] + + +def filter_action_mapping(value): + # The filter_action enum values have changed. In Mailman 2.1 the order was + # 'Discard', 'Reject', 'Forward to List Owner', 'Preserve'. + # In 3.0 it's 'hold', 'reject', 'discard', 'accept', 'defer', 'forward', + # 'preserve' + if value == 0: + return FilterAction.discard + elif value == 1: + return FilterAction.reject + elif value == 2: + return FilterAction.forward + elif value == 3: + return FilterAction.preserve + else: + raise ValueError("Unknown filter_action value: %s" % value) + + +def member_action_mapping(value): + # The mlist.default_member_action and mlist.default_nonmember_action enum + # values are different in Mailman 2.1, because they have been merged into a + # single enum in Mailman 3 + # For default_member_action, which used to be called + # member_moderation_action, the values were: + # 0==Hold, 1=Reject, 2==Discard + if value == 0: + return Action.hold + elif value == 1: + return Action.reject + elif value == 2: + return Action.discard +def nonmember_action_mapping(value): + # For default_nonmember_action, which used to be called + # generic_nonmember_action, the values were: + # 0==Accept, 1==Hold, 2==Reject, 3==Discard + if value == 0: + return Action.accept + elif value == 1: + return Action.hold + elif value == 2: + return Action.reject + elif value == 3: + return Action.discard + + +def unicode_to_string(value): + return str(value) if value is not None else None + + +def check_language_code(code): + if code is None: + return None + code = unicode(code) + if code not in getUtility(ILanguageManager): + msg = """Missing language: {0} +You must add a section describing this language in your mailman.cfg file. +This section should look like this: +[language.{0}] +# The English name for the language. +description: CHANGE ME +# And the default character set for the language. +charset: utf-8 +# Whether the language is enabled or not. +enabled: yes +""".format(code) + raise Import21Error(msg) + return code + # Attributes in Mailman 2 which have a different type in Mailman 3. TYPES = dict( autorespond_owner=ResponseAction, autorespond_postings=ResponseAction, autorespond_requests=ResponseAction, + autoresponse_grace_period=days_to_delta, bounce_info_stale_after=seconds_to_delta, bounce_you_are_disabled_warnings_interval=seconds_to_delta, digest_volume_frequency=DigestFrequency, - filter_action=FilterAction, + filter_action=filter_action_mapping, newsgroup_moderation=NewsgroupModeration, personalize=Personalization, reply_goes_to_list=ReplyToMunging, + filter_types=list_members_to_unicode, + pass_types=list_members_to_unicode, + filter_extensions=list_members_to_unicode, + pass_extensions=list_members_to_unicode, + forward_unrecognized_bounces_to=UnrecognizedBounceDisposition, + default_member_action=member_action_mapping, + default_nonmember_action=nonmember_action_mapping, + moderator_password=unicode_to_string, + preferred_language=check_language_code, ) # Attribute names in Mailman 2 which are renamed in Mailman 3. NAME_MAPPINGS = dict( - host_name='mail_host', include_list_post_header='allow_list_posts', real_name='display_name', + last_post_time='last_post_at', + autoresponse_graceperiod='autoresponse_grace_period', + autorespond_admin='autorespond_owner', + autoresponse_admin_text='autoresponse_owner_text', + filter_mime_types='filter_types', + pass_mime_types='pass_types', + filter_filename_extensions='filter_extensions', + pass_filename_extensions='pass_extensions', + bounce_processing='process_bounces', + bounce_unrecognized_goes_to_list_owner='forward_unrecognized_bounces_to', + mod_password='moderator_password', + news_moderation='newsgroup_moderation', + news_prefix_subject_too='nntp_prefix_subject_too', + send_welcome_msg='send_welcome_message', + send_goodbye_msg='send_goodbye_message', + member_moderation_action='default_member_action', + generic_nonmember_action='default_nonmember_action', + ) + +EXCLUDES = ( + "members", + "digest_members", ) @@ -74,14 +212,19 @@ def import_config_pck(mlist, config_dict): :type config_dict: dict """ for key, value in config_dict.items(): + # Some attributes must not be directly imported + if key in EXCLUDES: + continue # Some attributes from Mailman 2 were renamed in Mailman 3. key = NAME_MAPPINGS.get(key, key) # Handle the simple case where the key is an attribute of the # IMailingList and the types are the same (modulo 8-bit/unicode # strings). - if hasattr(mlist, key): + # When attributes raise an exception, hasattr may think they don't + # exist (see python issue 9666). Add them here. + if hasattr(mlist, key) or key in ("preferred_language", ): if isinstance(value, str): - value = unicode(value, 'ascii') + value = str_to_unicode(value) # Some types require conversion. converter = TYPES.get(key) if converter is not None: @@ -103,3 +246,184 @@ def import_config_pck(mlist, config_dict): mlist.archive_policy = ArchivePolicy.public else: mlist.archive_policy = ArchivePolicy.never + # Handle ban list + for addr in config_dict.get('ban_list', []): + IBanManager(mlist).ban(str_to_unicode(addr)) + # Handle acceptable aliases + acceptable_aliases = config_dict.get('acceptable_aliases', '') + if isinstance(acceptable_aliases, basestring): + acceptable_aliases = acceptable_aliases.splitlines() + for addr in acceptable_aliases: + addr = addr.strip() + if not addr: + continue + addr = str_to_unicode(addr) + try: + IAcceptableAliasSet(mlist).add(addr) + except ValueError: + IAcceptableAliasSet(mlist).add("^" + addr) + # Handle conversion to URIs + convert_to_uri = { + "welcome_msg": "welcome_message_uri", + "goodbye_msg": "goodbye_message_uri", + "msg_header": "header_uri", + "msg_footer": "footer_uri", + "digest_header": "digest_header_uri", + "digest_footer": "digest_footer_uri", + } + convert_placeholders = { # only the most common ones + "%(real_name)s": "$display_name", + "%(real_name)s@%(host_name)s": "$fqdn_listname", + "%(web_page_url)slistinfo%(cgiext)s/%(_internal_name)s": "$listinfo_uri", + } + # Collect defaults + defaults = {} + for oldvar, newvar in convert_to_uri.iteritems(): + default_value = getattr(mlist, newvar) + if not default_value: + continue + # Check if the value changed from the default + try: + default_text = decorate(mlist, default_value) + except (URLError, KeyError): + # Use case: importing the old a@ex.com into b@ex.com + # We can't check if it changed from the default + # -> don't import, we may do more harm than good and it's easy to + # change if needed + continue + defaults[newvar] = (default_value, default_text) + for oldvar, newvar in convert_to_uri.iteritems(): + if oldvar not in config_dict: + continue + text = config_dict[oldvar] + text = unicode(text, "utf-8", "replace") + for oldph, newph in convert_placeholders.iteritems(): + text = text.replace(oldph, newph) + default_value, default_text = defaults.get(newvar, (None, None)) + if not text and not (default_value or default_text): + continue # both are empty, leave it + # Check if the value changed from the default + try: + expanded_text = decorate_template(mlist, text) + except KeyError: + # Use case: importing the old a@ex.com into b@ex.com + # We can't check if it changed from the default + # -> don't import, we may do more harm than good and it's easy to + # change if needed + continue + if expanded_text and default_text \ + and expanded_text.strip() == default_text.strip(): + continue # keep the default + # Write the custom value to the right file + base_uri = "mailman:///$listname/$language/" + if default_value: + filename = default_value.rpartition("/")[2] + else: + filename = "%s.txt" % newvar[:-4] + if not default_value or not default_value.startswith(base_uri): + setattr(mlist, newvar, base_uri + filename) + filepath = list(search(filename, mlist))[0] + try: + os.makedirs(os.path.dirname(filepath)) + except OSError, e: + if e.errno != 17: # Already exists + raise + with open(filepath, "w") as template: + template.write(text.encode('utf-8')) + # Import rosters + members = set(config_dict.get("members", {}).keys() + + config_dict.get("digest_members", {}).keys()) + import_roster(mlist, config_dict, members, MemberRole.member) + import_roster(mlist, config_dict, config_dict.get("owner", []), + MemberRole.owner) + import_roster(mlist, config_dict, config_dict.get("moderator", []), + MemberRole.moderator) + + + +def import_roster(mlist, config_dict, members, role): + """ + Import members lists from a config.pck configuration dictionary to a + mailing list. + + :param mlist: The mailing list. + :type mlist: IMailingList + :param config_dict: The Mailman 2.1 configuration dictionary. + :type config_dict: dict + :param members: The members list to import + :type members: list + :param role: The MemberRole to import them as + :type role: MemberRole enum + """ + usermanager = getUtility(IUserManager) + for email in members: + # for owners and members, the emails can have a mixed case, so + # lowercase them all + email = str_to_unicode(email).lower() + roster = mlist.get_roster(role) + if roster.get_member(email) is not None: + print("%s is already imported with role %s" % (email, role), + file=sys.stderr) + continue + address = usermanager.get_address(email) + user = usermanager.get_user(email) + if user is None: + user = usermanager.create_user() + if address is None: + merged_members = {} + merged_members.update(config_dict.get("members", {})) + merged_members.update(config_dict.get("digest_members", {})) + if merged_members.get(email, 0) != 0: + original_email = str_to_unicode(merged_members[email]) + else: + original_email = email + address = usermanager.create_address(original_email) + address.verified_on = datetime.datetime.now() + user.link(address) + mlist.subscribe(address, role) + member = roster.get_member(email) + assert member is not None + prefs = config_dict.get("user_options", {}).get(email, 0) + if email in config_dict.get("members", {}): + member.preferences.delivery_mode = DeliveryMode.regular + elif email in config_dict.get("digest_members", {}): + if prefs & 8: # DisableMime + member.preferences.delivery_mode = DeliveryMode.plaintext_digests + else: + member.preferences.delivery_mode = DeliveryMode.mime_digests + else: + # probably not adding a member role here + pass + if email in config_dict.get("language", {}): + member.preferences.preferred_language = \ + check_language_code(config_dict["language"][email]) + # if the user already exists, display_name and password will be + # overwritten + if email in config_dict.get("usernames", {}): + address.display_name = \ + str_to_unicode(config_dict["usernames"][email]) + user.display_name = \ + str_to_unicode(config_dict["usernames"][email]) + if email in config_dict.get("passwords", {}): + user.password = config.password_context.encrypt( + config_dict["passwords"][email]) + # delivery_status + oldds = config_dict.get("delivery_status", {}).get(email, (0, 0))[0] + if oldds == 0: + member.preferences.delivery_status = DeliveryStatus.enabled + elif oldds == 1: + member.preferences.delivery_status = DeliveryStatus.unknown + elif oldds == 2: + member.preferences.delivery_status = DeliveryStatus.by_user + elif oldds == 3: + member.preferences.delivery_status = DeliveryStatus.by_moderator + elif oldds == 4: + member.preferences.delivery_status = DeliveryStatus.by_bounces + # moderation + if prefs & 128: + member.moderation_action = Action.hold + # other preferences + member.preferences.acknowledge_posts = bool(prefs & 4) # AcknowledgePosts + member.preferences.hide_address = bool(prefs & 16) # ConcealSubscription + member.preferences.receive_own_postings = not bool(prefs & 2) # DontReceiveOwnPosts + member.preferences.receive_list_copy = not bool(prefs & 256) # DontReceiveDuplicates diff --git a/src/mailman/utilities/tests/test_import.py b/src/mailman/utilities/tests/test_import.py index b64da7501..34ff7eb3c 100644 --- a/src/mailman/utilities/tests/test_import.py +++ b/src/mailman/utilities/tests/test_import.py @@ -26,15 +26,42 @@ __all__ = [ ] +import os import cPickle import unittest +from datetime import timedelta, datetime +from traceback import format_exc +from mailman.config import config from mailman.app.lifecycle import create_list, remove_list from mailman.interfaces.archiver import ArchivePolicy from mailman.testing.layers import ConfigLayer -from mailman.utilities.importer import import_config_pck +from mailman.utilities.importer import import_config_pck, Import21Error +from mailman.interfaces.archiver import ArchivePolicy +from mailman.interfaces.action import Action, FilterAction +from mailman.interfaces.address import ExistingAddressError +from mailman.interfaces.bounce import UnrecognizedBounceDisposition +from mailman.interfaces.bans import IBanManager +from mailman.interfaces.mailinglist import IAcceptableAliasSet +from mailman.interfaces.nntp import NewsgroupModeration +from mailman.interfaces.autorespond import ResponseAction +from mailman.interfaces.templates import ITemplateLoader +from mailman.interfaces.usermanager import IUserManager +from mailman.interfaces.member import DeliveryMode, DeliveryStatus, MemberRole +from mailman.interfaces.languages import ILanguageManager +from mailman.model.address import Address +from mailman.handlers.decorate import decorate +from mailman.utilities.string import expand from pkg_resources import resource_filename +from enum import Enum +from zope.component import getUtility +from storm.locals import Store + + +class DummyEnum(Enum): + # For testing purposes + val = 42 class TestBasicImport(unittest.TestCase): @@ -58,11 +85,12 @@ class TestBasicImport(unittest.TestCase): self._import() self.assertEqual(self._mlist.display_name, 'Test') - def test_mail_host(self): - # The mlist.mail_host gets set. + def test_mail_host_invariant(self): + # The mlist.mail_host must not be updated when importing (it will + # change the list_id property, which is supposed to be read-only) self.assertEqual(self._mlist.mail_host, 'example.com') self._import() - self.assertEqual(self._mlist.mail_host, 'heresy.example.org') + self.assertEqual(self._mlist.mail_host, 'example.com') def test_rfc2369_headers(self): self._mlist.allow_list_posts = False @@ -71,6 +99,204 @@ class TestBasicImport(unittest.TestCase): self.assertTrue(self._mlist.allow_list_posts) self.assertTrue(self._mlist.include_rfc2369_headers) + def test_no_overwrite_rosters(self): + # The mlist.members and mlist.digest_members rosters must not be + # overwritten. + for rname in ("members", "digest_members"): + roster = getattr(self._mlist, rname) + self.assertFalse(isinstance(roster, dict)) + self._import() + self.assertFalse(isinstance(roster, dict), + "The %s roster has been overwritten by the import" % rname) + + def test_last_post_time(self): + # last_post_time -> last_post_at + self._pckdict["last_post_time"] = 1270420800.274485 + self.assertEqual(self._mlist.last_post_at, None) + self._import() + # convert 1270420800.2744851 to datetime + expected = datetime(2010, 4, 4, 22, 40, 0, 274485) + self.assertEqual(self._mlist.last_post_at, expected) + + def test_autoresponse_grace_period(self): + # autoresponse_graceperiod -> autoresponse_grace_period + # must be a timedelta, not an int + self._mlist.autoresponse_grace_period = timedelta(days=42) + self._import() + self.assertTrue(isinstance( + self._mlist.autoresponse_grace_period, timedelta)) + self.assertEqual(self._mlist.autoresponse_grace_period, + timedelta(days=90)) + + def test_autoresponse_admin_to_owner(self): + # admin -> owner + self._mlist.autorespond_owner = DummyEnum.val + self._mlist.autoresponse_owner_text = 'DUMMY' + self._import() + self.assertEqual(self._mlist.autorespond_owner, ResponseAction.none) + self.assertEqual(self._mlist.autoresponse_owner_text, '') + + #def test_administrative(self): + # # administrivia -> administrative + # self._mlist.administrative = None + # self._import() + # self.assertTrue(self._mlist.administrative) + + def test_filter_pass_renames(self): + # mime_types -> types + # filename_extensions -> extensions + self._mlist.filter_types = ["dummy"] + self._mlist.pass_types = ["dummy"] + self._mlist.filter_extensions = ["dummy"] + self._mlist.pass_extensions = ["dummy"] + self._import() + self.assertEqual(list(self._mlist.filter_types), []) + self.assertEqual(list(self._mlist.filter_extensions), + ['exe', 'bat', 'cmd', 'com', 'pif', + 'scr', 'vbs', 'cpl']) + self.assertEqual(list(self._mlist.pass_types), + ['multipart/mixed', 'multipart/alternative', 'text/plain']) + self.assertEqual(list(self._mlist.pass_extensions), []) + + def test_process_bounces(self): + # bounce_processing -> process_bounces + self._mlist.process_bounces = None + self._import() + self.assertTrue(self._mlist.process_bounces) + + def test_forward_unrecognized_bounces_to(self): + # bounce_unrecognized_goes_to_list_owner -> forward_unrecognized_bounces_to + self._mlist.forward_unrecognized_bounces_to = DummyEnum.val + self._import() + self.assertEqual(self._mlist.forward_unrecognized_bounces_to, + UnrecognizedBounceDisposition.administrators) + + def test_moderator_password(self): + # mod_password -> moderator_password + self._mlist.moderator_password = str("TESTDATA") + self._import() + self.assertEqual(self._mlist.moderator_password, None) + + def test_moderator_password_str(self): + # moderator_password must not be unicode + self._pckdict[b"mod_password"] = b'TESTVALUE' + self._import() + self.assertFalse(isinstance(self._mlist.moderator_password, unicode)) + self.assertEqual(self._mlist.moderator_password, b'TESTVALUE') + + def test_newsgroup_moderation(self): + # news_moderation -> newsgroup_moderation + # news_prefix_subject_too -> nntp_prefix_subject_too + self._mlist.newsgroup_moderation = DummyEnum.val + self._mlist.nntp_prefix_subject_too = None + self._import() + self.assertEqual(self._mlist.newsgroup_moderation, + NewsgroupModeration.none) + self.assertTrue(self._mlist.nntp_prefix_subject_too) + + def test_msg_to_message(self): + # send_welcome_msg -> send_welcome_message + # send_goodbye_msg -> send_goodbye_message + self._mlist.send_welcome_message = None + self._mlist.send_goodbye_message = None + self._import() + self.assertTrue(self._mlist.send_welcome_message) + self.assertTrue(self._mlist.send_goodbye_message) + + def test_ban_list(self): + banned = [ + ("anne@example.com", "anne@example.com"), + ("^.*@example.com", "bob@example.com"), + ("non-ascii-\xe8@example.com", "non-ascii-\ufffd@example.com"), + ] + self._pckdict["ban_list"] = [ b[0].encode("iso-8859-1") for b in banned ] + try: + self._import() + except UnicodeDecodeError, e: + print(format_exc()) + self.fail(e) + for _pattern, addr in banned: + self.assertTrue(IBanManager(self._mlist).is_banned(addr)) + + def test_acceptable_aliases(self): + # it used to be a plain-text field (values are newline-separated) + aliases = ["alias1@example.com", + "alias2@exemple.com", + "non-ascii-\xe8@example.com", + ] + self._pckdict[b"acceptable_aliases"] = \ + ("\n".join(aliases)).encode("utf-8") + self._import() + alias_set = IAcceptableAliasSet(self._mlist) + self.assertEqual(sorted(alias_set.aliases), aliases) + + def test_acceptable_aliases_invalid(self): + # values without an '@' sign used to be matched against the local part, + # now we need to add the '^' sign + aliases = ["invalid-value", ] + self._pckdict[b"acceptable_aliases"] = \ + ("\n".join(aliases)).encode("utf-8") + try: + self._import() + except ValueError, e: + print(format_exc()) + self.fail("Invalid value '%s' caused a crash" % e) + alias_set = IAcceptableAliasSet(self._mlist) + self.assertEqual(sorted(alias_set.aliases), + [ ("^" + a) for a in aliases ]) + + def test_acceptable_aliases_as_list(self): + # in some versions of the pickle, it can be a list, not a string + # (seen in the wild) + aliases = [b"alias1@example.com", b"alias2@exemple.com" ] + self._pckdict[b"acceptable_aliases"] = aliases + try: + self._import() + except AttributeError: + print(format_exc()) + self.fail("Import does not handle acceptable_aliases as list") + alias_set = IAcceptableAliasSet(self._mlist) + self.assertEqual(sorted(alias_set.aliases), aliases) + + def test_info_non_ascii(self): + # info can contain non-ascii chars + info = 'O idioma aceito \xe9 somente Portugu\xeas do Brasil' + self._pckdict[b"info"] = info.encode("utf-8") + self._import() + self.assertEqual(self._mlist.info, info, + "Encoding to UTF-8 is not handled") + # test fallback to ascii with replace + self._pckdict[b"info"] = info.encode("iso-8859-1") + self._import() + self.assertEqual(self._mlist.info, + unicode(self._pckdict[b"info"], "ascii", "replace"), + "We don't fall back to replacing non-ascii chars") + + def test_preferred_language(self): + self._pckdict[b"preferred_language"] = b'ja' + english = getUtility(ILanguageManager).get('en') + japanese = getUtility(ILanguageManager).get('ja') + self.assertEqual(self._mlist.preferred_language, english) + self._import() + self.assertEqual(self._mlist.preferred_language, japanese) + + def test_preferred_language_unknown_previous(self): + # when the previous language is unknown, it should not fail + self._mlist._preferred_language = 'xx' # non-existant + self._import() + english = getUtility(ILanguageManager).get('en') + self.assertEqual(self._mlist.preferred_language, english) + + def test_new_language(self): + self._pckdict[b"preferred_language"] = b'xx_XX' + try: + self._import() + except Import21Error, e: + # check the message + self.assertTrue("[language.xx_XX]" in str(e)) + else: + self.fail("Import21Error was not raised") + class TestArchiveImport(unittest.TestCase): @@ -83,7 +309,10 @@ class TestArchiveImport(unittest.TestCase): def setUp(self): self._mlist = create_list('blank@example.com') - self._mlist.archive_policy = 'INITIAL-TEST-VALUE' + self._mlist.archive_policy = DummyEnum.val + + def tearDown(self): + remove_list(self._mlist) def _do_test(self, pckdict, expected): import_config_pck(self._mlist, pckdict) @@ -123,3 +352,516 @@ class TestArchiveImport(unittest.TestCase): # For some reason, the old list was missing an `archive_private` key. # For maximum safety, we treat this as private archiving. self._do_test(dict(archive=True), ArchivePolicy.private) + + + +class TestFilterActionImport(unittest.TestCase): + # The mlist.filter_action enum values have changed. In Mailman 2.1 the + # order was 'Discard', 'Reject', 'Forward to List Owner', 'Preserve'. + + layer = ConfigLayer + + def setUp(self): + self._mlist = create_list('blank@example.com') + self._mlist.filter_action = DummyEnum.val + + def tearDown(self): + remove_list(self._mlist) + + def _do_test(self, original, expected): + import_config_pck(self._mlist, dict(filter_action=original)) + self.assertEqual(self._mlist.filter_action, expected) + + def test_discard(self): + self._do_test(0, FilterAction.discard) + + def test_reject(self): + self._do_test(1, FilterAction.reject) + + def test_forward(self): + self._do_test(2, FilterAction.forward) + + def test_preserve(self): + self._do_test(3, FilterAction.preserve) + + + +class TestMemberActionImport(unittest.TestCase): + # The mlist.default_member_action and mlist.default_nonmember_action enum + # values are different in Mailman 2.1, they have been merged into a + # single enum in Mailman 3 + # For default_member_action, which used to be called + # member_moderation_action, the values were: + # 0==Hold, 1=Reject, 2==Discard + # For default_nonmember_action, which used to be called + # generic_nonmember_action, the values were: + # 0==Accept, 1==Hold, 2==Reject, 3==Discard + + layer = ConfigLayer + + def setUp(self): + self._mlist = create_list('blank@example.com') + self._mlist.default_member_action = DummyEnum.val + self._mlist.default_nonmember_action = DummyEnum.val + self._pckdict = dict( + member_moderation_action=DummyEnum.val, + generic_nonmember_action=DummyEnum.val, + ) + + def tearDown(self): + remove_list(self._mlist) + + def _do_test(self, expected): + import_config_pck(self._mlist, self._pckdict) + for key, value in expected.iteritems(): + self.assertEqual(getattr(self._mlist, key), value) + + def test_member_hold(self): + self._pckdict[b"member_moderation_action"] = 0 + self._do_test(dict(default_member_action=Action.hold)) + + def test_member_reject(self): + self._pckdict[b"member_moderation_action"] = 1 + self._do_test(dict(default_member_action=Action.reject)) + + def test_member_discard(self): + self._pckdict[b"member_moderation_action"] = 2 + self._do_test(dict(default_member_action=Action.discard)) + + def test_nonmember_accept(self): + self._pckdict[b"generic_nonmember_action"] = 0 + self._do_test(dict(default_nonmember_action=Action.accept)) + + def test_nonmember_hold(self): + self._pckdict[b"generic_nonmember_action"] = 1 + self._do_test(dict(default_nonmember_action=Action.hold)) + + def test_nonmember_reject(self): + self._pckdict[b"generic_nonmember_action"] = 2 + self._do_test(dict(default_nonmember_action=Action.reject)) + + def test_nonmember_discard(self): + self._pckdict[b"generic_nonmember_action"] = 3 + self._do_test(dict(default_nonmember_action=Action.discard)) + + + +class TestConvertToURI(unittest.TestCase): + # The following values were plain text, and are now URIs in Mailman 3: + # - welcome_message_uri + # - goodbye_message_uri + # - header_uri + # - footer_uri + # - digest_header_uri + # - digest_footer_uri + # + # The templates contain variables that must be replaced: + # - %(real_name)s -> %(display_name)s + # - %(real_name)s@%(host_name)s -> %(fqdn_listname)s + # - %(web_page_url)slistinfo%(cgiext)s/%(_internal_name)s -> %(listinfo_uri)s + + layer = ConfigLayer + + def setUp(self): + self._mlist = create_list('blank@example.com') + self._conf_mapping = dict( + welcome_msg="welcome_message_uri", + goodbye_msg="goodbye_message_uri", + msg_header="header_uri", + msg_footer="footer_uri", + digest_header="digest_header_uri", + digest_footer="digest_footer_uri", + ) + self._pckdict = dict() + #self._pckdict = { + # "preferred_language": "XX", # templates are lang-specific + #} + + def tearDown(self): + remove_list(self._mlist) + + def test_text_to_uri(self): + for oldvar, newvar in self._conf_mapping.iteritems(): + self._pckdict[str(oldvar)] = b"TEST VALUE" + import_config_pck(self._mlist, self._pckdict) + newattr = getattr(self._mlist, newvar) + text = decorate(self._mlist, newattr) + self.assertEqual(text, "TEST VALUE", + "Old variable %s was not properly imported to %s" + % (oldvar, newvar)) + + def test_substitutions(self): + test_text = ("UNIT TESTING %(real_name)s mailing list\n" + "%(real_name)s@%(host_name)s\n" + "%(web_page_url)slistinfo%(cgiext)s/%(_internal_name)s") + expected_text = ("UNIT TESTING $display_name mailing list\n" + "$fqdn_listname\n" + "$listinfo_uri") + for oldvar, newvar in self._conf_mapping.iteritems(): + self._pckdict[str(oldvar)] = str(test_text) + import_config_pck(self._mlist, self._pckdict) + newattr = getattr(self._mlist, newvar) + template_uri = expand(newattr, dict( + listname=self._mlist.fqdn_listname, + language=self._mlist.preferred_language.code, + )) + loader = getUtility(ITemplateLoader) + text = loader.get(template_uri) + self.assertEqual(text, expected_text, + "Old variables were not converted for %s" % newvar) + + def test_keep_default(self): + # If the value was not changed from MM2.1's default, don't import it + default_msg_footer = ( + "_______________________________________________\n" + "%(real_name)s mailing list\n" + "%(real_name)s@%(host_name)s\n" + "%(web_page_url)slistinfo%(cgiext)s/%(_internal_name)s" + ) + for oldvar in ("msg_footer", "digest_footer"): + newvar = self._conf_mapping[oldvar] + self._pckdict[str(oldvar)] = str(default_msg_footer) + old_value = getattr(self._mlist, newvar) + import_config_pck(self._mlist, self._pckdict) + new_value = getattr(self._mlist, newvar) + self.assertEqual(old_value, new_value, + "Default value was not preserved for %s" % newvar) + + def test_keep_default_if_fqdn_changed(self): + # Use case: importing the old a@ex.com into b@ex.com + # We can't check if it changed from the default + # -> don't import, we may do more harm than good and it's easy to + # change if needed + test_value = b"TEST-VALUE" + for oldvar, newvar in self._conf_mapping.iteritems(): + self._mlist.mail_host = "example.com" + self._pckdict[b"mail_host"] = b"test.example.com" + self._pckdict[str(oldvar)] = test_value + old_value = getattr(self._mlist, newvar) + import_config_pck(self._mlist, self._pckdict) + new_value = getattr(self._mlist, newvar) + self.assertEqual(old_value, new_value, + "Default value was not preserved for %s" % newvar) + + def test_unicode(self): + # non-ascii templates + for oldvar in self._conf_mapping: + self._pckdict[str(oldvar)] = b"Ol\xe1!" + try: + import_config_pck(self._mlist, self._pckdict) + except UnicodeDecodeError, e: + print(format_exc()) + self.fail(e) + for oldvar, newvar in self._conf_mapping.iteritems(): + newattr = getattr(self._mlist, newvar) + text = decorate(self._mlist, newattr) + expected = u'Ol\ufffd!' + self.assertEqual(text, expected) + + def test_unicode_in_default(self): + # What if the default template is already in UTF-8? (like if you import twice) + footer = b'\xe4\xb8\xad $listinfo_uri' + footer_path = os.path.join(config.VAR_DIR, "templates", "lists", + "blank@example.com", "en", "footer-generic.txt") + try: + os.makedirs(os.path.dirname(footer_path)) + except OSError: + pass + with open(footer_path, "w") as footer_file: + footer_file.write(footer) + self._pckdict[b"msg_footer"] = b"NEW-VALUE" + import_config_pck(self._mlist, self._pckdict) + text = decorate(self._mlist, self._mlist.footer_uri) + self.assertEqual(text, 'NEW-VALUE') + + +class TestRosterImport(unittest.TestCase): + + layer = ConfigLayer + + def setUp(self): + self._mlist = create_list('blank@example.com') + self._pckdict = { + "members": { + "anne@example.com": 0, + "bob@example.com": b"bob@ExampLe.Com", + }, + "digest_members": { + "cindy@example.com": 0, + "dave@example.com": b"dave@ExampLe.Com", + }, + "passwords": { + "anne@example.com" : b"annepass", + "bob@example.com" : b"bobpass", + "cindy@example.com": b"cindypass", + "dave@example.com" : b"davepass", + }, + "language": { + "anne@example.com" : b"fr", + "bob@example.com" : b"de", + "cindy@example.com": b"es", + "dave@example.com" : b"it", + }, + "usernames": { # Usernames are unicode strings in the pickle + "anne@example.com" : "Anne", + "bob@example.com" : "Bob", + "cindy@example.com": "Cindy", + "dave@example.com" : "Dave", + }, + "owner": [ + "anne@example.com", + "emily@example.com", + ], + "moderator": [ + "bob@example.com", + "fred@example.com", + ], + } + self._usermanager = getUtility(IUserManager) + language_manager = getUtility(ILanguageManager) + for code in self._pckdict["language"].values(): + if code not in language_manager.codes: + language_manager.add(code, 'utf-8', code) + + def tearDown(self): + remove_list(self._mlist) + + def test_member(self): + import_config_pck(self._mlist, self._pckdict) + for name in ("anne", "bob", "cindy", "dave"): + addr = "%s@example.com" % name + self.assertTrue( + addr in [ a.email for a in self._mlist.members.addresses], + "Address %s was not imported" % addr) + self.assertTrue("anne@example.com" in [ a.email + for a in self._mlist.regular_members.addresses]) + self.assertTrue("bob@example.com" in [ a.email + for a in self._mlist.regular_members.addresses]) + self.assertTrue("cindy@example.com" in [ a.email + for a in self._mlist.digest_members.addresses]) + self.assertTrue("dave@example.com" in [ a.email + for a in self._mlist.digest_members.addresses]) + + def test_original_email(self): + import_config_pck(self._mlist, self._pckdict) + bob = self._usermanager.get_address("bob@example.com") + self.assertEqual(bob.original_email, "bob@ExampLe.Com") + dave = self._usermanager.get_address("dave@example.com") + self.assertEqual(dave.original_email, "dave@ExampLe.Com") + + def test_language(self): + import_config_pck(self._mlist, self._pckdict) + for name in ("anne", "bob", "cindy", "dave"): + addr = "%s@example.com" % name + member = self._mlist.members.get_member(addr) + self.assertTrue(member is not None, + "Address %s was not imported" % addr) + print(self._pckdict["language"]) + print(member.preferred_language, member.preferred_language.code) + self.assertEqual(member.preferred_language.code, + self._pckdict["language"][addr]) + + def test_new_language(self): + self._pckdict[b"language"]["anne@example.com"] = b'xx_XX' + try: + import_config_pck(self._mlist, self._pckdict) + except Import21Error, e: + # check the message + self.assertTrue("[language.xx_XX]" in str(e)) + else: + self.fail("Import21Error was not raised") + + def test_username(self): + import_config_pck(self._mlist, self._pckdict) + for name in ("anne", "bob", "cindy", "dave"): + addr = "%s@example.com" % name + user = self._usermanager.get_user(addr) + address = self._usermanager.get_address(addr) + self.assertTrue(user is not None, + "User %s was not imported" % addr) + self.assertTrue(address is not None, + "Address %s was not imported" % addr) + display_name = self._pckdict["usernames"][addr] + self.assertEqual(user.display_name, display_name, + "The display name was not set for User %s" % addr) + self.assertEqual(address.display_name, display_name, + "The display name was not set for Address %s" % addr) + + def test_owner(self): + import_config_pck(self._mlist, self._pckdict) + for name in ("anne", "emily"): + addr = "%s@example.com" % name + self.assertTrue( + addr in [ a.email for a in self._mlist.owners.addresses ], + "Address %s was not imported as owner" % addr) + self.assertFalse("emily@example.com" in + [ a.email for a in self._mlist.members.addresses ], + "Address emily@ was wrongly added to the members list") + + def test_moderator(self): + import_config_pck(self._mlist, self._pckdict) + for name in ("bob", "fred"): + addr = "%s@example.com" % name + self.assertTrue( + addr in [ a.email for a in self._mlist.moderators.addresses ], + "Address %s was not imported as moderator" % addr) + self.assertFalse("fred@example.com" in + [ a.email for a in self._mlist.members.addresses ], + "Address fred@ was wrongly added to the members list") + + def test_password(self): + #self.anne.password = config.password_context.encrypt('abc123') + import_config_pck(self._mlist, self._pckdict) + for name in ("anne", "bob", "cindy", "dave"): + addr = "%s@example.com" % name + user = self._usermanager.get_user(addr) + self.assertTrue(user is not None, + "Address %s was not imported" % addr) + self.assertEqual(user.password, b'{plaintext}%spass' % name, + "Password for %s was not imported" % addr) + + def test_same_user(self): + # Adding the address of an existing User must not create another user + user = self._usermanager.create_user('anne@example.com', 'Anne') + user.register("bob@example.com") # secondary email + import_config_pck(self._mlist, self._pckdict) + member = self._mlist.members.get_member('bob@example.com') + self.assertEqual(member.user, user) + + def test_owner_and_moderator_not_lowercase(self): + # In the v2.1 pickled dict, the owner and moderator lists are not + # necessarily lowercased already + self._pckdict["owner"] = [b"Anne@example.com"] + self._pckdict["moderator"] = [b"Anne@example.com"] + try: + import_config_pck(self._mlist, self._pckdict) + except AssertionError: + print(format_exc()) + self.fail("The address was not lowercased") + self.assertTrue("anne@example.com" in + [ a.email for a in self._mlist.owners.addresses ]) + self.assertTrue("anne@example.com" in + [ a.email for a in self._mlist.moderators.addresses]) + + def test_address_already_exists_but_no_user(self): + # An address already exists, but it is not linked to a user nor + # subscribed + anne_addr = Address("anne@example.com", "Anne") + Store.of(self._mlist).add(anne_addr) + try: + import_config_pck(self._mlist, self._pckdict) + except ExistingAddressError: + print(format_exc()) + self.fail("existing address was not checked") + anne = self._usermanager.get_user("anne@example.com") + self.assertTrue(anne.controls("anne@example.com")) + self.assertTrue(anne_addr in self._mlist.regular_members.addresses) + + def test_address_already_subscribed_but_no_user(self): + # An address is already subscribed, but it is not linked to a user + anne_addr = Address("anne@example.com", "Anne") + self._mlist.subscribe(anne_addr) + try: + import_config_pck(self._mlist, self._pckdict) + except ExistingAddressError: + print(format_exc()) + self.fail("existing address was not checked") + anne = self._usermanager.get_user("anne@example.com") + self.assertTrue(anne.controls("anne@example.com")) + + + + +class TestPreferencesImport(unittest.TestCase): + + layer = ConfigLayer + + def setUp(self): + self._mlist = create_list('blank@example.com') + self._pckdict = dict( + members={ "anne@example.com": 0 }, + user_options=dict(), + delivery_status=dict(), + ) + self._usermanager = getUtility(IUserManager) + + def tearDown(self): + remove_list(self._mlist) + + def _do_test(self, oldvalue, expected): + self._pckdict["user_options"]["anne@example.com"] = oldvalue + import_config_pck(self._mlist, self._pckdict) + user = self._usermanager.get_user("anne@example.com") + self.assertTrue(user is not None, "User was not imported") + member = self._mlist.members.get_member("anne@example.com") + self.assertTrue(member is not None, "Address was not subscribed") + for exp_name, exp_val in expected.iteritems(): + try: + currentval = getattr(member, exp_name) + except AttributeError: + # hide_address has no direct getter + currentval = getattr(member.preferences, exp_name) + self.assertEqual(currentval, exp_val, + "Preference %s was not imported" % exp_name) + # XXX: should I check that other params are still equal to + # mailman.core.constants.system_preferences ? + + def test_acknowledge_posts(self): + # AcknowledgePosts + self._do_test(4, dict(acknowledge_posts=True)) + + def test_hide_address(self): + # ConcealSubscription + self._do_test(16, dict(hide_address=True)) + + def test_receive_own_postings(self): + # DontReceiveOwnPosts + self._do_test(2, dict(receive_own_postings=False)) + + def test_receive_list_copy(self): + # DontReceiveDuplicates + self._do_test(256, dict(receive_list_copy=False)) + + def test_digest_plain(self): + # Digests & DisableMime + self._pckdict["digest_members"] = self._pckdict["members"].copy() + self._pckdict["members"] = dict() + self._do_test(8, dict(delivery_mode=DeliveryMode.plaintext_digests)) + + def test_digest_mime(self): + # Digests & not DisableMime + self._pckdict["digest_members"] = self._pckdict["members"].copy() + self._pckdict["members"] = dict() + self._do_test(0, dict(delivery_mode=DeliveryMode.mime_digests)) + + def test_delivery_status(self): + # look for the pckdict["delivery_status"] key which will look like + # (status, time) where status is among the following: + # ENABLED = 0 # enabled + # UNKNOWN = 1 # legacy disabled + # BYUSER = 2 # disabled by user choice + # BYADMIN = 3 # disabled by admin choice + # BYBOUNCE = 4 # disabled by bounces + for oldval, expected in enumerate((DeliveryStatus.enabled, + DeliveryStatus.unknown, DeliveryStatus.by_user, + DeliveryStatus.by_moderator, DeliveryStatus.by_bounces)): + self._pckdict["delivery_status"]["anne@example.com"] = (oldval, 0) + import_config_pck(self._mlist, self._pckdict) + member = self._mlist.members.get_member("anne@example.com") + self.assertTrue(member is not None, "Address was not subscribed") + self.assertEqual(member.delivery_status, expected) + member.unsubscribe() + + def test_moderate(self): + # Option flag Moderate is translated to + # member.moderation_action = Action.hold + self._do_test(128, dict(moderation_action=Action.hold)) + + def test_multiple_options(self): + # DontReceiveDuplicates & DisableMime & SuppressPasswordReminder + self._pckdict[b"digest_members"] = self._pckdict[b"members"].copy() + self._pckdict[b"members"] = dict() + self._do_test(296, dict( + receive_list_copy=False, + delivery_mode=DeliveryMode.plaintext_digests, + )) |
