# Copyright (C) 2011-2015 by the Free Software Foundation, Inc.
#
# This file is part of GNU Mailman.
#
# GNU Mailman is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option)
# any later version.
#
# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
# more details.
#
# You should have received a copy of the GNU General Public License along with
# GNU Mailman. If not, see .
"""REST for users."""
__all__ = [
'AUser',
'AddressUser',
'AllUsers',
'Login',
'OwnersForDomain',
]
from lazr.config import as_boolean
from mailman.config import config
from mailman.core.errors import (
ReadOnlyPATCHRequestError, UnknownPATCHRequestError)
from mailman.interfaces.address import ExistingAddressError
from mailman.interfaces.usermanager import IUserManager
from mailman.rest.addresses import UserAddresses
from mailman.rest.helpers import (
BadRequest, CollectionMixin, GetterSetter, NotFound, bad_request, child,
conflict, created, etag, forbidden, no_content, not_found, okay, paginate,
path_to)
from mailman.rest.preferences import Preferences
from mailman.rest.validator import (
PatchValidator, Validator, list_of_strings_validator)
from passlib.utils import generate_password as generate
from uuid import UUID
from zope.component import getUtility
# Attributes of a user which can be changed via the REST API.
class PasswordEncrypterGetterSetter(GetterSetter):
def __init__(self):
super().__init__(config.password_context.encrypt)
def get(self, obj, attribute):
assert attribute == 'cleartext_password'
super().get(obj, 'password')
def put(self, obj, attribute, value):
assert attribute == 'cleartext_password'
super().put(obj, 'password', value)
class ListOfDomainOwners(GetterSetter):
def get(self, domain, attribute):
assert attribute == 'owner', (
'Unexpected attribute: {}'.format(attribute))
def sort_key(owner):
return owner.addresses[0].email
return sorted(domain.owners, key=sort_key)
def put(self, domain, attribute, value):
assert attribute == 'owner', (
'Unexpected attribute: {}'.format(attribute))
domain.add_owners(value)
ATTRIBUTES = dict(
cleartext_password=PasswordEncrypterGetterSetter(),
display_name=GetterSetter(str),
is_server_owner=GetterSetter(as_boolean),
)
CREATION_FIELDS = dict(
display_name=str,
email=str,
is_server_owner=as_boolean,
password=str,
_optional=('display_name', 'password', 'is_server_owner'),
)
def create_user(arguments, request, response):
"""Create a new user."""
# We can't pass the 'password' argument to the user creation method, so
# strip that out (if it exists), then create the user, adding the password
# after the fact if successful.
password = arguments.pop('password', None)
is_server_owner = arguments.pop('is_server_owner', False)
user_manager = getUtility(IUserManager)
try:
user = user_manager.create_user(**arguments)
except ExistingAddressError as error:
# The address already exists. If the address already has a user
# linked to it, raise an error, otherwise create a new user and link
# it to this address.
email = arguments.pop('email')
user = user_manager.get_user(email)
if user is None:
address = user_manager.get_address(email)
user = user_manager.create_user(**arguments)
user.link(address)
else:
bad_request(
response, 'User already exists: {}'.format(error.address))
return None
if password is None:
# This will have to be reset since it cannot be retrieved.
password = generate(int(config.passwords.password_length))
user.password = config.password_context.encrypt(password)
user.is_server_owner = is_server_owner
location = path_to('users/{}'.format(user.user_id.int),
request.context['api_version'])
created(response, location)
return user
class _UserBase(CollectionMixin):
"""Shared base class for user representations."""
def _resource_as_dict(self, user):
"""See `CollectionMixin`."""
# The canonical URL for a user is their unique user id, although we
# can always look up a user based on any registered and validated
# email address associated with their account. The user id is a UUID,
# but we serialize its integer equivalent.
user_id = user.user_id.int
resource = dict(
created_on=user.created_on,
is_server_owner=user.is_server_owner,
self_link=self.path_to('users/{}'.format(user_id)),
user_id=user_id,
)
# Add the password attribute, only if the user has a password. Same
# with the real name. These could be None or the empty string.
if user.password:
resource['password'] = user.password
if user.display_name:
resource['display_name'] = user.display_name
return resource
@paginate
def _get_collection(self, request):
"""See `CollectionMixin`."""
return list(getUtility(IUserManager).users)
class AllUsers(_UserBase):
"""The users."""
def on_get(self, request, response):
"""/users"""
resource = self._make_collection(request)
okay(response, etag(resource))
def on_post(self, request, response):
"""Create a new user."""
try:
validator = Validator(**CREATION_FIELDS)
arguments = validator(request)
except ValueError as error:
bad_request(response, str(error))
return
create_user(arguments, request, response)
class AUser(_UserBase):
"""A user."""
def __init__(self, user_identifier):
"""Get a user by various type of identifiers.
:param user_identifier: The identifier used to retrieve the user. The
identifier may either be an integer user-id, or an email address
controlled by the user. The type of identifier is auto-detected
by looking for an `@` symbol, in which case it's taken as an email
address, otherwise it's assumed to be an integer.
:type user_identifier: string
"""
user_manager = getUtility(IUserManager)
if '@' in user_identifier:
self._user = user_manager.get_user(user_identifier)
else:
# The identifier is the string representation of an integer that
# must be converted to a UUID.
try:
user_id = UUID(int=int(user_identifier))
except ValueError:
self._user = None
else:
self._user = user_manager.get_user_by_id(user_id)
def on_get(self, request, response):
"""Return a single user end-point."""
if self._user is None:
not_found(response)
else:
okay(response, self._resource_as_json(self._user))
@child()
def addresses(self, request, segments):
"""/users//addresses"""
if self._user is None:
return NotFound(), []
return UserAddresses(self._user)
def on_delete(self, request, response):
"""Delete the named user and all associated resources."""
if self._user is None:
not_found(response)
return
for member in self._user.memberships.members:
member.unsubscribe()
user_manager = getUtility(IUserManager)
# SQLAlchemy is susceptable to delete-elements-while-iterating bugs so
# first figure out all the addresses we want to delete, then in a
# separate pass, delete those addresses. (See LP: #1419519)
delete = list(self._user.addresses)
for address in delete:
user_manager.delete_address(address)
user_manager.delete_user(self._user)
no_content(response)
@child()
def preferences(self, request, segments):
"""/addresses//preferences"""
if len(segments) != 0:
return BadRequest(), []
if self._user is None:
return NotFound(), []
child = Preferences(
self._user.preferences,
'users/{0}'.format(self._user.user_id.int))
return child, []
def on_patch(self, request, response):
"""Patch the user's configuration (i.e. partial update)."""
if self._user is None:
not_found(response)
return
try:
validator = PatchValidator(request, ATTRIBUTES)
except UnknownPATCHRequestError as error:
bad_request(
response, b'Unknown attribute: {0}'.format(error.attribute))
except ReadOnlyPATCHRequestError as error:
bad_request(
response, b'Read-only attribute: {0}'.format(error.attribute))
else:
validator.update(self._user, request)
no_content(response)
def on_put(self, request, response):
"""Put the user's configuration (i.e. full update)."""
if self._user is None:
not_found(response)
return
validator = Validator(**ATTRIBUTES)
try:
validator.update(self._user, request)
except UnknownPATCHRequestError as error:
bad_request(
response, b'Unknown attribute: {0}'.format(error.attribute))
except ReadOnlyPATCHRequestError as error:
bad_request(
response, b'Read-only attribute: {0}'.format(error.attribute))
except ValueError as error:
bad_request(response, str(error))
else:
no_content(response)
@child()
def login(self, request, segments):
"""Log the user in, sort of, by verifying a given password."""
if self._user is None:
return NotFound(), []
return Login(self._user)
class AddressUser(_UserBase):
"""The user linked to an address."""
def __init__(self, address):
self._address = address
self._user = address.user
def on_get(self, request, response):
"""Return a single user end-point."""
if self._user is None:
not_found(response)
else:
okay(response, self._resource_as_json(self._user))
def on_delete(self, request, response):
"""Delete the named user, all her memberships, and addresses."""
if self._user is None:
not_found(response)
return
self._user.unlink(self._address)
no_content(response)
def on_post(self, request, response):
"""Link a user to the address, and create it if needed."""
if self._user:
conflict(response)
return
# When creating a linked user by POSTing, the user either must already
# exist, or it can be automatically created, if the auto_create flag
# is given and true (if missing, it defaults to true). However, in
# this case we do not accept 'email' as a POST field.
fields = CREATION_FIELDS.copy()
del fields['email']
fields['user_id'] = int
fields['auto_create'] = as_boolean
fields['_optional'] = fields['_optional'] + (
'user_id', 'auto_create', 'is_server_owner')
try:
validator = Validator(**fields)
arguments = validator(request)
except ValueError as error:
bad_request(response, str(error))
return
user_manager = getUtility(IUserManager)
if 'user_id' in arguments:
raw_uid = arguments['user_id']
user_id = UUID(int=raw_uid)
user = user_manager.get_user_by_id(user_id)
if user is None:
not_found(response, b'No user with ID {}'.format(raw_uid))
return
okay(response)
else:
auto_create = arguments.pop('auto_create', True)
if auto_create:
# This sets the 201 or 400 status.
user = create_user(arguments, request, response)
if user is None:
return
else:
forbidden(response)
return
user.link(self._address)
def on_put(self, request, response):
"""Set or replace the addresses's user."""
if self._user:
self._user.unlink(self._address)
# Process post data and check for an existing user.
fields = CREATION_FIELDS.copy()
fields['user_id'] = int
fields['_optional'] = fields['_optional'] + (
'user_id', 'email', 'is_server_owner')
try:
validator = Validator(**fields)
arguments = validator(request)
except ValueError as error:
bad_request(response, str(error))
return
user_manager = getUtility(IUserManager)
if 'user_id' in arguments:
raw_uid = arguments['user_id']
user_id = UUID(int=raw_uid)
user = user_manager.get_user_by_id(user_id)
if user is None:
not_found(response, b'No user with ID {}'.format(raw_uid))
return
okay(response)
else:
user = create_user(arguments, request, response)
if user is None:
return
user.link(self._address)
class Login:
"""/users//login"""
def __init__(self, user):
assert user is not None
self._user = user
def on_post(self, request, response):
# We do not want to encrypt the plaintext password given in the POST
# data. That would hash the password, but we need to have the
# plaintext in order to pass into passlib.
validator = Validator(cleartext_password=GetterSetter(str))
try:
values = validator(request)
except ValueError as error:
bad_request(response, str(error))
return
is_valid, new_hash = config.password_context.verify(
values['cleartext_password'], self._user.password)
if is_valid:
if new_hash is not None:
self._user.password = new_hash
no_content(response)
else:
forbidden(response)
class OwnersForDomain(_UserBase):
"""Owners for a particular domain."""
def __init__(self, domain):
self._domain = domain
def on_get(self, request, response):
"""/domains//owners"""
if self._domain is None:
not_found(response)
return
resource = self._make_collection(request)
okay(response, etag(resource))
def on_post(self, request, response):
"""POST to /domains//owners """
if self._domain is None:
not_found(response)
return
validator = Validator(
owner=ListOfDomainOwners(list_of_strings_validator))
try:
validator.update(self._domain, request)
except ValueError as error:
bad_request(response, str(error))
return
return no_content(response)
def on_delete(self, request, response):
"""DELETE to /domains//owners"""
if self._domain is None:
not_found(response)
try:
# No arguments.
Validator()(request)
except ValueError as error:
bad_request(response, str(error))
return
owner_email = [
owner.addresses[0].email
for owner in self._domain.owners
]
for email in owner_email:
self._domain.remove_owner(email)
return no_content(response)
@paginate
def _get_collection(self, request):
"""See `CollectionMixin`."""
return list(self._domain.owners)