summaryrefslogtreecommitdiff
path: root/src/mailman/model/subscriptions.py
blob: 3d2bcbeb061a8653cf47fa8a4a3d47df86f8b2af (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# Copyright (C) 2016 by the Free Software Foundation, Inc.
#
# This file is part of GNU Mailman.
#
# GNU Mailman is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option)
# any later version.
#
# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for
# more details.
#
# You should have received a copy of the GNU General Public License along with
# GNU Mailman.  If not, see <http://www.gnu.org/licenses/>.

"""Subscription services."""

from mailman import public
from mailman.app.membership import delete_member
from mailman.database.transaction import dbconnection
from mailman.interfaces.listmanager import IListManager, NoSuchListError
from mailman.interfaces.subscriptions import (
    ISubscriptionService, TooManyMembersError)
from mailman.interfaces.usermanager import IUserManager
from mailman.model.address import Address
from mailman.model.member import Member
from mailman.model.user import User
from mailman.utilities.queries import QuerySequence
from operator import attrgetter
from sqlalchemy.orm.exc import MultipleResultsFound, NoResultFound
from zope.component import getUtility
from zope.interface import implementer


@public
@implementer(ISubscriptionService)
class SubscriptionService:
    """Subscription services for the REST API.

    Offers lookups over list memberships (all of them, one by member id, or
    those matching a subscriber/list/role filter) and removal of an email
    address from a mailing list.
    """

    __name__ = 'members'

    def get_members(self):
        """See `ISubscriptionService`.

        Memberships are ordered by list id, then by role (owners first, then
        moderators, then members), then by the email of the membership's
        address.  Memberships whose role has any other name are left out.

        :return: The ordered memberships.
        :rtype: list
        """
        # {list_id -> {role -> [members]}}
        by_list = {}
        user_manager = getUtility(IUserManager)
        for member in user_manager.members:
            by_role = by_list.setdefault(member.list_id, {})
            by_role.setdefault(member.role.name, []).append(member)
        # Flatten into single list sorted as per the interface.
        all_members = []
        address_of_member = attrgetter('address.email')
        for list_id in sorted(by_list):
            by_role = by_list[list_id]
            for role_name in ('owner', 'moderator', 'member'):
                all_members.extend(
                    sorted(by_role.get(role_name, ()), key=address_of_member))
        return all_members

    @dbconnection
    def get_member(self, store, member_id):
        """See `ISubscriptionService`.

        :param store: The database store; supplied by `@dbconnection`, so
            callers pass only `member_id`.
        :param member_id: The member id to look up.
        :return: The matching member, or None if there is no such member.
        """
        # Fetch the matches once instead of issuing separate COUNT queries
        # followed by another query to retrieve the row.
        matches = store.query(Member).filter(
            Member._member_id == member_id).all()
        if not matches:
            return None
        assert len(matches) == 1, 'Too many matching members'
        return matches[0]

    @dbconnection
    def _find_members(self, store, subscriber, list_id, role):
        """Build the query behind `find_members()` and `find_member()`.

        :param store: The database store; supplied by `@dbconnection`.
        :param subscriber: An email address (a string, in which `*` acts as
            a wildcard), a user id, or None to not filter on the subscriber.
        :param list_id: The list id to filter on, or None.
        :param role: The member role to filter on, or None.
        :return: None when no criterion at all was given, otherwise a query
            producing `Member` objects ordered by list id, email, and role.
        """
        # If `subscriber` is a user id, then we'll search for all addresses
        # which are controlled by the user, otherwise we'll just search for
        # the given address.
        if subscriber is None and list_id is None and role is None:
            return None
        order = (Member.list_id, Address.email, Member.role)
        # Querying for the subscriber is the most complicated part, because
        # the parameter can either be an email address or a user id.  Start by
        # building two queries, one joined on the member's address, and one
        # joined on the member's user.  Add the resulting email address to the
        # selected values to be able to sort on it later on.
        q_address = store.query(Member, Address.email).join(Member._address)
        q_user = store.query(Member, Address.email).join(Member._user)
        if subscriber is None:
            # Not filtering on the subscriber.  The user query still selects
            # Address.email, so it must be joined to an address explicitly;
            # otherwise every user-based membership is paired with every row
            # of the address table, which inflates counts and makes the
            # email ordering meaningless.  Use an outer join so that a
            # membership whose user has no preferred address is not dropped.
            q_user = q_user.outerjoin(User._preferred_address)
        elif isinstance(subscriber, str):
            # subscriber is an email address.
            subscriber = subscriber.lower()
            if '*' in subscriber:
                # Translate the wildcard into its SQL LIKE equivalent.
                subscriber = subscriber.replace('*', '%')
                q_address = q_address.filter(
                    Address.email.like(subscriber))
                q_user = q_user.join(User.addresses).filter(
                    Address.email.like(subscriber))
            else:
                q_address = q_address.filter(
                    Address.email == subscriber)
                q_user = q_user.join(User.addresses).filter(
                    Address.email == subscriber)
        else:
            # subscriber is a user id.
            q_address = q_address.join(Address.user).filter(
                User._user_id == subscriber)
            q_user = q_user.join(User._preferred_address).filter(
                User._user_id == subscriber)
        # Add additional filters to both queries.
        if list_id is not None:
            q_address = q_address.filter(Member.list_id == list_id)
            q_user = q_user.filter(Member.list_id == list_id)
        if role is not None:
            q_address = q_address.filter(Member.role == role)
            q_user = q_user.filter(Member.role == role)
        # Do a UNION of the two queries, sort the result and generate Members.
        return q_address.union(q_user).order_by(*order).from_self(Member)

    def find_members(self, subscriber=None, list_id=None, role=None):
        """See `ISubscriptionService`.

        :return: The matching memberships, wrapped in a `QuerySequence`.
        """
        return QuerySequence(self._find_members(subscriber, list_id, role))

    def find_member(self, subscriber=None, list_id=None, role=None):
        """See `ISubscriptionService`.

        :return: The single matching member, or None when nothing matches
            or when no search criterion was given.
        :raises TooManyMembersError: when more than one member matches.
        """
        result = self._find_members(subscriber, list_id, role)
        if result is None:
            return None
        try:
            return result.one()
        except NoResultFound:
            return None
        except MultipleResultsFound as error:
            # Coerce the exception into a Mailman-layer exception so call
            # sites don't have to import from SQLAlchemy, resulting in a layer
            # violation.
            raise TooManyMembersError(subscriber, list_id, role) from error

    def __iter__(self):
        """Iterate over the memberships returned by `get_members()`."""
        yield from self.get_members()

    def leave(self, list_id, email):
        """See `ISubscriptionService`.

        :param list_id: The id of the mailing list to leave.
        :param email: The email address to remove from the list.
        :raises NoSuchListError: when no mailing list has the given id.
        """
        mlist = getUtility(IListManager).get_by_list_id(list_id)
        if mlist is None:
            raise NoSuchListError(list_id)
        # XXX for now, no notification or user acknowledgment.
        delete_member(mlist, email, False, False)