diff options
| author | J08nY | 2017-07-14 02:01:33 +0200 |
|---|---|---|
| committer | J08nY | 2017-07-14 02:01:33 +0200 |
| commit | 615a1e7c01a0710c5ba138d81358d80827bcf680 (patch) | |
| tree | 39b289d771bdc7dbd859378d1f380d9d4e46a810 | |
| parent | d8afe4bec9282254483ea1c7571298dcd9731508 (diff) | |
| download | mailman-pgp-615a1e7c01a0710c5ba138d81358d80827bcf680.tar.gz mailman-pgp-615a1e7c01a0710c5ba138d81358d80827bcf680.tar.zst mailman-pgp-615a1e7c01a0710c5ba138d81358d80827bcf680.zip | |
| -rw-r--r-- | src/mailman_pgp/workflows/key_change.py | 20 | ||||
| -rw-r--r-- | src/mailman_pgp/workflows/tests/test_key_change.py | 105 |
2 files changed, 118 insertions, 7 deletions
diff --git a/src/mailman_pgp/workflows/key_change.py b/src/mailman_pgp/workflows/key_change.py index a67edbb..9e204cf 100644 --- a/src/mailman_pgp/workflows/key_change.py +++ b/src/mailman_pgp/workflows/key_change.py @@ -18,6 +18,7 @@ """""" from mailman.email.message import UserNotification from mailman.interfaces.pending import IPendable, IPendings +from mailman.interfaces.subscriptions import TokenOwner from mailman.interfaces.workflows import IWorkflow from mailman.workflows.base import Workflow from pgpy import PGPKey @@ -30,6 +31,7 @@ from mailman_pgp.model.address import PGPAddress from mailman_pgp.pgp.utils import copy_headers from mailman_pgp.pgp.wrapper import PGPWrapper + CHANGE_CONFIRM_REQUEST = """\ ---------- TODO: this is a pgp enabled list. @@ -48,7 +50,7 @@ Token: {} class KeyChangeWorkflow(Workflow): name = 'pgp-key-change-workflow' description = '' - initial_state = 'send_key_confirm_request' + initial_state = 'change_key' save_attributes = ( 'address_key', 'pubkey_key' @@ -67,19 +69,21 @@ class KeyChangeWorkflow(Workflow): @address_key.setter def address_key(self, value): self.pgp_address = PGPAddress.for_email(value) + self.member = self.mlist.regular_members.get_member(value) @property def pubkey_key(self): - if self.pubkey is None: - return None return str(self.pubkey) @pubkey_key.setter def pubkey_key(self, value): - if value is not None: - self.pubkey, _ = PGPKey.from_blob(value) - else: - self.pubkey = None + self.pubkey, _ = PGPKey.from_blob(value) + + def _step_change_key(self): + if self.pgp_address is None or self.pubkey is None: + raise ValueError + + self.push('send_key_confirm_request') def _step_send_key_confirm_request(self): pendings = getUtility(IPendings) @@ -89,6 +93,7 @@ class KeyChangeWorkflow(Workflow): fingerprint=self.pubkey.fingerprint ) self.token = pendings.add(pendable) + self.token_owner = TokenOwner.subscriber self.push('receive_confirmation') self.save() @@ -116,6 +121,7 @@ class KeyChangeWorkflow(Workflow): if self.token is not None: pendings.confirm(self.token) self.token = None + self.token_owner = TokenOwner.no_one @classmethod def pendable_class(cls): diff --git a/src/mailman_pgp/workflows/tests/test_key_change.py b/src/mailman_pgp/workflows/tests/test_key_change.py new file mode 100644 index 0000000..5d61efd --- /dev/null +++ b/src/mailman_pgp/workflows/tests/test_key_change.py @@ -0,0 +1,105 @@ +# Copyright (C) 2017 Jan Jancar +# +# This file is a part of the Mailman PGP plugin. +# +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) +# any later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. +# +# You should have received a copy of the GNU General Public License along with +# this program. If not, see <http://www.gnu.org/licenses/>. + +"""""" + +import unittest + +from mailman.app.lifecycle import create_list +from mailman.interfaces.subscriptions import ISubscriptionManager, TokenOwner +from mailman.interfaces.usermanager import IUserManager +from mailman.testing.helpers import get_queue_messages +from public import public +from zope.component import getUtility + +from mailman_pgp.database import mm_transaction, transaction +from mailman_pgp.model.address import PGPAddress +from mailman_pgp.model.list import PGPMailingList +from mailman_pgp.pgp.tests.base import load_key +from mailman_pgp.pgp.wrapper import PGPWrapper +from mailman_pgp.testing.layers import PGPConfigLayer +from mailman_pgp.workflows.key_change import KeyChangeWorkflow + + +@public +class TestKeyChangeWorkflow(unittest.TestCase): + layer = PGPConfigLayer + + def setUp(self): + with mm_transaction(): + self.mlist = create_list('test@example.com', + style_name='pgp-default') + self.pgp_list = PGPMailingList.for_list(self.mlist) + self.pgp_list.key = load_key('ecc_p256.priv.asc') + + self.sender_key = load_key('rsa_1024.priv.asc') + self.sender_new_key = load_key('ecc_p256.priv.asc') + self.sender = getUtility(IUserManager).create_address( + 'rsa-1024b@example.org') + + def test_pgp_address_none(self): + workflow = KeyChangeWorkflow(self.mlist) + with self.assertRaises(ValueError): + list(workflow) + + def test_pubkey_none(self): + with transaction() as t: + pgp_address = PGPAddress(self.sender) + t.add(pgp_address) + + workflow = KeyChangeWorkflow(self.mlist, pgp_address) + with self.assertRaises(ValueError): + list(workflow) + + def test_send_key_confirm_request(self): + with transaction() as t: + pgp_address = PGPAddress(self.sender) + pgp_address.key = self.sender_key.pubkey + pgp_address.key_confirmed = True + t.add(pgp_address) + + workflow = KeyChangeWorkflow(self.mlist, pgp_address, + self.sender_new_key.pubkey) + list(workflow) + items = get_queue_messages('virgin', expected_count=1) + message = items[0].msg + token = workflow.token + + self.assertEqual(message['Subject'], 'key confirm {}'.format(token)) + wrapped = PGPWrapper(message) + self.assertTrue(wrapped.is_encrypted()) + + def test_confirm(self): + with transaction() as t: + pgp_address = PGPAddress(self.sender) + pgp_address.key = self.sender_key.pubkey + pgp_address.key_confirmed = True + t.add(pgp_address) + + workflow = KeyChangeWorkflow(self.mlist, pgp_address, + self.sender_new_key.pubkey) + list(workflow) + + token, token_owner, member = ISubscriptionManager(self.mlist).confirm( + workflow.token) + self.assertIsNone(token) + self.assertEqual(token_owner, TokenOwner.no_one) + + pgp_address = PGPAddress.for_address(self.sender) + self.assertEqual(pgp_address.key_fingerprint, + self.sender_new_key.fingerprint) + self.assertTrue(pgp_address.key_confirmed) |
