summaryrefslogtreecommitdiff
path: root/src/mailman/rest/validator.py
blob: d5f684df9cec0b58de365c428bdc7dfc10ffeba4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
# Copyright (C) 2010-2017 by the Free Software Foundation, Inc.
#
# This file is part of GNU Mailman.
#
# GNU Mailman is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option)
# any later version.
#
# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for
# more details.
#
# You should have received a copy of the GNU General Public License along with
# GNU Mailman.  If not, see <http://www.gnu.org/licenses/>.

"""REST web form validation."""
from mailman.config import config
from mailman.interfaces.address import IEmailValidator
from mailman.interfaces.errors import MailmanError
from mailman.interfaces.languages import ILanguageManager
from mailman.interfaces.workflows import (ISubscriptionWorkflow,
                                          IUnsubscriptionWorkflow)
from mailman.workflows.subscription import (
    ConfirmModerationSubscriptionPolicy, ConfirmSubscriptionPolicy,
    ModerationSubscriptionPolicy, OpenSubscriptionPolicy)
from mailman.workflows.unsubscription import (
    ConfirmModerationUnsubscriptionPolicy, ConfirmUnsubscriptionPolicy,
    ModerationUnsubscriptionPolicy, OpenUnsubscriptionPolicy)
from public import public
from zope.component import getUtility


COMMASPACE = ', '
OLD_SUB_MAP = {
    'open': OpenSubscriptionPolicy,
    'confirm': ConfirmSubscriptionPolicy,
    'moderate': ModerationSubscriptionPolicy,
    'confirm_then_moderate': ConfirmModerationSubscriptionPolicy
    }
OLD_UNSUB_MAP = {
    'open': OpenUnsubscriptionPolicy,
    'confirm': ConfirmUnsubscriptionPolicy,
    'moderate': ModerationUnsubscriptionPolicy,
    'confirm_then_moderate': ConfirmModerationUnsubscriptionPolicy
    }


@public
class RESTError(MailmanError):
    """Base class for REST API errors."""


@public
class UnknownPATCHRequestError(RESTError):
    """A PATCH request contained an unknown attribute."""

    def __init__(self, attribute):
        self.attribute = attribute


@public
class ReadOnlyPATCHRequestError(RESTError):
    """A PATCH request contained a read-only attribute."""

    def __init__(self, attribute):
        self.attribute = attribute


@public
class enum_validator:
    """Convert an enum value name into an enum value."""

    def __init__(self, enum_class, *, allow_blank=False):
        self._enum_class = enum_class
        self._allow_blank = allow_blank

    def __call__(self, enum_value):
        # This will raise a KeyError if the enum value is unknown.  The
        # Validator API requires turning this into a ValueError.
        if not enum_value and self._allow_blank:
            return None
        try:
            return self._enum_class[enum_value]
        except KeyError as exception:
            # Retain the error message.
            raise ValueError(exception.args[0])


@public
def subscriber_validator(api):
    """Convert an email-or-(int|hex) to an email-or-UUID."""
    def _inner(subscriber):
        try:
            return api.to_uuid(subscriber)
        except ValueError:
            # It must be an email address.
            if getUtility(IEmailValidator).is_valid(subscriber):
                return subscriber
            raise ValueError
    return _inner


@public
def language_validator(code):
    """Convert a language code to a Language object."""
    return getUtility(ILanguageManager)[code]


@public
def list_of_strings_validator(values):
    """Turn a list of things, or a single thing, into a list of unicodes."""
    if not isinstance(values, (list, tuple)):
        values = [values]
    for value in values:
        if not isinstance(value, str):
            raise ValueError('Expected str, got {!r}'.format(value))
    return values


