diff options
| author | J08nY | 2017-07-21 18:39:21 +0200 |
|---|---|---|
| committer | J08nY | 2017-07-21 18:39:21 +0200 |
| commit | 276c82849f61bd13b0fb2b38a028d58d87e830ed (patch) | |
| tree | 1e5b3a0ae5ee6ef2244cc1794db252f00f3ddf19 /src | |
| parent | 389eb09b8af9d1482cb140e52b53e13d97ce78ae (diff) | |
| download | mailman-pgp-276c82849f61bd13b0fb2b38a028d58d87e830ed.tar.gz mailman-pgp-276c82849f61bd13b0fb2b38a028d58d87e830ed.tar.zst mailman-pgp-276c82849f61bd13b0fb2b38a028d58d87e830ed.zip | |
Diffstat (limited to 'src')
| -rw-r--r-- | src/mailman_pgp/model/list.py | 8 | ||||
| -rw-r--r-- | src/mailman_pgp/rest/lists.py | 131 | ||||
| -rw-r--r-- | src/mailman_pgp/rest/root.py | 6 | ||||
| -rw-r--r-- | src/mailman_pgp/rest/tests/test_lists.py | 111 | ||||
| -rw-r--r-- | src/mailman_pgp/utils/pgp.py | 29 |
5 files changed, 260 insertions, 25 deletions
diff --git a/src/mailman_pgp/model/list.py b/src/mailman_pgp/model/list.py index 0163026..a0101a7 100644 --- a/src/mailman_pgp/model/list.py +++ b/src/mailman_pgp/model/list.py @@ -17,6 +17,7 @@ """Model for PGP enabled mailing lists.""" +from os import remove from os.path import exists, isfile, join from flufl.lock import Lock @@ -99,8 +100,11 @@ class PGPMailingList(Base): def key(self, value): with Lock(self.key_path + '.lock'): self._key = value - with open(self.key_path, 'w') as key_file: - key_file.write(str(value)) + if value is None: + remove(self.key_path) + else: + with open(self.key_path, 'w') as key_file: + key_file.write(str(value)) def generate_key(self, block=False): self._key = None diff --git a/src/mailman_pgp/rest/lists.py b/src/mailman_pgp/rest/lists.py index 37b8d28..1998f91 100644 --- a/src/mailman_pgp/rest/lists.py +++ b/src/mailman_pgp/rest/lists.py @@ -15,18 +15,39 @@ # You should have received a copy of the GNU General Public License along with # this program. If not, see <http://www.gnu.org/licenses/>. -"""""" +"""REST interface to a PGP enabled mailing list.""" +from lazr.config import as_boolean +from mailman.interfaces.action import Action from mailman.interfaces.listmanager import IListManager -from mailman.rest.helpers import ( - child, CollectionMixin, etag, not_found, okay) +from mailman.rest.helpers import (accepted, bad_request, + child, CollectionMixin, etag, GetterSetter, + no_content, not_found, NotFound, okay) +from mailman.rest.validator import (enum_validator, PatchValidator, + ReadOnlyPATCHRequestError, + UnknownPATCHRequestError, Validator) from public import public from zope.component import getUtility from mailman_pgp.config import config +from mailman_pgp.database import transaction from mailman_pgp.model.list import PGPMailingList +from mailman_pgp.utils.pgp import key_from_blob +CONFIGURATION = dict( + unsigned_msg_action=GetterSetter(enum_validator(Action)), + inline_pgp_action=GetterSetter(enum_validator(Action)), + expired_sig_action=GetterSetter(enum_validator(Action)), + revoked_sig_action=GetterSetter(enum_validator(Action)), + invalid_sig_action=GetterSetter(enum_validator(Action)), + duplicate_sig_action=GetterSetter(enum_validator(Action)), + strip_original_sig=GetterSetter(as_boolean), + sign_outgoing=GetterSetter(as_boolean), + nonencrypted_msg_action=GetterSetter(enum_validator(Action)), + encrypt_outgoing=GetterSetter(as_boolean) +) -class _EncryptedBase(CollectionMixin): + +class _PGPListBase(CollectionMixin): def _resource_as_dict(self, emlist): """See `CollectionMixin`.""" return dict(list_id=emlist.list_id, @@ -35,9 +56,11 @@ class _EncryptedBase(CollectionMixin): expired_sig_action=emlist.expired_sig_action, revoked_sig_action=emlist.revoked_sig_action, invalid_sig_action=emlist.invalid_sig_action, + duplicate_sig_action=emlist.duplicate_sig_action, strip_original_sig=emlist.strip_original_sig, sign_outgoing=emlist.sign_outgoing, nonencrypted_msg_action=emlist.nonencrypted_msg_action, + encrypt_outgoing=emlist.encrypt_outgoing, self_link=self.api.path_to( '/plugins/{}/lists/{}'.format(config.name, emlist.list_id))) @@ -48,7 +71,9 @@ class _EncryptedBase(CollectionMixin): @public -class AllEncryptedLists(_EncryptedBase): +class AllPGPLists(_PGPListBase): + """The PGP enabled mailing lists.""" + def on_get(self, request, response): """/lists""" resource = self._make_collection(request) @@ -56,7 +81,9 @@ class AllEncryptedLists(_EncryptedBase): @public -class AnEncryptedList(_EncryptedBase): +class APGPList(_PGPListBase): + """One PGP enabled mailing list.""" + def __init__(self, list_identifier): manager = getUtility(IListManager) if '@' in list_identifier: @@ -68,29 +95,105 @@ class AnEncryptedList(_EncryptedBase): def on_get(self, request, response): """/lists/<list_id>""" if self._mlist is None: - return not_found(response) + not_found(response) else: okay(response, self._resource_as_json(self._mlist)) + def on_put(self, request, response): + """/lists/<list_id>""" + if self._mlist is None: + not_found(response) + else: + validator = Validator(**CONFIGURATION) + try: + with transaction(): + validator.update(self._mlist, request) + except ValueError as error: + bad_request(response, str(error)) + else: + no_content(response) + + def on_patch(self, request, response): + """/lists/<list_id>""" + if self._mlist is None: + not_found(response) + else: + try: + validator = PatchValidator(request, CONFIGURATION) + except UnknownPATCHRequestError as error: + bad_request( + response, + 'Unknown attribute: {}'.format(error.attribute)) + return + except ReadOnlyPATCHRequestError as error: + bad_request( + response, + 'Read-only attribute: {}'.format(error.attribute)) + return + try: + with transaction(): + validator.update(self._mlist, request) + except ValueError as error: + bad_request(response, str(error)) + else: + no_content(response) + + @child() + def key(self, context, segments): + if self._mlist is None: + return NotFound(), [] + return AListKey(self._mlist), [] + @child() def pubkey(self, context, segments): + if self._mlist is None: + return NotFound(), [] return AListPubkey(self._mlist), [] @public -class AListPubkey: +class AListKey: + """A PGP private key.""" + def __init__(self, mlist): self._mlist = mlist def on_get(self, request, response): """/lists/<list_id>/key""" - if self._mlist is None: - return not_found(response) + key = self._mlist.key + if key is None: + not_found(response) else: - pubkey = self._mlist.pubkey - if pubkey is None: - return not_found(response) + resource = dict(key=str(key), + key_fingerprint=str(key.fingerprint)) + okay(response, etag(resource)) + + def on_put(self, request, response): + """/lists/<list_id>/key""" + try: + validator = Validator(key=GetterSetter(key_from_blob)) + values = validator(request) + except ValueError as error: + bad_request(response, str(error)) + return + with transaction(): + self._mlist.key = values.pop('key') + accepted(response) + + +@public +class AListPubkey: + """A PGP list public key.""" + def __init__(self, mlist): + self._mlist = mlist + + def on_get(self, request, response): + """/lists/<list_id>/pubkey""" + pubkey = self._mlist.pubkey + if pubkey is None: + not_found(response) + else: resource = dict(public_key=str(pubkey), key_fingerprint=str(pubkey.fingerprint)) - return okay(response, etag(resource)) + okay(response, etag(resource)) diff --git a/src/mailman_pgp/rest/root.py b/src/mailman_pgp/rest/root.py index 374a88d..52ebfbe 100644 --- a/src/mailman_pgp/rest/root.py +++ b/src/mailman_pgp/rest/root.py @@ -28,7 +28,7 @@ from mailman.rest.helpers import child from public import public from mailman_pgp.rest.addresses import AllAddresses, AnAddress -from mailman_pgp.rest.lists import AllEncryptedLists, AnEncryptedList +from mailman_pgp.rest.lists import AllPGPLists, APGPList @public @@ -36,10 +36,10 @@ class RESTRoot: @child() def lists(self, context, segments): if len(segments) == 0: - return AllEncryptedLists(), [] + return AllPGPLists(), [] else: list_identifier = segments.pop(0) - return AnEncryptedList(list_identifier), segments + return APGPList(list_identifier), segments @child() def addresses(self, context, segments): diff --git a/src/mailman_pgp/rest/tests/test_lists.py b/src/mailman_pgp/rest/tests/test_lists.py index 2ebac6b..6509c04 100644 --- a/src/mailman_pgp/rest/tests/test_lists.py +++ b/src/mailman_pgp/rest/tests/test_lists.py @@ -14,10 +14,13 @@ # # You should have received a copy of the GNU General Public License along with # this program. If not, see <http://www.gnu.org/licenses/>. + +"""""" from unittest import TestCase from urllib.error import HTTPError from mailman.app.lifecycle import create_list +from mailman.interfaces.action import Action from mailman.testing.helpers import call_api from pgpy import PGPKey @@ -35,11 +38,24 @@ class TestLists(TestCase): self.mlist = create_list('test@example.com', style_name='pgp-default') + self.list_key = load_key('ecc_p256.priv.asc') + with transaction(): + self.pgp_list = PGPMailingList.for_list(self.mlist) + self.pgp_list.key = self.list_key + def test_missing_list(self): with self.assertRaises(HTTPError) as cm: call_api('http://localhost:9001/3.1/plugins/pgp/lists/' 'missing.example.com') self.assertEqual(cm.exception.code, 404) + with self.assertRaises(HTTPError) as cm: + call_api('http://localhost:9001/3.1/plugins/pgp/lists/' + 'missing.example.com/key') + self.assertEqual(cm.exception.code, 404) + with self.assertRaises(HTTPError) as cm: + call_api('http://localhost:9001/3.1/plugins/pgp/lists/' + 'missing.example.com/pubkey') + self.assertEqual(cm.exception.code, 404) def test_all_lists(self): json, response = call_api( @@ -60,17 +76,90 @@ class TestLists(TestCase): 'test@example.com') self.assertEqual(json['list_id'], self.mlist.list_id) + def test_put_list_config(self): + config = dict(unsigned_msg_action='defer', + inline_pgp_action='defer', + expired_sig_action='defer', + revoked_sig_action='defer', + invalid_sig_action='defer', + duplicate_sig_action='defer', + strip_original_sig=False, + sign_outgoing=True, + nonencrypted_msg_action='defer', + encrypt_outgoing=False) + json, response = call_api( + 'http://localhost:9001/3.1/plugins/pgp/lists/' + 'test.example.com', + data=config, + method='PUT') + + self.assertEqual(response.status_code, 204) + for key in config: + attr = getattr(self.pgp_list, key) + if isinstance(attr, Action): + attr = attr.name + self.assertEqual(attr, config[key]) + + def test_patch_list_config(self): + json, response = call_api( + 'http://localhost:9001/3.1/plugins/pgp/lists/' + 'test.example.com', + data=dict(unsigned_msg_action='defer'), + method='PATCH') + self.assertEqual(response.status_code, 204) + self.assertEqual(self.pgp_list.unsigned_msg_action, Action.defer) + def test_get_list_key(self): - with mm_transaction(): - mlist = create_list('another@example.com', - style_name='pgp-default') + json, response = call_api( + 'http://localhost:9001/3.1/plugins/pgp/lists/' + 'test.example.com/key') + json.pop('http_etag') + self.assertEqual(len(json.keys()), 2) + self.assertIn('key', json.keys()) + self.assertIn('key_fingerprint', json.keys()) + + key, _ = PGPKey.from_blob(json['key']) + self.assertFalse(key.is_public) + self.assertEqual(json['key_fingerprint'], key.fingerprint) + self.assertEqual(self.list_key.fingerprint, key.fingerprint) + + def test_missing_list_key(self): with transaction(): - pgp_list = PGPMailingList.for_list(mlist) - pgp_list.key = load_key('ecc_p256.priv.asc') + self.pgp_list.key = None + with self.assertRaises(HTTPError) as cm: + call_api('http://localhost:9001/3.1/plugins/pgp/lists/' + 'test.example.com/key') + self.assertEqual(cm.exception.code, 404) + + def test_set_list_key(self): + new_key = load_key('rsa_1024.priv.asc') + json, response = call_api( + 'http://localhost:9001/3.1/plugins/pgp/lists/' + 'test.example.com/key', + data=dict(key=str(new_key)), + method='PUT') + + self.assertEqual(response.status_code, 202) + + json, response = call_api( + 'http://localhost:9001/3.1/plugins/pgp/lists/' + 'test.example.com/key') + + key, _ = PGPKey.from_blob(json['key']) + self.assertEqual(key.fingerprint, new_key.fingerprint) + + def test_set_list_key_wrong(self): + with self.assertRaises(HTTPError) as cm: + call_api('http://localhost:9001/3.1/plugins/pgp/lists/' + 'test.example.com/key', + dict(key='some stuff?'), + method='PUT') + self.assertEqual(cm.exception.code, 400) + def test_get_list_pubkey(self): json, response = call_api( 'http://localhost:9001/3.1/plugins/pgp/lists/' - 'another.example.com/pubkey') + 'test.example.com/pubkey') json.pop('http_etag') self.assertEqual(len(json.keys()), 2) @@ -78,4 +167,14 @@ class TestLists(TestCase): self.assertIn('key_fingerprint', json.keys()) key, _ = PGPKey.from_blob(json['public_key']) + self.assertTrue(key.is_public) self.assertEqual(json['key_fingerprint'], key.fingerprint) + self.assertEqual(self.list_key.fingerprint, key.fingerprint) + + def test_missing_list_pubkey(self): + with transaction(): + self.pgp_list.key = None + with self.assertRaises(HTTPError) as cm: + call_api('http://localhost:9001/3.1/plugins/pgp/lists/' + 'test.example.com/pubkey') + self.assertEqual(cm.exception.code, 404) diff --git a/src/mailman_pgp/utils/pgp.py b/src/mailman_pgp/utils/pgp.py index 7b2f958..0e1e75c 100644 --- a/src/mailman_pgp/utils/pgp.py +++ b/src/mailman_pgp/utils/pgp.py @@ -16,6 +16,7 @@ # this program. If not, see <http://www.gnu.org/licenses/>. """Miscellaneous PGP utilities.""" +from pgpy import PGPKey from public import public @@ -47,3 +48,31 @@ def hashes(verifications): hasher = sigsubj.signature.hash_algorithm.hasher hasher.update(data) yield hasher.digest() + + +@public +def key_from_blob(blob): + """ + + :param blob: + :return: + """ + keys = PGPKey.from_blob(blob) + if isinstance(keys, tuple): + return keys[0] + else: + return keys + + +@public +def key_from_file(file): + """ + + :param file: + :return: + """ + keys = PGPKey.from_file(file) + if isinstance(keys, tuple): + return keys[0] + else: + return keys |
