# Copyright (C) 2017 Jan Jancar
#
# This file is a part of the Mailman PGP plugin.
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option)
# any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
# more details.
#
# You should have received a copy of the GNU General Public License along with
# this program. If not, see .
"""REST interface to a PGP enabled mailing list."""
from lazr.config import as_boolean
from mailman.interfaces.action import Action
from mailman.interfaces.listmanager import IListManager
from mailman.interfaces.member import MemberRole
from mailman.rest.helpers import (accepted, bad_request,
child, ChildError, CollectionMixin, etag,
GetterSetter, no_content, not_found,
NotFound, okay)
from mailman.rest.validator import (enum_validator, PatchValidator,
UnknownPATCHRequestError, Validator)
from pgpy.errors import PGPError
from public import public
from zope.component import getUtility
from mailman_pgp.config import config
from mailman_pgp.database import transaction
from mailman_pgp.model.list import PGPMailingList
from mailman_pgp.utils.pgp import key_from_blob
from mailman_pgp.utils.rest import enumflag_validator, workflow_validator
from mailman_pgp.workflows.key_change import (KeyChangeModWorkflow,
KeyChangeWorkflow)
CONFIGURATION = dict(
unsigned_msg_action=GetterSetter(enum_validator(Action)),
inline_pgp_action=GetterSetter(enum_validator(Action)),
expired_sig_action=GetterSetter(enum_validator(Action)),
revoked_sig_action=GetterSetter(enum_validator(Action)),
invalid_sig_action=GetterSetter(enum_validator(Action)),
duplicate_sig_action=GetterSetter(enum_validator(Action)),
strip_original_sig=GetterSetter(as_boolean),
sign_outgoing=GetterSetter(as_boolean),
nonencrypted_msg_action=GetterSetter(enum_validator(Action)),
encrypt_outgoing=GetterSetter(as_boolean),
key_change_workflow=GetterSetter(
workflow_validator(KeyChangeWorkflow, KeyChangeModWorkflow)),
key_signing_allowed=GetterSetter(enumflag_validator(MemberRole))
)
class _PGPListBase(CollectionMixin):
def _resource_as_dict(self, emlist):
"""See `CollectionMixin`."""
return dict(list_id=emlist.list_id,
unsigned_msg_action=emlist.unsigned_msg_action,
inline_pgp_action=emlist.inline_pgp_action,
expired_sig_action=emlist.expired_sig_action,
revoked_sig_action=emlist.revoked_sig_action,
invalid_sig_action=emlist.invalid_sig_action,
duplicate_sig_action=emlist.duplicate_sig_action,
strip_original_sig=emlist.strip_original_sig,
sign_outgoing=emlist.sign_outgoing,
nonencrypted_msg_action=emlist.nonencrypted_msg_action,
encrypt_outgoing=emlist.encrypt_outgoing,
self_link=self.api.path_to(
'/plugins/{}/lists/{}'.format(config.name,
emlist.list_id)))
def _get_collection(self, request):
"""See `CollectionMixin`."""
return PGPMailingList.query().all()
@public
class AllPGPLists(_PGPListBase):
"""The PGP enabled mailing lists."""
def on_get(self, request, response):
"""/lists"""
resource = self._make_collection(request)
return okay(response, etag(resource))
@public
class APGPList(_PGPListBase):
"""One PGP enabled mailing list."""
def __init__(self, list_identifier):
manager = getUtility(IListManager)
if '@' in list_identifier:
mlist = manager.get(list_identifier)
else:
mlist = manager.get_by_list_id(list_identifier)
self._mlist = PGPMailingList.for_list(mlist)
def on_get(self, request, response):
"""/lists/"""
if self._mlist is None:
not_found(response)
else:
okay(response, self._resource_as_json(self._mlist))
def on_put(self, request, response):
"""/lists/"""
if self._mlist is None:
not_found(response)
else:
validator = Validator(**CONFIGURATION)
try:
with transaction():
validator.update(self._mlist, request)
except ValueError as error:
bad_request(response, str(error))
else:
no_content(response)
def on_patch(self, request, response):
"""/lists/"""
if self._mlist is None:
not_found(response)
else:
try:
validator = PatchValidator(request, CONFIGURATION)
except UnknownPATCHRequestError as error:
bad_request(response,
'Unknown attribute: {}'.format(error.attribute))
return
try:
with transaction():
validator.update(self._mlist, request)
except ValueError as error:
bad_request(response, str(error))
else:
no_content(response)
@child()
def key(self, context, segments):
if self._mlist is None:
return NotFound(), []
if not config.get_value('rest', 'expose_private_key'):
return ChildError(403), []
return AListKey(self._mlist), []
@child()
def pubkey(self, context, segments):
if self._mlist is None:
return NotFound(), []
return AListPubkey(self._mlist), []
@public
class AListKey:
"""A PGP private key."""
def __init__(self, mlist):
self._mlist = mlist
def on_get(self, request, response):
"""/lists//key"""
key = self._mlist.key
if key is None:
not_found(response)
else:
resource = dict(key=str(key),
key_fingerprint=str(key.fingerprint))
okay(response, etag(resource))
def on_put(self, request, response):
"""/lists//key"""
try:
validator = Validator(key=GetterSetter(key_from_blob))
values = validator(request)
except (ValueError, PGPError) as error:
bad_request(response, str(error))
return
with transaction():
self._mlist.key = values.pop('key')
accepted(response)
@public
class AListPubkey:
"""A PGP list public key."""
def __init__(self, mlist):
self._mlist = mlist
def on_get(self, request, response):
"""/lists//pubkey"""
pubkey = self._mlist.pubkey
if pubkey is None:
not_found(response)
else:
resource = dict(public_key=str(pubkey),
key_fingerprint=str(pubkey.fingerprint))
okay(response, etag(resource))