summaryrefslogtreecommitdiff
path: root/src/mailman/rest/users.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/mailman/rest/users.py')
-rw-r--r--src/mailman/rest/users.py107
1 files changed, 93 insertions, 14 deletions
diff --git a/src/mailman/rest/users.py b/src/mailman/rest/users.py
index 6856798d2..7b1ec8040 100644
--- a/src/mailman/rest/users.py
+++ b/src/mailman/rest/users.py
@@ -22,6 +22,7 @@ __all__ = [
'AddressUser',
'AllUsers',
'Login',
+ 'OwnersForDomain',
]
@@ -37,7 +38,8 @@ from mailman.rest.helpers import (
conflict, created, etag, forbidden, no_content, not_found, okay, paginate,
path_to)
from mailman.rest.preferences import Preferences
-from mailman.rest.validator import PatchValidator, Validator
+from mailman.rest.validator import (
+ PatchValidator, Validator, list_of_strings_validator)
from passlib.utils import generate_password as generate
from uuid import UUID
from zope.component import getUtility
@@ -47,27 +49,42 @@ from zope.component import getUtility
# Attributes of a user which can be changed via the REST API.
class PasswordEncrypterGetterSetter(GetterSetter):
def __init__(self):
- super(PasswordEncrypterGetterSetter, self).__init__(
- config.password_context.encrypt)
+ super().__init__(config.password_context.encrypt)
def get(self, obj, attribute):
assert attribute == 'cleartext_password'
- super(PasswordEncrypterGetterSetter, self).get(obj, 'password')
+ super().get(obj, 'password')
def put(self, obj, attribute, value):
assert attribute == 'cleartext_password'
- super(PasswordEncrypterGetterSetter, self).put(obj, 'password', value)
+ super().put(obj, 'password', value)
+
+
+class ListOfDomainOwners(GetterSetter):
+ def get(self, domain, attribute):
+ assert attribute == 'owner', (
+ 'Unexpected attribute: {}'.format(attribute))
+ def sort_key(owner):
+ return owner.addresses[0].email
+ return sorted(domain.owners, key=sort_key)
+
+ def put(self, domain, attribute, value):
+ assert attribute == 'owner', (
+ 'Unexpected attribute: {}'.format(attribute))
+ domain.add_owners(value)
ATTRIBUTES = dict(
- display_name=GetterSetter(str),
cleartext_password=PasswordEncrypterGetterSetter(),
+ display_name=GetterSetter(str),
+ is_server_owner=GetterSetter(as_boolean),
)
CREATION_FIELDS = dict(
- email=str,
display_name=str,
+ email=str,
+ is_server_owner=bool,
password=str,
- _optional=('display_name', 'password'),
+ _optional=('display_name', 'password', 'is_server_owner'),
)
@@ -78,6 +95,7 @@ def create_user(arguments, response):
# strip that out (if it exists), then create the user, adding the password
# after the fact if successful.
password = arguments.pop('password', None)
+ is_server_owner = arguments.pop('is_server_owner', False)
try:
user = getUtility(IUserManager).create_user(**arguments)
except ExistingAddressError as error:
@@ -88,6 +106,7 @@ def create_user(arguments, response):
# This will have to be reset since it cannot be retrieved.
password = generate(int(config.passwords.password_length))
user.password = config.password_context.encrypt(password)
+ user.is_server_owner = is_server_owner
location = path_to('users/{}'.format(user.user_id.int))
created(response, location)
return user
@@ -105,10 +124,11 @@ class _UserBase(CollectionMixin):
# but we serialize its integer equivalent.
user_id = user.user_id.int
resource = dict(
- user_id=user_id,
created_on=user.created_on,
+ is_server_owner=user.is_server_owner,
self_link=path_to('users/{}'.format(user_id)),
- )
+ user_id=user_id,
+ )
# Add the password attribute, only if the user has a password. Same
# with the real name. These could be None or the empty string.
if user.password:
@@ -185,14 +205,18 @@ class AUser(_UserBase):
return UserAddresses(self._user)
def on_delete(self, request, response):
- """Delete the named user, all her memberships, and addresses."""
+ """Delete the named user and all associated resources."""
if self._user is None:
not_found(response)
return
for member in self._user.memberships.members:
member.unsubscribe()
user_manager = getUtility(IUserManager)
- for address in self._user.addresses:
+ # SQLAlchemy is susceptable to delete-elements-while-iterating bugs so
+ # first figure out all the addresses we want to delete, then in a
+ # separate pass, delete those addresses. (See LP: #1419519)
+ delete = list(self._user.addresses)
+ for address in delete:
user_manager.delete_address(address)
user_manager.delete_user(self._user)
no_content(response)
@@ -289,7 +313,8 @@ class AddressUser(_UserBase):
del fields['email']
fields['user_id'] = int
fields['auto_create'] = as_boolean
- fields['_optional'] = fields['_optional'] + ('user_id', 'auto_create')
+ fields['_optional'] = fields['_optional'] + (
+ 'user_id', 'auto_create', 'is_server_owner')
try:
validator = Validator(**fields)
arguments = validator(request)
@@ -324,7 +349,8 @@ class AddressUser(_UserBase):
# Process post data and check for an existing user.
fields = CREATION_FIELDS.copy()
fields['user_id'] = int
- fields['_optional'] = fields['_optional'] + ('user_id', 'email')
+ fields['_optional'] = fields['_optional'] + (
+ 'user_id', 'email', 'is_server_owner')
try:
validator = Validator(**fields)
arguments = validator(request)
@@ -373,3 +399,56 @@ class Login:
no_content(response)
else:
forbidden(response)
+
+
+
+class OwnersForDomain(_UserBase):
+ """Owners for a particular domain."""
+
+ def __init__(self, domain):
+ self._domain = domain
+
+ def on_get(self, request, response):
+ """/domains/<domain>/owners"""
+ if self._domain is None:
+ not_found(response)
+ return
+ resource = self._make_collection(request)
+ okay(response, etag(resource))
+
+ def on_post(self, request, response):
+ """POST to /domains/<domain>/owners """
+ if self._domain is None:
+ not_found(response)
+ return
+ validator = Validator(
+ owner=ListOfDomainOwners(list_of_strings_validator))
+ try:
+ validator.update(self._domain, request)
+ except ValueError as error:
+ bad_request(response, str(error))
+ return
+ return no_content(response)
+
+ def on_delete(self, request, response):
+ """DELETE to /domains/<domain>/owners"""
+ if self._domain is None:
+ not_found(response)
+ try:
+ # No arguments.
+ Validator()(request)
+ except ValueError as error:
+ bad_request(response, str(error))
+ return
+ owner_email = [
+ owner.addresses[0].email
+ for owner in self._domain.owners
+ ]
+ for email in owner_email:
+ self._domain.remove_owner(email)
+ return no_content(response)
+
+ @paginate
+ def _get_collection(self, request):
+ """See `CollectionMixin`."""
+ return list(self._domain.owners)