summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/mailman/app/subscriptions.py59
-rw-r--r--src/mailman/app/tests/test_subscriptions.py204
2 files changed, 227 insertions, 36 deletions
diff --git a/src/mailman/app/subscriptions.py b/src/mailman/app/subscriptions.py
index 1593b4d58..0ff7dac6b 100644
--- a/src/mailman/app/subscriptions.py
+++ b/src/mailman/app/subscriptions.py
@@ -47,11 +47,12 @@ from mailman.interfaces.subscriptions import ISubscriptionService, TokenOwner
from mailman.interfaces.user import IUser
from mailman.interfaces.usermanager import IUserManager
from mailman.interfaces.workflow import IWorkflowStateManager
+from mailman.model.address import Address
from mailman.model.member import Member
+from mailman.model.user import User
from mailman.utilities.datetime import now
from mailman.utilities.i18n import make
from operator import attrgetter
-from sqlalchemy import and_, or_
from zope.component import getUtility
from zope.event import notify
from zope.interface import implementer
@@ -60,16 +61,6 @@ from zope.interface import implementer
log = logging.getLogger('mailman.subscribe')
-
-def _membership_sort_key(member):
- """Sort function for find_members().
-
- The members are sorted first by unique list id, then by subscribed email
- address, then by role.
- """
- return (member.list_id, member.address.email, member.role.value)
-
-
class WhichSubscriber(Enum):
address = 1
user = 2
@@ -385,39 +376,39 @@ class SubscriptionService:
# If `subscriber` is a user id, then we'll search for all addresses
# which are controlled by the user, otherwise we'll just search for
# the given address.
- user_manager = getUtility(IUserManager)
if subscriber is None and list_id is None and role is None:
return []
+ order = (Member.list_id, Address.email, Member.role)
# Querying for the subscriber is the most complicated part, because
- # the parameter can either be an email address or a user id.
- query = []
+ # the parameter can either be an email address or a user id. Start by
+ # building two queries, one joined on the member's address, and one
+ # joined on the member's user. Add the resulting email address to the
+ # selected values to be able to sort on it later on.
+ q_address = store.query(Member, Address.email).join(Member._address)
+ q_user = store.query(Member, Address.email).join(Member._user)
if subscriber is not None:
if isinstance(subscriber, str):
# subscriber is an email address.
- address = user_manager.get_address(subscriber)
- user = user_manager.get_user(subscriber)
- # This probably could be made more efficient.
- if address is None or user is None:
- return []
- query.append(or_(Member.address_id == address.id,
- Member.user_id == user.id))
+ q_address = q_address.filter(
+ Address.email == subscriber.lower())
+ q_user = q_user.join(User.addresses).filter(
+ Address.email == subscriber.lower())
else:
# subscriber is a user id.
- user = user_manager.get_user_by_id(subscriber)
- address_ids = list(address.id for address in user.addresses
- if address.id is not None)
- if len(address_ids) == 0 or user is None:
- return []
- query.append(or_(Member.user_id == user.id,
- Member.address_id.in_(address_ids)))
- # Calculate the rest of the query expression, which will get And'd
- # with the Or clause above (if there is one).
+ q_address = q_address.join(Address.user).filter(
+ User._user_id == subscriber)
+ q_user = q_user.join(User._preferred_address).filter(
+ User._user_id == subscriber)
+ # Add additional filters to both queries.
if list_id is not None:
- query.append(Member.list_id == list_id)
+ q_address = q_address.filter(Member.list_id == list_id)
+ q_user = q_user.filter(Member.list_id == list_id)
if role is not None:
- query.append(Member.role == role)
- results = store.query(Member).filter(and_(*query))
- return sorted(results, key=_membership_sort_key)
+ q_address = q_address.filter(Member.role == role)
+ q_user = q_user.filter(Member.role == role)
+ # Do a UNION of the two queries, sort the result and generate Members.
+ query = q_address.union(q_user).order_by(*order).from_self(Member)
+ return query.all()
def __iter__(self):
for member in self.get_members():
diff --git a/src/mailman/app/tests/test_subscriptions.py b/src/mailman/app/tests/test_subscriptions.py
index fa51184a9..62b2a8d8c 100644
--- a/src/mailman/app/tests/test_subscriptions.py
+++ b/src/mailman/app/tests/test_subscriptions.py
@@ -27,9 +27,9 @@ import unittest
from mailman.app.lifecycle import create_list
from mailman.app.subscriptions import SubscriptionWorkflow
from mailman.interfaces.bans import IBanManager
-from mailman.interfaces.member import MembershipIsBannedError
+from mailman.interfaces.member import MemberRole, MembershipIsBannedError
from mailman.interfaces.pending import IPendings
-from mailman.interfaces.subscriptions import TokenOwner
+from mailman.interfaces.subscriptions import ISubscriptionService, TokenOwner
from mailman.testing.helpers import LogFileMark, get_queue_messages
from mailman.testing.layers import ConfigLayer
from mailman.interfaces.mailinglist import SubscriptionPolicy
@@ -641,3 +641,203 @@ approval:
self.assertIsNone(workflow.token)
self.assertEqual(workflow.token_owner, TokenOwner.no_one)
self.assertEqual(workflow.member.address, anne)
+
+
+
+class TestSubscriptionService(unittest.TestCase):
+ layer = ConfigLayer
+
+ def setUp(self):
+ self._mlist = create_list('test@example.com')
+ self._mlist.admin_immed_notify = False
+ self._user_manager = getUtility(IUserManager)
+ self._service = getUtility(ISubscriptionService)
+
+ def test_find_member_address_no_user(self):
+ # Find address-based memberships when no user is linked to the address.
+ address = self._user_manager.create_address(
+ 'anne@example.com', 'Anne Address')
+ self._mlist.subscribe(address)
+ members = self._service.find_members('anne@example.com')
+ self.assertEqual(len(members), 1)
+ self.assertEqual(members[0].address, address)
+
+ def test_find_member_address_with_user(self):
+ # Find address-based memberships when a user is linked to the address.
+ user = self._user_manager.create_user(
+ 'anne@example.com', 'Anne User')
+ address = user.addresses[0]
+ address.verified_on = now()
+ user.preferred_address = address
+ # Subscribe the address.
+ self._mlist.subscribe(address)
+ members = self._service.find_members('anne@example.com')
+ self.assertEqual(len(members), 1)
+ self.assertEqual(members[0].user, user)
+
+ def test_find_member_user(self):
+ # Find user-based memberships by address.
+ user = self._user_manager.create_user(
+ 'anne@example.com', 'Anne User')
+ address = user.addresses[0]
+ address.verified_on = now()
+ user.preferred_address = address
+ # Subscribe the user.
+ self._mlist.subscribe(user)
+ members = self._service.find_members('anne@example.com')
+ self.assertEqual(len(members), 1)
+ self.assertEqual(members[0].user, user)
+
+ def test_find_member_user_secondary_address(self):
+ # Find user-based memberships using a secondary address.
+ user = self._user_manager.create_user(
+ 'anne@example.com', 'Anne User')
+ address = user.addresses[0]
+ address.verified_on = now()
+ user.preferred_address = address
+ # Create a secondary address.
+ address_2 = self._user_manager.create_address(
+ 'anne2@example.com', 'Anne User 2')
+ address_2.user = user
+ # Subscribe the user.
+ self._mlist.subscribe(user)
+ # Search for the secondary address.
+ members = self._service.find_members('anne2@example.com')
+ self.assertEqual(len(members), 1)
+ self.assertEqual(members[0].user, user)
+
+ def test_wont_find_member_secondary_address(self):
+ # A user is subscribed with one of their address, and a search is
+ # performed on another of their addresses. This is not supported; the
+ # subscription is not returned.
+ user = self._user_manager.create_user(
+ 'anne@example.com', 'Anne User')
+ address = user.addresses[0]
+ address.verified_on = now()
+ user.preferred_address = address
+ # Create a secondary address.
+ address_2 = self._user_manager.create_address(
+ 'anne2@example.com', 'Anne User 2')
+ address_2.verified_on = now()
+ address_2.user = user
+ # Subscribe the secondary address.
+ self._mlist.subscribe(address_2)
+ # Search for the primary address.
+ members = self._service.find_members('anne@example.com')
+ self.assertEqual(len(members), 0)
+
+ def test_find_member_user_id(self):
+ # Find user-based memberships by user_id.
+ user = self._user_manager.create_user(
+ 'anne@example.com', 'Anne User')
+ address = user.addresses[0]
+ address.verified_on = now()
+ user.preferred_address = address
+ # Subscribe the user.
+ self._mlist.subscribe(user)
+ members = self._service.find_members(user.user_id)
+ self.assertEqual(len(members), 1)
+ self.assertEqual(members[0].user, user)
+
+ def test_find_member_user_id_controlled_addresses(self):
+ # Find address-based memberships by user_id when a secondary address is
+ # subscribed.
+ user = self._user_manager.create_user(
+ 'anne@example.com', 'Anne User')
+ address = user.addresses[0]
+ address.verified_on = now()
+ user.preferred_address = address
+ # Create a secondary address.
+ address_2 = self._user_manager.create_address(
+ 'anne2@example.com', 'Anne User 2')
+ address_2.verified_on = now()
+ address_2.user = user
+ # Create a third address.
+ address_3 = self._user_manager.create_address(
+ 'anne3@example.com', 'Anne User 3')
+ address_3.verified_on = now()
+ address_3.user = user
+ # Subscribe the secondary address only.
+ self._mlist.subscribe(address_2)
+ members = self._service.find_members(user.user_id)
+ self.assertEqual(len(members), 1)
+ self.assertEqual(members[0].address, address_2)
+
+ def test_find_member_sorting(self):
+ # Check that the memberships are properly sorted.
+ user = self._user_manager.create_user(
+ 'anne1@example.com', 'Anne User')
+ address = user.addresses[0]
+ address.verified_on = now()
+ user.preferred_address = address
+ # Create a secondary address.
+ address_2 = self._user_manager.create_address(
+ 'anne2@example.com', 'Anne User 2')
+ address_2.verified_on = now()
+ address_2.user = user
+ # Create a third address.
+ address_3 = self._user_manager.create_address(
+ 'anne3@example.com', 'Anne User 3')
+ address_3.verified_on = now()
+ address_3.user = user
+ # Create three lists.
+ mlist1 = create_list('test1@example.com')
+ mlist1.admin_immed_notify = False
+ mlist2 = create_list('test2@example.com')
+ mlist2.admin_immed_notify = False
+ mlist3 = create_list('test3@example.com')
+ mlist3.admin_immed_notify = False
+ # Subscribe the addresses in random order
+ # https://www.xkcd.com/221/
+ mlist3.subscribe(address_3, MemberRole.moderator)
+ mlist3.subscribe(address_3, MemberRole.owner)
+ mlist3.subscribe(address_3, MemberRole.member)
+ mlist3.subscribe(address_2, MemberRole.member)
+ mlist3.subscribe(address_2, MemberRole.owner)
+ mlist3.subscribe(address_2, MemberRole.moderator)
+ mlist3.subscribe(address, MemberRole.owner)
+ mlist3.subscribe(address, MemberRole.member)
+ mlist3.subscribe(address, MemberRole.moderator)
+ mlist2.subscribe(address_2, MemberRole.moderator)
+ mlist2.subscribe(address_2, MemberRole.member)
+ mlist2.subscribe(address_2, MemberRole.owner)
+ mlist2.subscribe(address_3, MemberRole.moderator)
+ mlist2.subscribe(address_3, MemberRole.member)
+ mlist2.subscribe(address_3, MemberRole.owner)
+ mlist2.subscribe(address, MemberRole.owner)
+ mlist2.subscribe(address, MemberRole.moderator)
+ mlist2.subscribe(address, MemberRole.member)
+ mlist1.subscribe(address_2, MemberRole.moderator)
+ mlist1.subscribe(address, MemberRole.member)
+ mlist1.subscribe(address_3, MemberRole.owner)
+ # The results should be sorted first by list id, then by address, then
+ # by member role.
+ members = self._service.find_members(user.user_id)
+ self.assertEqual(len(members), 21)
+ self.assertListEqual(
+ [(m.list_id.partition('.')[0],
+ m.address.email.partition('@')[0],
+ m.role)
+ for m in members],
+ [('test1', 'anne1', MemberRole.member),
+ ('test1', 'anne2', MemberRole.moderator),
+ ('test1', 'anne3', MemberRole.owner),
+ ('test2', 'anne1', MemberRole.member),
+ ('test2', 'anne1', MemberRole.owner),
+ ('test2', 'anne1', MemberRole.moderator),
+ ('test2', 'anne2', MemberRole.member),
+ ('test2', 'anne2', MemberRole.owner),
+ ('test2', 'anne2', MemberRole.moderator),
+ ('test2', 'anne3', MemberRole.member),
+ ('test2', 'anne3', MemberRole.owner),
+ ('test2', 'anne3', MemberRole.moderator),
+ ('test3', 'anne1', MemberRole.member),
+ ('test3', 'anne1', MemberRole.owner),
+ ('test3', 'anne1', MemberRole.moderator),
+ ('test3', 'anne2', MemberRole.member),
+ ('test3', 'anne2', MemberRole.owner),
+ ('test3', 'anne2', MemberRole.moderator),
+ ('test3', 'anne3', MemberRole.member),
+ ('test3', 'anne3', MemberRole.owner),
+ ('test3', 'anne3', MemberRole.moderator),
+ ])