summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/mailman/handlers/decorate.py9
-rw-r--r--src/mailman/utilities/importer.py252
-rw-r--r--src/mailman/utilities/tests/test_import.py568
3 files changed, 824 insertions, 5 deletions
diff --git a/src/mailman/handlers/decorate.py b/src/mailman/handlers/decorate.py
index 945c50bd5..815a24569 100644
--- a/src/mailman/handlers/decorate.py
+++ b/src/mailman/handlers/decorate.py
@@ -201,8 +201,8 @@ def process(mlist, msg, msgdata):
def decorate(mlist, uri, extradict=None):
- """Expand the decoration template."""
- if uri is None:
+ """Expand the decoration template from its URI."""
+ if uri is None or uri == '':
return ''
# Get the decorator template.
loader = getUtility(ITemplateLoader)
@@ -211,6 +211,11 @@ def decorate(mlist, uri, extradict=None):
language=mlist.preferred_language.code,
))
template = loader.get(template_uri)
+ return decorate_template(mlist, template, extradict)
+
+
+def decorate_template(mlist, template, extradict=None):
+ """Expand the decoration template."""
# Create a dictionary which includes the default set of interpolation
# variables allowed in headers and footers. These will be augmented by
# any key/value pairs in the extradict.
diff --git a/src/mailman/utilities/importer.py b/src/mailman/utilities/importer.py
index 3cc7259d7..50d2e7640 100644
--- a/src/mailman/utilities/importer.py
+++ b/src/mailman/utilities/importer.py
@@ -27,32 +27,103 @@ __all__ = [
import sys
import datetime
+import os
+from urllib2 import URLError
-from mailman.interfaces.action import FilterAction
+from mailman.config import config
+from mailman.interfaces.action import FilterAction, Action
from mailman.interfaces.autorespond import ResponseAction
from mailman.interfaces.digests import DigestFrequency
from mailman.interfaces.mailinglist import Personalization, ReplyToMunging
from mailman.interfaces.nntp import NewsgroupModeration
from mailman.interfaces.archiver import ArchivePolicy
+from mailman.interfaces.bans import IBanManager
+from mailman.interfaces.mailinglist import IAcceptableAliasSet
+from mailman.interfaces.bounce import UnrecognizedBounceDisposition
+from mailman.interfaces.usermanager import IUserManager
+from mailman.interfaces.member import DeliveryMode, DeliveryStatus, MemberRole
+from mailman.handlers.decorate import decorate, decorate_template
+from mailman.utilities.i18n import search
+from zope.component import getUtility
def seconds_to_delta(value):
return datetime.timedelta(seconds=value)
+
+def days_to_delta(value):
+ return datetime.timedelta(days=value)
+
+
+def list_members_to_unicode(value):
+ return [ unicode(item) for item in value ]
+
+
+def filter_action_mapping(value):
+ # The filter_action enum values have changed. In Mailman 2.1 the order was
+ # 'Discard', 'Reject', 'Forward to List Owner', 'Preserve'.
+ # In 3.0 it's 'hold', 'reject', 'discard', 'accept', 'defer', 'forward',
+ # 'preserve'
+ if value == 0:
+ return FilterAction.discard
+ elif value == 1:
+ return FilterAction.reject
+ elif value == 2:
+ return FilterAction.forward
+ elif value == 3:
+ return FilterAction.preserve
+ else:
+ raise ValueError("Unknown filter_action value: %s" % value)
+
+
+def member_action_mapping(value):
+ # The mlist.default_member_action and mlist.default_nonmember_action enum
+ # values are different in Mailman 2.1, because they have been merged into a
+ # single enum in Mailman 3
+ # For default_member_action, which used to be called
+ # member_moderation_action, the values were:
+ # 0==Hold, 1=Reject, 2==Discard
+ if value == 0:
+ return Action.hold
+ elif value == 1:
+ return Action.reject
+ elif value == 2:
+ return Action.discard
+def nonmember_action_mapping(value):
+ # For default_nonmember_action, which used to be called
+ # generic_nonmember_action, the values were:
+ # 0==Accept, 1==Hold, 2==Reject, 3==Discard
+ if value == 0:
+ return Action.accept
+ elif value == 1:
+ return Action.hold
+ elif value == 2:
+ return Action.reject
+ elif value == 3:
+ return Action.discard
+
# Attributes in Mailman 2 which have a different type in Mailman 3.
TYPES = dict(
autorespond_owner=ResponseAction,
autorespond_postings=ResponseAction,
autorespond_requests=ResponseAction,
+ autoresponse_grace_period=days_to_delta,
bounce_info_stale_after=seconds_to_delta,
bounce_you_are_disabled_warnings_interval=seconds_to_delta,
digest_volume_frequency=DigestFrequency,
- filter_action=FilterAction,
+ filter_action=filter_action_mapping,
newsgroup_moderation=NewsgroupModeration,
personalize=Personalization,
reply_goes_to_list=ReplyToMunging,
+ filter_types=list_members_to_unicode,
+ pass_types=list_members_to_unicode,
+ filter_extensions=list_members_to_unicode,
+ pass_extensions=list_members_to_unicode,
+ forward_unrecognized_bounces_to=UnrecognizedBounceDisposition,
+ default_member_action=member_action_mapping,
+ default_nonmember_action=nonmember_action_mapping,
)
@@ -61,6 +132,28 @@ NAME_MAPPINGS = dict(
host_name='mail_host',
include_list_post_header='allow_list_posts',
real_name='display_name',
+ last_post_time='last_post_at',
+ autoresponse_graceperiod='autoresponse_grace_period',
+ autorespond_admin='autorespond_owner',
+ autoresponse_admin_text='autoresponse_owner_text',
+ filter_mime_types='filter_types',
+ pass_mime_types='pass_types',
+ filter_filename_extensions='filter_extensions',
+ pass_filename_extensions='pass_extensions',
+ bounce_processing='process_bounces',
+ bounce_unrecognized_goes_to_list_owner='forward_unrecognized_bounces_to',
+ mod_password='moderator_password',
+ news_moderation='newsgroup_moderation',
+ news_prefix_subject_too='nntp_prefix_subject_too',
+ send_welcome_msg='send_welcome_message',
+ send_goodbye_msg='send_goodbye_message',
+ member_moderation_action='default_member_action',
+ generic_nonmember_action='default_nonmember_action',
+ )
+
+EXCLUDES = (
+ "members",
+ "digest_members",
)
@@ -74,6 +167,9 @@ def import_config_pck(mlist, config_dict):
:type config_dict: dict
"""
for key, value in config_dict.items():
+ # Some attributes must not be directly imported
+ if key in EXCLUDES:
+ continue
# Some attributes from Mailman 2 were renamed in Mailman 3.
key = NAME_MAPPINGS.get(key, key)
# Handle the simple case where the key is an attribute of the
@@ -99,3 +195,155 @@ def import_config_pck(mlist, config_dict):
mlist.archive_policy = ArchivePolicy.public
else:
mlist.archive_policy = ArchivePolicy.never
+ # Handle ban list
+ for addr in config_dict.get('ban_list', []):
+ IBanManager(mlist).ban(addr)
+ # Handle acceptable aliases
+ for addr in config_dict.get('acceptable_aliases', '').splitlines():
+ addr = addr.strip()
+ if not addr:
+ continue
+ IAcceptableAliasSet(mlist).add(addr)
+ # Handle conversion to URIs
+ convert_to_uri = {
+ "welcome_msg": "welcome_message_uri",
+ "goodbye_msg": "goodbye_message_uri",
+ "msg_header": "header_uri",
+ "msg_footer": "footer_uri",
+ "digest_header": "digest_header_uri",
+ "digest_footer": "digest_footer_uri",
+ }
+ convert_placeholders = { # only the most common ones
+ "%(real_name)s": "$display_name",
+ "%(real_name)s@%(host_name)s": "$fqdn_listname",
+ "%(web_page_url)slistinfo%(cgiext)s/%(_internal_name)s": "$listinfo_uri",
+ }
+ for oldvar, newvar in convert_to_uri.iteritems():
+ if oldvar not in config_dict:
+ continue
+ text = config_dict[oldvar]
+ for oldph, newph in convert_placeholders.iteritems():
+ text = text.replace(oldph, newph)
+ default_value = getattr(mlist, newvar)
+ if not text and not default_value:
+ continue
+ # Check if the value changed from the default
+ try:
+ default_text = decorate(mlist, default_value)
+ expanded_text = decorate_template(mlist, text)
+ except (URLError, KeyError):
+ # Use case: importing the old a@ex.com into b@ex.com
+ # We can't check if it changed from the default
+ # -> don't import, we may do more harm than good and it's easy to
+ # change if needed
+ continue
+ if not text and not default_text:
+ continue # both are empty, leave it
+ if expanded_text.strip() == default_text.strip():
+ continue # keep the default
+ # Write the custom value to the right file
+ base_uri = "mailman:///$listname/$language/"
+ if default_value:
+ filename = default_value.rpartition("/")[2]
+ else:
+ filename = "%s.txt" % newvar[:-4]
+ if not default_value or not default_value.startswith(base_uri):
+ setattr(mlist, newvar, base_uri + filename)
+ filepath = list(search(filename, mlist))[0]
+ try:
+ os.makedirs(os.path.dirname(filepath))
+ except OSError, e:
+ if e.errno != 17: # Already exists
+ raise
+ with open(filepath, "w") as template:
+ template.write(text.encode('utf-8'))
+ # Import rosters
+ members = set(config_dict.get("members", {}).keys()
+ + config_dict.get("digest_members", {}).keys())
+ import_roster(mlist, config_dict, members, MemberRole.member)
+ import_roster(mlist, config_dict, config_dict.get("owner", []),
+ MemberRole.owner)
+ import_roster(mlist, config_dict, config_dict.get("moderator", []),
+ MemberRole.moderator)
+
+
+
+def import_roster(mlist, config_dict, members, role):
+ """
+ Import members lists from a config.pck configuration dictionary to a
+ mailing list.
+
+ :param mlist: The mailing list.
+ :type mlist: IMailingList
+ :param config_dict: The Mailman 2.1 configuration dictionary.
+ :type config_dict: dict
+ :param members: The members list to import
+ :type members: list
+ :param role: The MemberRole to import them as
+ :type role: MemberRole enum
+ """
+ usermanager = getUtility(IUserManager)
+ for email in members:
+ email = unicode(email)
+ roster = mlist.get_roster(role)
+ if roster.get_member(email) is not None:
+ print("%s is already imported with role %s" % (email, role),
+ file=sys.stderr)
+ continue
+ user = usermanager.get_user(email)
+ if user is None:
+ merged_members = {}
+ merged_members.update(config_dict.get("members", {}))
+ merged_members.update(config_dict.get("digest_members", {}))
+ if merged_members.get(email, 0) != 0:
+ original_email = merged_members[email]
+ else:
+ original_email = email
+ user = usermanager.create_user(original_email)
+ address = usermanager.get_address(email)
+ address.verified_on = datetime.datetime.now()
+ mlist.subscribe(address, role)
+ member = roster.get_member(email)
+ assert member is not None
+ prefs = config_dict.get("user_options", {}).get(email, 0)
+ if email in config_dict.get("members", {}):
+ member.preferences.delivery_mode = DeliveryMode.regular
+ elif email in config_dict.get("digest_members", {}):
+ if prefs & 8: # DisableMime
+ member.preferences.delivery_mode = DeliveryMode.plaintext_digests
+ else:
+ member.preferences.delivery_mode = DeliveryMode.mime_digests
+ else:
+ # probably not adding a member role here
+ pass
+ if email in config_dict.get("language", {}):
+ member.preferences.preferred_language = \
+ unicode(config_dict["language"][email])
+ # if the user already exists, display_name and password will be
+ # overwritten
+ if email in config_dict.get("usernames", {}):
+ address.display_name = unicode(config_dict["usernames"][email])
+ user.display_name = unicode(config_dict["usernames"][email])
+ if email in config_dict.get("passwords", {}):
+ user.password = config.password_context.encrypt(
+ config_dict["passwords"][email])
+ # delivery_status
+ oldds = config_dict.get("delivery_status", {}).get(email, (0, 0))[0]
+ if oldds == 0:
+ member.preferences.delivery_status = DeliveryStatus.enabled
+ elif oldds == 1:
+ member.preferences.delivery_status = DeliveryStatus.unknown
+ elif oldds == 2:
+ member.preferences.delivery_status = DeliveryStatus.by_user
+ elif oldds == 3:
+ member.preferences.delivery_status = DeliveryStatus.by_moderator
+ elif oldds == 4:
+ member.preferences.delivery_status = DeliveryStatus.by_bounces
+ # moderation
+ if prefs & 128:
+ member.moderation_action = Action.hold
+ # other preferences
+ member.preferences.acknowledge_posts = bool(prefs & 4) # AcknowledgePosts
+ member.preferences.hide_address = bool(prefs & 16) # ConcealSubscription
+ member.preferences.receive_own_postings = not bool(prefs & 2) # DontReceiveOwnPosts
+ member.preferences.receive_list_copy = not bool(prefs & 256) # DontReceiveDuplicates
diff --git a/src/mailman/utilities/tests/test_import.py b/src/mailman/utilities/tests/test_import.py
index 45df80d80..b8eb3e280 100644
--- a/src/mailman/utilities/tests/test_import.py
+++ b/src/mailman/utilities/tests/test_import.py
@@ -27,15 +27,35 @@ __all__ = [
import cPickle
import unittest
+from datetime import timedelta, datetime
from mailman.app.lifecycle import create_list, remove_list
from mailman.testing.layers import ConfigLayer
from mailman.utilities.importer import import_config_pck
from mailman.interfaces.archiver import ArchivePolicy
+from mailman.interfaces.action import Action, FilterAction
+from mailman.interfaces.bounce import UnrecognizedBounceDisposition
+from mailman.interfaces.bans import IBanManager
+from mailman.interfaces.mailinglist import IAcceptableAliasSet
+from mailman.interfaces.nntp import NewsgroupModeration
+from mailman.interfaces.autorespond import ResponseAction
+from mailman.interfaces.templates import ITemplateLoader
+from mailman.interfaces.usermanager import IUserManager
+from mailman.interfaces.member import DeliveryMode, DeliveryStatus
+from mailman.interfaces.languages import ILanguageManager
+from mailman.handlers.decorate import decorate
+from mailman.utilities.string import expand
from pkg_resources import resource_filename
+from enum import Enum
+from zope.component import getUtility
+class DummyEnum(Enum):
+ # For testing purposes
+ val = 42
+
+
class TestBasicImport(unittest.TestCase):
layer = ConfigLayer
@@ -70,6 +90,121 @@ class TestBasicImport(unittest.TestCase):
self.assertTrue(self._mlist.allow_list_posts)
self.assertTrue(self._mlist.include_rfc2369_headers)
+ def test_no_overwrite_rosters(self):
+ # The mlist.members and mlist.digest_members rosters must not be
+ # overwritten.
+ for rname in ("members", "digest_members"):
+ roster = getattr(self._mlist, rname)
+ self.assertFalse(isinstance(roster, dict))
+ self._import()
+ self.assertFalse(isinstance(roster, dict),
+ "The %s roster has been overwritten by the import" % rname)
+
+ def test_last_post_time(self):
+ # last_post_time -> last_post_at
+ self._pckdict["last_post_time"] = 1270420800.274485
+ self.assertEqual(self._mlist.last_post_at, None)
+ self._import()
+ # convert 1270420800.2744851 to datetime
+ expected = datetime(2010, 4, 4, 22, 40, 0, 274485)
+ self.assertEqual(self._mlist.last_post_at, expected)
+
+ def test_autoresponse_grace_period(self):
+ # autoresponse_graceperiod -> autoresponse_grace_period
+ # must be a timedelta, not an int
+ self._mlist.autoresponse_grace_period = timedelta(days=42)
+ self._import()
+ self.assertTrue(isinstance(
+ self._mlist.autoresponse_grace_period, timedelta))
+ self.assertEqual(self._mlist.autoresponse_grace_period,
+ timedelta(days=90))
+
+ def test_autoresponse_admin_to_owner(self):
+ # admin -> owner
+ self._mlist.autorespond_owner = DummyEnum.val
+ self._mlist.autoresponse_owner_text = 'DUMMY'
+ self._import()
+ self.assertEqual(self._mlist.autorespond_owner, ResponseAction.none)
+ self.assertEqual(self._mlist.autoresponse_owner_text, '')
+
+ #def test_administrative(self):
+ # # administrivia -> administrative
+ # self._mlist.administrative = None
+ # self._import()
+ # self.assertTrue(self._mlist.administrative)
+
+ def test_filter_pass_renames(self):
+ # mime_types -> types
+ # filename_extensions -> extensions
+ self._mlist.filter_types = ["dummy"]
+ self._mlist.pass_types = ["dummy"]
+ self._mlist.filter_extensions = ["dummy"]
+ self._mlist.pass_extensions = ["dummy"]
+ self._import()
+ self.assertEqual(list(self._mlist.filter_types), [])
+ self.assertEqual(list(self._mlist.filter_extensions),
+ ['exe', 'bat', 'cmd', 'com', 'pif',
+ 'scr', 'vbs', 'cpl'])
+ self.assertEqual(list(self._mlist.pass_types),
+ ['multipart/mixed', 'multipart/alternative', 'text/plain'])
+ self.assertEqual(list(self._mlist.pass_extensions), [])
+
+ def test_process_bounces(self):
+ # bounce_processing -> process_bounces
+ self._mlist.process_bounces = None
+ self._import()
+ self.assertTrue(self._mlist.process_bounces)
+
+ def test_forward_unrecognized_bounces_to(self):
+ # bounce_unrecognized_goes_to_list_owner -> forward_unrecognized_bounces_to
+ self._mlist.forward_unrecognized_bounces_to = DummyEnum.val
+ self._import()
+ self.assertEqual(self._mlist.forward_unrecognized_bounces_to,
+ UnrecognizedBounceDisposition.administrators)
+
+ def test_moderator_password(self):
+ # mod_password -> moderator_password
+ self._mlist.moderator_password = str("TESTDATA")
+ self._import()
+ self.assertEqual(self._mlist.moderator_password, None)
+
+ def test_newsgroup_moderation(self):
+ # news_moderation -> newsgroup_moderation
+ # news_prefix_subject_too -> nntp_prefix_subject_too
+ self._mlist.newsgroup_moderation = DummyEnum.val
+ self._mlist.nntp_prefix_subject_too = None
+ self._import()
+ self.assertEqual(self._mlist.newsgroup_moderation,
+ NewsgroupModeration.none)
+ self.assertTrue(self._mlist.nntp_prefix_subject_too)
+
+ def test_msg_to_message(self):
+ # send_welcome_msg -> send_welcome_message
+ # send_goodbye_msg -> send_goodbye_message
+ self._mlist.send_welcome_message = None
+ self._mlist.send_goodbye_message = None
+ self._import()
+ self.assertTrue(self._mlist.send_welcome_message)
+ self.assertTrue(self._mlist.send_goodbye_message)
+
+ def test_ban_list(self):
+ banned = [
+ ("anne@example.com", "anne@example.com"),
+ ("^.*@example.com", "bob@example.com")
+ ]
+ self._pckdict["ban_list"] = [ b[0] for b in banned ]
+ self._import()
+ for _pattern, addr in banned:
+ self.assertTrue(IBanManager(self._mlist).is_banned(addr))
+
+ def test_acceptable_aliases(self):
+ # it used to be a plain-text field (values are newline-separated)
+ aliases = ["alias1@example.com", "alias2@exemple.com"]
+ self._pckdict["acceptable_aliases"] = "\n".join(aliases)
+ self._import()
+ alias_set = IAcceptableAliasSet(self._mlist)
+ self.assertEqual(sorted(alias_set.aliases), aliases)
+
class TestArchiveImport(unittest.TestCase):
@@ -80,7 +215,7 @@ class TestArchiveImport(unittest.TestCase):
def setUp(self):
self._mlist = create_list('blank@example.com')
- self._mlist.archive_policy = "INITIAL-TEST-VALUE"
+ self._mlist.archive_policy = DummyEnum.val
def tearDown(self):
remove_list(self._mlist)
@@ -100,3 +235,434 @@ class TestArchiveImport(unittest.TestCase):
def test_no_archive(self):
self._do_test({ "archive": False, "archive_private": False },
ArchivePolicy.never)
+
+
+
+class TestFilterActionImport(unittest.TestCase):
+ # The mlist.filter_action enum values have changed. In Mailman 2.1 the
+ # order was 'Discard', 'Reject', 'Forward to List Owner', 'Preserve'.
+
+ layer = ConfigLayer
+
+ def setUp(self):
+ self._mlist = create_list('blank@example.com')
+ self._mlist.filter_action = DummyEnum.val
+
+ def tearDown(self):
+ remove_list(self._mlist)
+
+ def _do_test(self, original, expected):
+ import_config_pck(self._mlist, { "filter_action": original })
+ self.assertEqual(self._mlist.filter_action, expected)
+
+ def test_discard(self):
+ self._do_test(0, FilterAction.discard)
+
+ def test_reject(self):
+ self._do_test(1, FilterAction.reject)
+
+ def test_forward(self):
+ self._do_test(2, FilterAction.forward)
+
+ def test_preserve(self):
+ self._do_test(3, FilterAction.preserve)
+
+
+
+class TestMemberActionImport(unittest.TestCase):
+ # The mlist.default_member_action and mlist.default_nonmember_action enum
+ # values are different in Mailman 2.1, they have been merged into a
+ # single enum in Mailman 3
+ # For default_member_action, which used to be called
+ # member_moderation_action, the values were:
+ # 0==Hold, 1=Reject, 2==Discard
+ # For default_nonmember_action, which used to be called
+ # generic_nonmember_action, the values were:
+ # 0==Accept, 1==Hold, 2==Reject, 3==Discard
+
+ layer = ConfigLayer
+
+ def setUp(self):
+ self._mlist = create_list('blank@example.com')
+ self._mlist.default_member_action = DummyEnum.val
+ self._mlist.default_nonmember_action = DummyEnum.val
+ self._pckdict = {
+ "member_moderation_action": DummyEnum.val,
+ "generic_nonmember_action": DummyEnum.val,
+ }
+
+ def tearDown(self):
+ remove_list(self._mlist)
+
+ def _do_test(self, expected):
+ import_config_pck(self._mlist, self._pckdict)
+ for key, value in expected.iteritems():
+ self.assertEqual(getattr(self._mlist, key), value)
+
+ def test_member_hold(self):
+ self._pckdict["member_moderation_action"] = 0
+ self._do_test({"default_member_action": Action.hold})
+
+ def test_member_reject(self):
+ self._pckdict["member_moderation_action"] = 1
+ self._do_test({"default_member_action": Action.reject})
+
+ def test_member_discard(self):
+ self._pckdict["member_moderation_action"] = 2
+ self._do_test({"default_member_action": Action.discard})
+
+ def test_nonmember_accept(self):
+ self._pckdict["generic_nonmember_action"] = 0
+ self._do_test({"default_nonmember_action": Action.accept})
+
+ def test_nonmember_hold(self):
+ self._pckdict["generic_nonmember_action"] = 1
+ self._do_test({"default_nonmember_action": Action.hold})
+
+ def test_nonmember_reject(self):
+ self._pckdict["generic_nonmember_action"] = 2
+ self._do_test({"default_nonmember_action": Action.reject})
+
+ def test_nonmember_discard(self):
+ self._pckdict["generic_nonmember_action"] = 3
+ self._do_test({"default_nonmember_action": Action.discard})
+
+
+
+class TestConvertToURI(unittest.TestCase):
+ # The following values were plain text, and are now URIs in Mailman 3:
+ # - welcome_message_uri
+ # - goodbye_message_uri
+ # - header_uri
+ # - footer_uri
+ # - digest_header_uri
+ # - digest_footer_uri
+ #
+ # The templates contain variables that must be replaced:
+ # - %(real_name)s -> %(display_name)s
+ # - %(real_name)s@%(host_name)s -> %(fqdn_listname)s
+ # - %(web_page_url)slistinfo%(cgiext)s/%(_internal_name)s -> %(listinfo_uri)s
+
+ layer = ConfigLayer
+
+ def setUp(self):
+ self._mlist = create_list('blank@example.com')
+ self._conf_mapping = {
+ "welcome_msg": "welcome_message_uri",
+ "goodbye_msg": "goodbye_message_uri",
+ "msg_header": "header_uri",
+ "msg_footer": "footer_uri",
+ "digest_header": "digest_header_uri",
+ "digest_footer": "digest_footer_uri",
+ }
+ self._pckdict = {}
+ #self._pckdict = {
+ # "preferred_language": "XX", # templates are lang-specific
+ #}
+
+ def tearDown(self):
+ remove_list(self._mlist)
+
+ def test_text_to_uri(self):
+ for oldvar, newvar in self._conf_mapping.iteritems():
+ self._pckdict[oldvar] = "TEST VALUE"
+ import_config_pck(self._mlist, self._pckdict)
+ newattr = getattr(self._mlist, newvar)
+ text = decorate(self._mlist, newattr)
+ self.assertEqual(text, "TEST VALUE",
+ "Old variable %s was not properly imported to %s"
+ % (oldvar, newvar))
+
+ def test_substitutions(self):
+ test_text = ("UNIT TESTING %(real_name)s mailing list\n"
+ "%(real_name)s@%(host_name)s\n"
+ "%(web_page_url)slistinfo%(cgiext)s/%(_internal_name)s")
+ expected_text = ("UNIT TESTING $display_name mailing list\n"
+ "$fqdn_listname\n"
+ "$listinfo_uri")
+ for oldvar, newvar in self._conf_mapping.iteritems():
+ self._pckdict[oldvar] = test_text
+ import_config_pck(self._mlist, self._pckdict)
+ newattr = getattr(self._mlist, newvar)
+ template_uri = expand(newattr, dict(
+ listname=self._mlist.fqdn_listname,
+ language=self._mlist.preferred_language.code,
+ ))
+ loader = getUtility(ITemplateLoader)
+ text = loader.get(template_uri)
+ self.assertEqual(text, expected_text,
+ "Old variables were not converted for %s" % newvar)
+
+ def test_keep_default(self):
+ # If the value was not changed from MM2.1's default, don't import it
+ default_msg_footer = (
+ "_______________________________________________\n"
+ "%(real_name)s mailing list\n"
+ "%(real_name)s@%(host_name)s\n"
+ "%(web_page_url)slistinfo%(cgiext)s/%(_internal_name)s"
+ )
+ for oldvar in ("msg_footer", "digest_footer"):
+ newvar = self._conf_mapping[oldvar]
+ self._pckdict[oldvar] = default_msg_footer
+ old_value = getattr(self._mlist, newvar)
+ import_config_pck(self._mlist, self._pckdict)
+ new_value = getattr(self._mlist, newvar)
+ self.assertEqual(old_value, new_value,
+ "Default value was not preserved for %s" % newvar)
+
+ def test_keep_default_if_fqdn_changed(self):
+ # Use case: importing the old a@ex.com into b@ex.com
+ # We can't check if it changed from the default
+ # -> don't import, we may do more harm than good and it's easy to
+ # change if needed
+ test_value = "TEST-VALUE"
+ for oldvar, newvar in self._conf_mapping.iteritems():
+ self._mlist.mail_host = "example.com"
+ self._pckdict["mail_host"] = "test.example.com"
+ self._pckdict[oldvar] = test_value
+ old_value = getattr(self._mlist, newvar)
+ import_config_pck(self._mlist, self._pckdict)
+ new_value = getattr(self._mlist, newvar)
+ self.assertEqual(old_value, new_value,
+ "Default value was not preserved for %s" % newvar)
+
+
+
+class TestRosterImport(unittest.TestCase):
+
+ layer = ConfigLayer
+
+ def setUp(self):
+ self._mlist = create_list('blank@example.com')
+ self._pckdict = {
+ "members": {
+ "anne@example.com": 0,
+ "bob@example.com": "bob@ExampLe.Com",
+ },
+ "digest_members": {
+ "cindy@example.com": 0,
+ "dave@example.com": "dave@ExampLe.Com",
+ },
+ "passwords": {
+ "anne@example.com" : "annepass",
+ "bob@example.com" : "bobpass",
+ "cindy@example.com": "cindypass",
+ "dave@example.com" : "davepass",
+ },
+ "language": {
+ "anne@example.com" : "fr",
+ "bob@example.com" : "de",
+ "cindy@example.com": "es",
+ "dave@example.com" : "it",
+ },
+ "usernames": {
+ "anne@example.com" : "Anne",
+ "bob@example.com" : "Bob",
+ "cindy@example.com": "Cindy",
+ "dave@example.com" : "Dave",
+ },
+ "owner": [
+ "anne@example.com",
+ "emily@example.com",
+ ],
+ "moderator": [
+ "bob@example.com",
+ "fred@example.com",
+ ],
+ }
+ self._usermanager = getUtility(IUserManager)
+ language_manager = getUtility(ILanguageManager)
+ for code in self._pckdict["language"].values():
+ if code not in language_manager.codes:
+ language_manager.add(code, 'utf-8', code)
+
+ def tearDown(self):
+ remove_list(self._mlist)
+
+ def test_member(self):
+ import_config_pck(self._mlist, self._pckdict)
+ for name in ("anne", "bob", "cindy", "dave"):
+ addr = "%s@example.com" % name
+ self.assertTrue(
+ addr in [ a.email for a in self._mlist.members.addresses],
+ "Address %s was not imported" % addr)
+ self.assertTrue("anne@example.com" in [ a.email
+ for a in self._mlist.regular_members.addresses])
+ self.assertTrue("bob@example.com" in [ a.email
+ for a in self._mlist.regular_members.addresses])
+ self.assertTrue("cindy@example.com" in [ a.email
+ for a in self._mlist.digest_members.addresses])
+ self.assertTrue("dave@example.com" in [ a.email
+ for a in self._mlist.digest_members.addresses])
+
+ def test_original_email(self):
+ import_config_pck(self._mlist, self._pckdict)
+ bob = self._usermanager.get_address("bob@example.com")
+ self.assertEqual(bob.original_email, "bob@ExampLe.Com")
+ dave = self._usermanager.get_address("dave@example.com")
+ self.assertEqual(dave.original_email, "dave@ExampLe.Com")
+
+ def test_language(self):
+ import_config_pck(self._mlist, self._pckdict)
+ for name in ("anne", "bob", "cindy", "dave"):
+ addr = "%s@example.com" % name
+ member = self._mlist.members.get_member(addr)
+ self.assertTrue(member is not None,
+ "Address %s was not imported" % addr)
+ print(self._pckdict["language"])
+ print(member.preferred_language, member.preferred_language.code)
+ self.assertEqual(member.preferred_language.code,
+ self._pckdict["language"][addr])
+
+ def test_username(self):
+ import_config_pck(self._mlist, self._pckdict)
+ for name in ("anne", "bob", "cindy", "dave"):
+ addr = "%s@example.com" % name
+ user = self._usermanager.get_user(addr)
+ address = self._usermanager.get_address(addr)
+ self.assertTrue(user is not None,
+ "User %s was not imported" % addr)
+ self.assertTrue(address is not None,
+ "Address %s was not imported" % addr)
+ display_name = self._pckdict["usernames"][addr]
+ self.assertEqual(user.display_name, display_name,
+ "The display name was not set for User %s" % addr)
+ self.assertEqual(address.display_name, display_name,
+ "The display name was not set for Address %s" % addr)
+
+ def test_owner(self):
+ import_config_pck(self._mlist, self._pckdict)
+ for name in ("anne", "emily"):
+ addr = "%s@example.com" % name
+ self.assertTrue(
+ addr in [ a.email for a in self._mlist.owners.addresses ],
+ "Address %s was not imported as owner" % addr)
+ self.assertFalse("emily@example.com" in
+ [ a.email for a in self._mlist.members.addresses ],
+ "Address emily@ was wrongly added to the members list")
+
+ def test_moderator(self):
+ import_config_pck(self._mlist, self._pckdict)
+ for name in ("bob", "fred"):
+ addr = "%s@example.com" % name
+ self.assertTrue(
+ addr in [ a.email for a in self._mlist.moderators.addresses ],
+ "Address %s was not imported as moderator" % addr)
+ self.assertFalse("fred@example.com" in
+ [ a.email for a in self._mlist.members.addresses ],
+ "Address fred@ was wrongly added to the members list")
+
+ def test_password(self):
+ #self.anne.password = config.password_context.encrypt('abc123')
+ import_config_pck(self._mlist, self._pckdict)
+ for name in ("anne", "bob", "cindy", "dave"):
+ addr = "%s@example.com" % name
+ user = self._usermanager.get_user(addr)
+ self.assertTrue(user is not None,
+ "Address %s was not imported" % addr)
+ self.assertEqual(user.password, '{plaintext}%spass' % name,
+ "Password for %s was not imported" % addr)
+
+ def test_same_user(self):
+ # Adding the address of an existing User must not create another user
+ user = self._usermanager.create_user('anne@example.com', 'Anne')
+ user.register("bob@example.com") # secondary email
+ import_config_pck(self._mlist, self._pckdict)
+ member = self._mlist.members.get_member('bob@example.com')
+ self.assertEqual(member.user, user)
+
+
+
+class TestPreferencesImport(unittest.TestCase):
+
+ layer = ConfigLayer
+
+ def setUp(self):
+ self._mlist = create_list('blank@example.com')
+ self._pckdict = {
+ "members": { "anne@example.com": 0 },
+ "user_options": {},
+ "delivery_status": {},
+ }
+ self._usermanager = getUtility(IUserManager)
+
+ def tearDown(self):
+ remove_list(self._mlist)
+
+ def _do_test(self, oldvalue, expected):
+ self._pckdict["user_options"]["anne@example.com"] = oldvalue
+ import_config_pck(self._mlist, self._pckdict)
+ user = self._usermanager.get_user("anne@example.com")
+ self.assertTrue(user is not None, "User was not imported")
+ member = self._mlist.members.get_member("anne@example.com")
+ self.assertTrue(member is not None, "Address was not subscribed")
+ for exp_name, exp_val in expected.iteritems():
+ try:
+ currentval = getattr(member, exp_name)
+ except AttributeError:
+ # hide_address has no direct getter
+ currentval = getattr(member.preferences, exp_name)
+ self.assertEqual(currentval, exp_val,
+ "Preference %s was not imported" % exp_name)
+ # XXX: should I check that other params are still equal to
+ # mailman.core.constants.system_preferences ?
+
+ def test_acknowledge_posts(self):
+ # AcknowledgePosts
+ self._do_test(4, {"acknowledge_posts": True})
+
+ def test_hide_address(self):
+ # ConcealSubscription
+ self._do_test(16, {"hide_address": True})
+
+ def test_receive_own_postings(self):
+ # DontReceiveOwnPosts
+ self._do_test(2, {"receive_own_postings": False})
+
+ def test_receive_list_copy(self):
+ # DontReceiveDuplicates
+ self._do_test(256, {"receive_list_copy": False})
+
+ def test_digest_plain(self):
+ # Digests & DisableMime
+ self._pckdict["digest_members"] = self._pckdict["members"].copy()
+ self._pckdict["members"] = {}
+ self._do_test(8, {"delivery_mode": DeliveryMode.plaintext_digests})
+
+ def test_digest_mime(self):
+ # Digests & not DisableMime
+ self._pckdict["digest_members"] = self._pckdict["members"].copy()
+ self._pckdict["members"] = {}
+ self._do_test(0, {"delivery_mode": DeliveryMode.mime_digests})
+
+ def test_delivery_status(self):
+ # look for the pckdict["delivery_status"] key which will look like
+ # (status, time) where status is among the following:
+ # ENABLED = 0 # enabled
+ # UNKNOWN = 1 # legacy disabled
+ # BYUSER = 2 # disabled by user choice
+ # BYADMIN = 3 # disabled by admin choice
+ # BYBOUNCE = 4 # disabled by bounces
+ for oldval, expected in enumerate((DeliveryStatus.enabled,
+ DeliveryStatus.unknown, DeliveryStatus.by_user,
+ DeliveryStatus.by_moderator, DeliveryStatus.by_bounces)):
+ self._pckdict["delivery_status"]["anne@example.com"] = (oldval, 0)
+ import_config_pck(self._mlist, self._pckdict)
+ member = self._mlist.members.get_member("anne@example.com")
+ self.assertTrue(member is not None, "Address was not subscribed")
+ self.assertEqual(member.delivery_status, expected)
+ member.unsubscribe()
+
+ def test_moderate(self):
+ # Option flag Moderate is translated to
+ # member.moderation_action = Action.hold
+ self._do_test(128, {"moderation_action": Action.hold})
+
+ def test_multiple_options(self):
+ # DontReceiveDuplicates & DisableMime & SuppressPasswordReminder
+ self._pckdict["digest_members"] = self._pckdict["members"].copy()
+ self._pckdict["members"] = {}
+ self._do_test(296, {
+ "receive_list_copy": False,
+ "delivery_mode": DeliveryMode.plaintext_digests,
+ })