summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAurélien Bompard2013-10-11 17:49:09 +0200
committerAurélien Bompard2013-10-11 17:49:09 +0200
commitc56a83d5eb2ca8aa7e384e27825f2f34faeabebe (patch)
tree6d9d482bd3d07d7008c7ae6f862795096bb66544 /src
parent58b998361fa7985b8fbeb369ae9aff553a927132 (diff)
downloadmailman-c56a83d5eb2ca8aa7e384e27825f2f34faeabebe.tar.gz
mailman-c56a83d5eb2ca8aa7e384e27825f2f34faeabebe.tar.zst
mailman-c56a83d5eb2ca8aa7e384e27825f2f34faeabebe.zip
Diffstat (limited to 'src')
-rw-r--r--src/mailman/utilities/importer.py27
-rw-r--r--src/mailman/utilities/tests/test_import.py50
2 files changed, 65 insertions, 12 deletions
diff --git a/src/mailman/utilities/importer.py b/src/mailman/utilities/importer.py
index 68e0274fa..e97db12dc 100644
--- a/src/mailman/utilities/importer.py
+++ b/src/mailman/utilities/importer.py
@@ -350,24 +350,29 @@ def import_roster(mlist, config_dict, members, role):
"""
usermanager = getUtility(IUserManager)
for email in members:
- email = str_to_unicode(email)
+ # for owners and members, the emails can have a mixed case, so
+ # lowercase them all
+ email = str_to_unicode(email).lower()
roster = mlist.get_roster(role)
if roster.get_member(email) is not None:
print("%s is already imported with role %s" % (email, role),
file=sys.stderr)
continue
+ address = usermanager.get_address(email)
user = usermanager.get_user(email)
if user is None:
- merged_members = {}
- merged_members.update(config_dict.get("members", {}))
- merged_members.update(config_dict.get("digest_members", {}))
- if merged_members.get(email, 0) != 0:
- original_email = merged_members[email]
- else:
- original_email = email
- user = usermanager.create_user(str_to_unicode(original_email))
- address = usermanager.get_address(email)
- address.verified_on = datetime.datetime.now()
+ user = usermanager.create_user()
+ if address is None:
+ merged_members = {}
+ merged_members.update(config_dict.get("members", {}))
+ merged_members.update(config_dict.get("digest_members", {}))
+ if merged_members.get(email, 0) != 0:
+ original_email = str_to_unicode(merged_members[email])
+ else:
+ original_email = email
+ address = usermanager.create_address(original_email)
+ address.verified_on = datetime.datetime.now()
+ user.link(address)
mlist.subscribe(address, role)
member = roster.get_member(email)
assert member is not None
diff --git a/src/mailman/utilities/tests/test_import.py b/src/mailman/utilities/tests/test_import.py
index 3a909e857..a29c560dd 100644
--- a/src/mailman/utilities/tests/test_import.py
+++ b/src/mailman/utilities/tests/test_import.py
@@ -28,12 +28,14 @@ __all__ = [
import cPickle
import unittest
from datetime import timedelta, datetime
+from traceback import format_exc
from mailman.app.lifecycle import create_list, remove_list
from mailman.testing.layers import ConfigLayer
from mailman.utilities.importer import import_config_pck, Import21Error
from mailman.interfaces.archiver import ArchivePolicy
from mailman.interfaces.action import Action, FilterAction
+from mailman.interfaces.address import ExistingAddressError
from mailman.interfaces.bounce import UnrecognizedBounceDisposition
from mailman.interfaces.bans import IBanManager
from mailman.interfaces.mailinglist import IAcceptableAliasSet
@@ -41,13 +43,15 @@ from mailman.interfaces.nntp import NewsgroupModeration
from mailman.interfaces.autorespond import ResponseAction
from mailman.interfaces.templates import ITemplateLoader
from mailman.interfaces.usermanager import IUserManager
-from mailman.interfaces.member import DeliveryMode, DeliveryStatus
+from mailman.interfaces.member import DeliveryMode, DeliveryStatus, MemberRole
from mailman.interfaces.languages import ILanguageManager
+from mailman.model.address import Address
from mailman.handlers.decorate import decorate
from mailman.utilities.string import expand
from pkg_resources import resource_filename
from enum import Enum
from zope.component import getUtility
+from storm.locals import Store
@@ -205,6 +209,7 @@ class TestBasicImport(unittest.TestCase):
try:
self._import()
except UnicodeDecodeError, e:
+ print(format_exc())
self.fail(e)
for _pattern, addr in banned:
self.assertTrue(IBanManager(self._mlist).is_banned(addr))
@@ -502,6 +507,7 @@ class TestConvertToURI(unittest.TestCase):
try:
import_config_pck(self._mlist, self._pckdict)
except UnicodeDecodeError, e:
+ print(format_exc())
self.fail(e)
for oldvar, newvar in self._conf_mapping.iteritems():
newattr = getattr(self._mlist, newvar)
@@ -665,6 +671,48 @@ class TestRosterImport(unittest.TestCase):
member = self._mlist.members.get_member('bob@example.com')
self.assertEqual(member.user, user)
+ def test_owner_and_moderator_not_lowercase(self):
+ # In the v2.1 pickled dict, the owner and moderator lists are not
+ # necessarily lowercased already
+ self._pckdict[b"owner"] = [b"Anne@example.com"]
+ self._pckdict[b"moderator"] = [b"Anne@example.com"]
+ try:
+ import_config_pck(self._mlist, self._pckdict)
+ except AssertionError:
+ print(format_exc())
+ self.fail("The address was not lowercased")
+ self.assertTrue("anne@example.com" in
+ [ a.email for a in self._mlist.owners.addresses ])
+ self.assertTrue("anne@example.com" in
+ [ a.email for a in self._mlist.moderators.addresses])
+
+ def test_address_already_exists_but_no_user(self):
+ # An address already exists, but it is not linked to a user nor
+ # subscribed
+ anne_addr = Address("anne@example.com", "Anne")
+ Store.of(self._mlist).add(anne_addr)
+ try:
+ import_config_pck(self._mlist, self._pckdict)
+ except ExistingAddressError:
+ print(format_exc())
+ self.fail("existing address was not checked")
+ anne = self._usermanager.get_user("anne@example.com")
+ self.assertTrue(anne.controls("anne@example.com"))
+ self.assertTrue(anne_addr in self._mlist.regular_members.addresses)
+
+ def test_address_already_subscribed_but_no_user(self):
+ # An address is already subscribed, but it is not linked to a user
+ anne_addr = Address("anne@example.com", "Anne")
+ self._mlist.subscribe(anne_addr)
+ try:
+ import_config_pck(self._mlist, self._pckdict)
+ except ExistingAddressError:
+ print(format_exc())
+ self.fail("existing address was not checked")
+ anne = self._usermanager.get_user("anne@example.com")
+ self.assertTrue(anne.controls("anne@example.com"))
+
+
class TestPreferencesImport(unittest.TestCase):