diff options
| author | J08nY | 2017-07-21 18:39:21 +0200 |
|---|---|---|
| committer | J08nY | 2017-07-21 18:39:21 +0200 |
| commit | 276c82849f61bd13b0fb2b38a028d58d87e830ed (patch) | |
| tree | 1e5b3a0ae5ee6ef2244cc1794db252f00f3ddf19 /src/mailman_pgp/rest/lists.py | |
| parent | 389eb09b8af9d1482cb140e52b53e13d97ce78ae (diff) | |
| download | mailman-pgp-276c82849f61bd13b0fb2b38a028d58d87e830ed.tar.gz mailman-pgp-276c82849f61bd13b0fb2b38a028d58d87e830ed.tar.zst mailman-pgp-276c82849f61bd13b0fb2b38a028d58d87e830ed.zip | |
Diffstat (limited to 'src/mailman_pgp/rest/lists.py')
| -rw-r--r-- | src/mailman_pgp/rest/lists.py | 131 |
1 files changed, 117 insertions, 14 deletions
diff --git a/src/mailman_pgp/rest/lists.py b/src/mailman_pgp/rest/lists.py index 37b8d28..1998f91 100644 --- a/src/mailman_pgp/rest/lists.py +++ b/src/mailman_pgp/rest/lists.py @@ -15,18 +15,39 @@ # You should have received a copy of the GNU General Public License along with # this program. If not, see <http://www.gnu.org/licenses/>. -"""""" +"""REST interface to a PGP enabled mailing list.""" +from lazr.config import as_boolean +from mailman.interfaces.action import Action from mailman.interfaces.listmanager import IListManager -from mailman.rest.helpers import ( - child, CollectionMixin, etag, not_found, okay) +from mailman.rest.helpers import (accepted, bad_request, + child, CollectionMixin, etag, GetterSetter, + no_content, not_found, NotFound, okay) +from mailman.rest.validator import (enum_validator, PatchValidator, + ReadOnlyPATCHRequestError, + UnknownPATCHRequestError, Validator) from public import public from zope.component import getUtility from mailman_pgp.config import config +from mailman_pgp.database import transaction from mailman_pgp.model.list import PGPMailingList +from mailman_pgp.utils.pgp import key_from_blob +CONFIGURATION = dict( + unsigned_msg_action=GetterSetter(enum_validator(Action)), + inline_pgp_action=GetterSetter(enum_validator(Action)), + expired_sig_action=GetterSetter(enum_validator(Action)), + revoked_sig_action=GetterSetter(enum_validator(Action)), + invalid_sig_action=GetterSetter(enum_validator(Action)), + duplicate_sig_action=GetterSetter(enum_validator(Action)), + strip_original_sig=GetterSetter(as_boolean), + sign_outgoing=GetterSetter(as_boolean), + nonencrypted_msg_action=GetterSetter(enum_validator(Action)), + encrypt_outgoing=GetterSetter(as_boolean) +) -class _EncryptedBase(CollectionMixin): + +class _PGPListBase(CollectionMixin): def _resource_as_dict(self, emlist): """See `CollectionMixin`.""" return dict(list_id=emlist.list_id, @@ -35,9 +56,11 @@ class _EncryptedBase(CollectionMixin): expired_sig_action=emlist.expired_sig_action, revoked_sig_action=emlist.revoked_sig_action, invalid_sig_action=emlist.invalid_sig_action, + duplicate_sig_action=emlist.duplicate_sig_action, strip_original_sig=emlist.strip_original_sig, sign_outgoing=emlist.sign_outgoing, nonencrypted_msg_action=emlist.nonencrypted_msg_action, + encrypt_outgoing=emlist.encrypt_outgoing, self_link=self.api.path_to( '/plugins/{}/lists/{}'.format(config.name, emlist.list_id))) @@ -48,7 +71,9 @@ class _EncryptedBase(CollectionMixin): @public -class AllEncryptedLists(_EncryptedBase): +class AllPGPLists(_PGPListBase): + """The PGP enabled mailing lists.""" + def on_get(self, request, response): """/lists""" resource = self._make_collection(request) @@ -56,7 +81,9 @@ class AllEncryptedLists(_EncryptedBase): @public -class AnEncryptedList(_EncryptedBase): +class APGPList(_PGPListBase): + """One PGP enabled mailing list.""" + def __init__(self, list_identifier): manager = getUtility(IListManager) if '@' in list_identifier: @@ -68,29 +95,105 @@ class AnEncryptedList(_EncryptedBase): def on_get(self, request, response): """/lists/<list_id>""" if self._mlist is None: - return not_found(response) + not_found(response) else: okay(response, self._resource_as_json(self._mlist)) + def on_put(self, request, response): + """/lists/<list_id>""" + if self._mlist is None: + not_found(response) + else: + validator = Validator(**CONFIGURATION) + try: + with transaction(): + validator.update(self._mlist, request) + except ValueError as error: + bad_request(response, str(error)) + else: + no_content(response) + + def on_patch(self, request, response): + """/lists/<list_id>""" + if self._mlist is None: + not_found(response) + else: + try: + validator = PatchValidator(request, CONFIGURATION) + except UnknownPATCHRequestError as error: + bad_request( + response, + 'Unknown attribute: {}'.format(error.attribute)) + return + except ReadOnlyPATCHRequestError as error: + bad_request( + response, + 'Read-only attribute: {}'.format(error.attribute)) + return + try: + with transaction(): + validator.update(self._mlist, request) + except ValueError as error: + bad_request(response, str(error)) + else: + no_content(response) + + @child() + def key(self, context, segments): + if self._mlist is None: + return NotFound(), [] + return AListKey(self._mlist), [] + @child() def pubkey(self, context, segments): + if self._mlist is None: + return NotFound(), [] return AListPubkey(self._mlist), [] @public -class AListPubkey: +class AListKey: + """A PGP private key.""" + def __init__(self, mlist): self._mlist = mlist def on_get(self, request, response): """/lists/<list_id>/key""" - if self._mlist is None: - return not_found(response) + key = self._mlist.key + if key is None: + not_found(response) else: - pubkey = self._mlist.pubkey - if pubkey is None: - return not_found(response) + resource = dict(key=str(key), + key_fingerprint=str(key.fingerprint)) + okay(response, etag(resource)) + + def on_put(self, request, response): + """/lists/<list_id>/key""" + try: + validator = Validator(key=GetterSetter(key_from_blob)) + values = validator(request) + except ValueError as error: + bad_request(response, str(error)) + return + with transaction(): + self._mlist.key = values.pop('key') + accepted(response) + + +@public +class AListPubkey: + """A PGP list public key.""" + def __init__(self, mlist): + self._mlist = mlist + + def on_get(self, request, response): + """/lists/<list_id>/pubkey""" + pubkey = self._mlist.pubkey + if pubkey is None: + not_found(response) + else: resource = dict(public_key=str(pubkey), key_fingerprint=str(pubkey.fingerprint)) - return okay(response, etag(resource)) + okay(response, etag(resource)) |
