diff options
Diffstat (limited to 'src/mailman/rest/users.py')
| -rw-r--r-- | src/mailman/rest/users.py | 107 |
1 files changed, 93 insertions, 14 deletions
diff --git a/src/mailman/rest/users.py b/src/mailman/rest/users.py index 6856798d2..7b1ec8040 100644 --- a/src/mailman/rest/users.py +++ b/src/mailman/rest/users.py @@ -22,6 +22,7 @@ __all__ = [ 'AddressUser', 'AllUsers', 'Login', + 'OwnersForDomain', ] @@ -37,7 +38,8 @@ from mailman.rest.helpers import ( conflict, created, etag, forbidden, no_content, not_found, okay, paginate, path_to) from mailman.rest.preferences import Preferences -from mailman.rest.validator import PatchValidator, Validator +from mailman.rest.validator import ( + PatchValidator, Validator, list_of_strings_validator) from passlib.utils import generate_password as generate from uuid import UUID from zope.component import getUtility @@ -47,27 +49,42 @@ from zope.component import getUtility # Attributes of a user which can be changed via the REST API. class PasswordEncrypterGetterSetter(GetterSetter): def __init__(self): - super(PasswordEncrypterGetterSetter, self).__init__( - config.password_context.encrypt) + super().__init__(config.password_context.encrypt) def get(self, obj, attribute): assert attribute == 'cleartext_password' - super(PasswordEncrypterGetterSetter, self).get(obj, 'password') + super().get(obj, 'password') def put(self, obj, attribute, value): assert attribute == 'cleartext_password' - super(PasswordEncrypterGetterSetter, self).put(obj, 'password', value) + super().put(obj, 'password', value) + + +class ListOfDomainOwners(GetterSetter): + def get(self, domain, attribute): + assert attribute == 'owner', ( + 'Unexpected attribute: {}'.format(attribute)) + def sort_key(owner): + return owner.addresses[0].email + return sorted(domain.owners, key=sort_key) + + def put(self, domain, attribute, value): + assert attribute == 'owner', ( + 'Unexpected attribute: {}'.format(attribute)) + domain.add_owners(value) ATTRIBUTES = dict( - display_name=GetterSetter(str), cleartext_password=PasswordEncrypterGetterSetter(), + display_name=GetterSetter(str), + is_server_owner=GetterSetter(as_boolean), ) CREATION_FIELDS = dict( - email=str, display_name=str, + email=str, + is_server_owner=bool, password=str, - _optional=('display_name', 'password'), + _optional=('display_name', 'password', 'is_server_owner'), ) @@ -78,6 +95,7 @@ def create_user(arguments, response): # strip that out (if it exists), then create the user, adding the password # after the fact if successful. password = arguments.pop('password', None) + is_server_owner = arguments.pop('is_server_owner', False) try: user = getUtility(IUserManager).create_user(**arguments) except ExistingAddressError as error: @@ -88,6 +106,7 @@ def create_user(arguments, response): # This will have to be reset since it cannot be retrieved. password = generate(int(config.passwords.password_length)) user.password = config.password_context.encrypt(password) + user.is_server_owner = is_server_owner location = path_to('users/{}'.format(user.user_id.int)) created(response, location) return user @@ -105,10 +124,11 @@ class _UserBase(CollectionMixin): # but we serialize its integer equivalent. user_id = user.user_id.int resource = dict( - user_id=user_id, created_on=user.created_on, + is_server_owner=user.is_server_owner, self_link=path_to('users/{}'.format(user_id)), - ) + user_id=user_id, + ) # Add the password attribute, only if the user has a password. Same # with the real name. These could be None or the empty string. if user.password: @@ -185,14 +205,18 @@ class AUser(_UserBase): return UserAddresses(self._user) def on_delete(self, request, response): - """Delete the named user, all her memberships, and addresses.""" + """Delete the named user and all associated resources.""" if self._user is None: not_found(response) return for member in self._user.memberships.members: member.unsubscribe() user_manager = getUtility(IUserManager) - for address in self._user.addresses: + # SQLAlchemy is susceptable to delete-elements-while-iterating bugs so + # first figure out all the addresses we want to delete, then in a + # separate pass, delete those addresses. (See LP: #1419519) + delete = list(self._user.addresses) + for address in delete: user_manager.delete_address(address) user_manager.delete_user(self._user) no_content(response) @@ -289,7 +313,8 @@ class AddressUser(_UserBase): del fields['email'] fields['user_id'] = int fields['auto_create'] = as_boolean - fields['_optional'] = fields['_optional'] + ('user_id', 'auto_create') + fields['_optional'] = fields['_optional'] + ( + 'user_id', 'auto_create', 'is_server_owner') try: validator = Validator(**fields) arguments = validator(request) @@ -324,7 +349,8 @@ class AddressUser(_UserBase): # Process post data and check for an existing user. fields = CREATION_FIELDS.copy() fields['user_id'] = int - fields['_optional'] = fields['_optional'] + ('user_id', 'email') + fields['_optional'] = fields['_optional'] + ( + 'user_id', 'email', 'is_server_owner') try: validator = Validator(**fields) arguments = validator(request) @@ -373,3 +399,56 @@ class Login: no_content(response) else: forbidden(response) + + + +class OwnersForDomain(_UserBase): + """Owners for a particular domain.""" + + def __init__(self, domain): + self._domain = domain + + def on_get(self, request, response): + """/domains/<domain>/owners""" + if self._domain is None: + not_found(response) + return + resource = self._make_collection(request) + okay(response, etag(resource)) + + def on_post(self, request, response): + """POST to /domains/<domain>/owners """ + if self._domain is None: + not_found(response) + return + validator = Validator( + owner=ListOfDomainOwners(list_of_strings_validator)) + try: + validator.update(self._domain, request) + except ValueError as error: + bad_request(response, str(error)) + return + return no_content(response) + + def on_delete(self, request, response): + """DELETE to /domains/<domain>/owners""" + if self._domain is None: + not_found(response) + try: + # No arguments. + Validator()(request) + except ValueError as error: + bad_request(response, str(error)) + return + owner_email = [ + owner.addresses[0].email + for owner in self._domain.owners + ] + for email in owner_email: + self._domain.remove_owner(email) + return no_content(response) + + @paginate + def _get_collection(self, request): + """See `CollectionMixin`.""" + return list(self._domain.owners) |