@public
class policy_validator:
    """"""

    def __init__(self, policy_interface):
        self._policy_interface = policy_interface
        if policy_interface is ISubscriptionWorkflow:
            self._old_map = OLD_SUB_MAP
        elif policy_interface is IUnsubscriptionWorkflow:
            self._old_map = OLD_UNSUB_MAP
        else:
            raise ValueError('Expected a workflow interface.')

    def __call__(self, policy):
        if self._policy_interface.implementedBy(policy):
            return policy
        if (policy in config.workflows and
                self._policy_interface.implementedBy(
                        config.workflows[policy])):
            return config.workflows[policy]
        # For backwards compatibility.
        policy = self._old_map.get(policy, None)
        if policy is not None:
            return policy
        raise ValueError('Unknown policy: {}'.format(policy))


@public
class Validator:
    """A validator of parameter input."""

    def __init__(self, **kws):
        if '_optional' in kws:
            self._optional = set(kws.pop('_optional'))
        else:
            self._optional = set()
        self._converters = kws.copy()

    def __call__(self, request):
        values = {}
        extras = set()
        cannot_convert = set()
        form_data = {}
        # All keys which show up only once in the form data get a scalar value
        # in the pre-converted dictionary.  All keys which show up more than
        # once get a list value.
        missing = object()
        items = request.params.items()
        for key, new_value in items:
            old_value = form_data.get(key, missing)
            if old_value is missing:
                form_data[key] = new_value
            elif isinstance(old_value, list):
                old_value.append(new_value)
            else:
                form_data[key] = [old_value, new_value]
        # Now do all the conversions.
        for key, value in form_data.items():
            try:
                values[key] = self._converters[key](value)
            except KeyError:
                extras.add(key)
            except (TypeError, ValueError):
                cannot_convert.add(key)
        # Make sure there are no unexpected values.
        if len(extras) != 0:
            extras = COMMASPACE.join(sorted(extras))
            raise ValueError('Unexpected parameters: {}'.format(extras))
        # Make sure everything could be converted.
        if len(cannot_convert) != 0:
            bad = COMMASPACE.join(sorted(cannot_convert))
            raise ValueError('Cannot convert parameters: {}'.format(bad))
        # Make sure nothing's missing.
        value_keys = set(values)
        required_keys = set(self._converters) - self._optional
        if value_keys & required_keys != required_keys:
            missing = COMMASPACE.join(sorted(required_keys - value_keys))
            raise ValueError('Missing parameters: {}'.format(missing))
        return values

    def update(self, obj, request):
        """Update the object with the values in the request.

        This first validates and converts the attributes in the request, then
        updates the given object with the newly converted values.

        :param obj: The object to update.
        :type obj: object
        :param request: The HTTP request.
        :raises ValueError: if conversion failed for some attribute, including
            if the API version mismatches.
        """
        for key, value in self.__call__(request).items():
            self._converters[key].put(obj, key, value)


@public
class PatchValidator(Validator):
    """Create a special validator for PATCH requests.

    PATCH is different than PUT because with the latter, you're changing the
    entire resource, so all expected attributes must exist.  With the former,
    you're only changing a subset of the attributes, so you only validate the
    ones that exist in the request.
    """
    def __init__(self, request, converters):
        """Create a validator for the PATCH request.

        :param request: The request object, which must have a .PATCH
            attribute.
        :param converters: A mapping of attribute names to the converter for
            that attribute's type.  Generally, this will be a GetterSetter
            instance, but it might be something more specific for custom data
            types (e.g. non-basic types like unicodes).
        :raises UnknownPATCHRequestError: if the request contains an unknown
            attribute, i.e. one that is not in the `attributes` mapping.
        :raises ReadOnlyPATCHRequest: if the requests contains an attribute
            that is defined as read-only.
        """
        validationators = {}
        for attribute in request.params:
            if attribute not in converters:
                raise UnknownPATCHRequestError(attribute)
            if converters[attribute].decoder is None:
                raise ReadOnlyPATCHRequestError(attribute)
            validationators[attribute] = converters[attribute]
        super().__init__(**validationators)