aboutsummaryrefslogtreecommitdiff
path: root/src/mailman_pgp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mailman_pgp')
-rw-r--r--src/mailman_pgp/model/list.py8
-rw-r--r--src/mailman_pgp/rest/lists.py131
-rw-r--r--src/mailman_pgp/rest/root.py6
-rw-r--r--src/mailman_pgp/rest/tests/test_lists.py111
-rw-r--r--src/mailman_pgp/utils/pgp.py29
5 files changed, 260 insertions, 25 deletions
diff --git a/src/mailman_pgp/model/list.py b/src/mailman_pgp/model/list.py
index 0163026..a0101a7 100644
--- a/src/mailman_pgp/model/list.py
+++ b/src/mailman_pgp/model/list.py
@@ -17,6 +17,7 @@
"""Model for PGP enabled mailing lists."""
+from os import remove
from os.path import exists, isfile, join
from flufl.lock import Lock
@@ -99,8 +100,11 @@ class PGPMailingList(Base):
def key(self, value):
with Lock(self.key_path + '.lock'):
self._key = value
- with open(self.key_path, 'w') as key_file:
- key_file.write(str(value))
+ if value is None:
+ remove(self.key_path)
+ else:
+ with open(self.key_path, 'w') as key_file:
+ key_file.write(str(value))
def generate_key(self, block=False):
self._key = None
diff --git a/src/mailman_pgp/rest/lists.py b/src/mailman_pgp/rest/lists.py
index 37b8d28..1998f91 100644
--- a/src/mailman_pgp/rest/lists.py
+++ b/src/mailman_pgp/rest/lists.py
@@ -15,18 +15,39 @@
# You should have received a copy of the GNU General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.
-""""""
+"""REST interface to a PGP enabled mailing list."""
+from lazr.config import as_boolean
+from mailman.interfaces.action import Action
from mailman.interfaces.listmanager import IListManager
-from mailman.rest.helpers import (
- child, CollectionMixin, etag, not_found, okay)
+from mailman.rest.helpers import (accepted, bad_request,
+ child, CollectionMixin, etag, GetterSetter,
+ no_content, not_found, NotFound, okay)
+from mailman.rest.validator import (enum_validator, PatchValidator,
+ ReadOnlyPATCHRequestError,
+ UnknownPATCHRequestError, Validator)
from public import public
from zope.component import getUtility
from mailman_pgp.config import config
+from mailman_pgp.database import transaction
from mailman_pgp.model.list import PGPMailingList
+from mailman_pgp.utils.pgp import key_from_blob
+CONFIGURATION = dict(
+ unsigned_msg_action=GetterSetter(enum_validator(Action)),
+ inline_pgp_action=GetterSetter(enum_validator(Action)),
+ expired_sig_action=GetterSetter(enum_validator(Action)),
+ revoked_sig_action=GetterSetter(enum_validator(Action)),
+ invalid_sig_action=GetterSetter(enum_validator(Action)),
+ duplicate_sig_action=GetterSetter(enum_validator(Action)),
+ strip_original_sig=GetterSetter(as_boolean),
+ sign_outgoing=GetterSetter(as_boolean),
+ nonencrypted_msg_action=GetterSetter(enum_validator(Action)),
+ encrypt_outgoing=GetterSetter(as_boolean)
+)
-class _EncryptedBase(CollectionMixin):
+
+class _PGPListBase(CollectionMixin):
def _resource_as_dict(self, emlist):
"""See `CollectionMixin`."""
return dict(list_id=emlist.list_id,
@@ -35,9 +56,11 @@ class _EncryptedBase(CollectionMixin):
expired_sig_action=emlist.expired_sig_action,
revoked_sig_action=emlist.revoked_sig_action,
invalid_sig_action=emlist.invalid_sig_action,
+ duplicate_sig_action=emlist.duplicate_sig_action,
strip_original_sig=emlist.strip_original_sig,
sign_outgoing=emlist.sign_outgoing,
nonencrypted_msg_action=emlist.nonencrypted_msg_action,
+ encrypt_outgoing=emlist.encrypt_outgoing,
self_link=self.api.path_to(
'/plugins/{}/lists/{}'.format(config.name,
emlist.list_id)))
@@ -48,7 +71,9 @@ class _EncryptedBase(CollectionMixin):
@public
-class AllEncryptedLists(_EncryptedBase):
+class AllPGPLists(_PGPListBase):
+ """The PGP enabled mailing lists."""
+
def on_get(self, request, response):
"""/lists"""
resource = self._make_collection(request)
@@ -56,7 +81,9 @@ class AllEncryptedLists(_EncryptedBase):
@public
-class AnEncryptedList(_EncryptedBase):
+class APGPList(_PGPListBase):
+ """One PGP enabled mailing list."""
+
def __init__(self, list_identifier):
manager = getUtility(IListManager)
if '@' in list_identifier:
@@ -68,29 +95,105 @@ class AnEncryptedList(_EncryptedBase):
def on_get(self, request, response):
"""/lists/<list_id>"""
if self._mlist is None:
- return not_found(response)
+ not_found(response)
else:
okay(response, self._resource_as_json(self._mlist))
+ def on_put(self, request, response):
+ """/lists/<list_id>"""
+ if self._mlist is None:
+ not_found(response)
+ else:
+ validator = Validator(**CONFIGURATION)
+ try:
+ with transaction():
+ validator.update(self._mlist, request)
+ except ValueError as error:
+ bad_request(response, str(error))
+ else:
+ no_content(response)
+
+ def on_patch(self, request, response):
+ """/lists/<list_id>"""
+ if self._mlist is None:
+ not_found(response)
+ else:
+ try:
+ validator = PatchValidator(request, CONFIGURATION)
+ except UnknownPATCHRequestError as error:
+ bad_request(
+ response,
+ 'Unknown attribute: {}'.format(error.attribute))
+ return
+ except ReadOnlyPATCHRequestError as error:
+ bad_request(
+ response,
+ 'Read-only attribute: {}'.format(error.attribute))
+ return
+ try:
+ with transaction():
+ validator.update(self._mlist, request)
+ except ValueError as error:
+ bad_request(response, str(error))
+ else:
+ no_content(response)
+
+ @child()
+ def key(self, context, segments):
+ if self._mlist is None:
+ return NotFound(), []
+ return AListKey(self._mlist), []
+
@child()
def pubkey(self, context, segments):
+ if self._mlist is None:
+ return NotFound(), []
return AListPubkey(self._mlist), []
@public
-class AListPubkey:
+class AListKey:
+ """A PGP private key."""
+
def __init__(self, mlist):
self._mlist = mlist
def on_get(self, request, response):
"""/lists/<list_id>/key"""
- if self._mlist is None:
- return not_found(response)
+ key = self._mlist.key
+ if key is None:
+ not_found(response)
else:
- pubkey = self._mlist.pubkey
- if pubkey is None:
- return not_found(response)
+ resource = dict(key=str(key),
+ key_fingerprint=str(key.fingerprint))
+ okay(response, etag(resource))
+
+ def on_put(self, request, response):
+ """/lists/<list_id>/key"""
+ try:
+ validator = Validator(key=GetterSetter(key_from_blob))
+ values = validator(request)
+ except ValueError as error:
+ bad_request(response, str(error))
+ return
+ with transaction():
+ self._mlist.key = values.pop('key')
+ accepted(response)
+
+
+@public
+class AListPubkey:
+ """A PGP list public key."""
+ def __init__(self, mlist):
+ self._mlist = mlist
+
+ def on_get(self, request, response):
+ """/lists/<list_id>/pubkey"""
+ pubkey = self._mlist.pubkey
+ if pubkey is None:
+ not_found(response)
+ else:
resource = dict(public_key=str(pubkey),
key_fingerprint=str(pubkey.fingerprint))
- return okay(response, etag(resource))
+ okay(response, etag(resource))
diff --git a/src/mailman_pgp/rest/root.py b/src/mailman_pgp/rest/root.py
index 374a88d..52ebfbe 100644
--- a/src/mailman_pgp/rest/root.py
+++ b/src/mailman_pgp/rest/root.py
@@ -28,7 +28,7 @@ from mailman.rest.helpers import child
from public import public
from mailman_pgp.rest.addresses import AllAddresses, AnAddress
-from mailman_pgp.rest.lists import AllEncryptedLists, AnEncryptedList
+from mailman_pgp.rest.lists import AllPGPLists, APGPList
@public
@@ -36,10 +36,10 @@ class RESTRoot:
@child()
def lists(self, context, segments):
if len(segments) == 0:
- return AllEncryptedLists(), []
+ return AllPGPLists(), []
else:
list_identifier = segments.pop(0)
- return AnEncryptedList(list_identifier), segments
+ return APGPList(list_identifier), segments
@child()
def addresses(self, context, segments):
diff --git a/src/mailman_pgp/rest/tests/test_lists.py b/src/mailman_pgp/rest/tests/test_lists.py
index 2ebac6b..6509c04 100644
--- a/src/mailman_pgp/rest/tests/test_lists.py
+++ b/src/mailman_pgp/rest/tests/test_lists.py
@@ -14,10 +14,13 @@
#
# You should have received a copy of the GNU General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.
+
+""""""
from unittest import TestCase
from urllib.error import HTTPError
from mailman.app.lifecycle import create_list
+from mailman.interfaces.action import Action
from mailman.testing.helpers import call_api
from pgpy import PGPKey
@@ -35,11 +38,24 @@ class TestLists(TestCase):
self.mlist = create_list('test@example.com',
style_name='pgp-default')
+ self.list_key = load_key('ecc_p256.priv.asc')
+ with transaction():
+ self.pgp_list = PGPMailingList.for_list(self.mlist)
+ self.pgp_list.key = self.list_key
+
def test_missing_list(self):
with self.assertRaises(HTTPError) as cm:
call_api('http://localhost:9001/3.1/plugins/pgp/lists/'
'missing.example.com')
self.assertEqual(cm.exception.code, 404)
+ with self.assertRaises(HTTPError) as cm:
+ call_api('http://localhost:9001/3.1/plugins/pgp/lists/'
+ 'missing.example.com/key')
+ self.assertEqual(cm.exception.code, 404)
+ with self.assertRaises(HTTPError) as cm:
+ call_api('http://localhost:9001/3.1/plugins/pgp/lists/'
+ 'missing.example.com/pubkey')
+ self.assertEqual(cm.exception.code, 404)
def test_all_lists(self):
json, response = call_api(
@@ -60,17 +76,90 @@ class TestLists(TestCase):
'test@example.com')
self.assertEqual(json['list_id'], self.mlist.list_id)
+ def test_put_list_config(self):
+ config = dict(unsigned_msg_action='defer',
+ inline_pgp_action='defer',
+ expired_sig_action='defer',
+ revoked_sig_action='defer',
+ invalid_sig_action='defer',
+ duplicate_sig_action='defer',
+ strip_original_sig=False,
+ sign_outgoing=True,
+ nonencrypted_msg_action='defer',
+ encrypt_outgoing=False)
+ json, response = call_api(
+ 'http://localhost:9001/3.1/plugins/pgp/lists/'
+ 'test.example.com',
+ data=config,
+ method='PUT')
+
+ self.assertEqual(response.status_code, 204)
+ for key in config:
+ attr = getattr(self.pgp_list, key)
+ if isinstance(attr, Action):
+ attr = attr.name
+ self.assertEqual(attr, config[key])
+
+ def test_patch_list_config(self):
+ json, response = call_api(
+ 'http://localhost:9001/3.1/plugins/pgp/lists/'
+ 'test.example.com',
+ data=dict(unsigned_msg_action='defer'),
+ method='PATCH')
+ self.assertEqual(response.status_code, 204)
+ self.assertEqual(self.pgp_list.unsigned_msg_action, Action.defer)
+
def test_get_list_key(self):
- with mm_transaction():
- mlist = create_list('another@example.com',
- style_name='pgp-default')
+ json, response = call_api(
+ 'http://localhost:9001/3.1/plugins/pgp/lists/'
+ 'test.example.com/key')
+ json.pop('http_etag')
+ self.assertEqual(len(json.keys()), 2)
+ self.assertIn('key', json.keys())
+ self.assertIn('key_fingerprint', json.keys())
+
+ key, _ = PGPKey.from_blob(json['key'])
+ self.assertFalse(key.is_public)
+ self.assertEqual(json['key_fingerprint'], key.fingerprint)
+ self.assertEqual(self.list_key.fingerprint, key.fingerprint)
+
+ def test_missing_list_key(self):
with transaction():
- pgp_list = PGPMailingList.for_list(mlist)
- pgp_list.key = load_key('ecc_p256.priv.asc')
+ self.pgp_list.key = None
+ with self.assertRaises(HTTPError) as cm:
+ call_api('http://localhost:9001/3.1/plugins/pgp/lists/'
+ 'test.example.com/key')
+ self.assertEqual(cm.exception.code, 404)
+
+ def test_set_list_key(self):
+ new_key = load_key('rsa_1024.priv.asc')
+ json, response = call_api(
+ 'http://localhost:9001/3.1/plugins/pgp/lists/'
+ 'test.example.com/key',
+ data=dict(key=str(new_key)),
+ method='PUT')
+
+ self.assertEqual(response.status_code, 202)
+
+ json, response = call_api(
+ 'http://localhost:9001/3.1/plugins/pgp/lists/'
+ 'test.example.com/key')
+
+ key, _ = PGPKey.from_blob(json['key'])
+ self.assertEqual(key.fingerprint, new_key.fingerprint)
+
+ def test_set_list_key_wrong(self):
+ with self.assertRaises(HTTPError) as cm:
+ call_api('http://localhost:9001/3.1/plugins/pgp/lists/'
+ 'test.example.com/key',
+ dict(key='some stuff?'),
+ method='PUT')
+ self.assertEqual(cm.exception.code, 400)
+ def test_get_list_pubkey(self):
json, response = call_api(
'http://localhost:9001/3.1/plugins/pgp/lists/'
- 'another.example.com/pubkey')
+ 'test.example.com/pubkey')
json.pop('http_etag')
self.assertEqual(len(json.keys()), 2)
@@ -78,4 +167,14 @@ class TestLists(TestCase):
self.assertIn('key_fingerprint', json.keys())
key, _ = PGPKey.from_blob(json['public_key'])
+ self.assertTrue(key.is_public)
self.assertEqual(json['key_fingerprint'], key.fingerprint)
+ self.assertEqual(self.list_key.fingerprint, key.fingerprint)
+
+ def test_missing_list_pubkey(self):
+ with transaction():
+ self.pgp_list.key = None
+ with self.assertRaises(HTTPError) as cm:
+ call_api('http://localhost:9001/3.1/plugins/pgp/lists/'
+ 'test.example.com/pubkey')
+ self.assertEqual(cm.exception.code, 404)
diff --git a/src/mailman_pgp/utils/pgp.py b/src/mailman_pgp/utils/pgp.py
index 7b2f958..0e1e75c 100644
--- a/src/mailman_pgp/utils/pgp.py
+++ b/src/mailman_pgp/utils/pgp.py
@@ -16,6 +16,7 @@
# this program. If not, see <http://www.gnu.org/licenses/>.
"""Miscellaneous PGP utilities."""
+from pgpy import PGPKey
from public import public
@@ -47,3 +48,31 @@ def hashes(verifications):
hasher = sigsubj.signature.hash_algorithm.hasher
hasher.update(data)
yield hasher.digest()
+
+
+@public
+def key_from_blob(blob):
+ """
+
+ :param blob:
+ :return:
+ """
+ keys = PGPKey.from_blob(blob)
+ if isinstance(keys, tuple):
+ return keys[0]
+ else:
+ return keys
+
+
+@public
+def key_from_file(file):
+ """
+
+ :param file:
+ :return:
+ """
+ keys = PGPKey.from_file(file)
+ if isinstance(keys, tuple):
+ return keys[0]
+ else:
+ return keys