diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/mailman_pgp/commands/eml_key.py | 26 | ||||
| -rw-r--r-- | src/mailman_pgp/commands/tests/test_key.py | 43 | ||||
| -rw-r--r-- | src/mailman_pgp/workflows/base.py | 1 | ||||
| -rw-r--r-- | src/mailman_pgp/workflows/key_change.py | 117 |
4 files changed, 185 insertions, 2 deletions
diff --git a/src/mailman_pgp/commands/eml_key.py b/src/mailman_pgp/commands/eml_key.py index a2fb4d5..9df6065 100644 --- a/src/mailman_pgp/commands/eml_key.py +++ b/src/mailman_pgp/commands/eml_key.py @@ -30,6 +30,7 @@ from mailman_pgp.model.address import PGPAddress from mailman_pgp.model.list import PGPMailingList from mailman_pgp.pgp.wrapper import PGPWrapper from mailman_pgp.workflows.base import CONFIRM_REQUEST +from mailman_pgp.workflows.key_change import KeyChangeWorkflow def _get_email(msg): @@ -130,7 +131,30 @@ def _cmd_confirm(pgp_list, mlist, msg, msgdata, arguments, results): def _cmd_change(pgp_list, mlist, msg, msgdata, arguments, results): # New public key in attachment, requires to be signed with current # key - pass + wrapped = PGPWrapper(msg) + if not wrapped.has_keys(): + print('No keys attached? Send a key.', file=results) + return ContinueProcessing.no + + keys = list(wrapped.keys()) + if len(keys) != 1: + print('More than one key! Send only one key.', file=results) + return ContinueProcessing.no + + email = _get_email(msg) + if not email: + print('No email to change key of.', file=results) + return ContinueProcessing.no + + pgp_address = PGPAddress.for_email(email) + if pgp_address is None: + print('A pgp enabled address not found.', file=results) + return ContinueProcessing.no + + workflow = KeyChangeWorkflow(mlist, pgp_address, keys.pop()) + list(workflow) + print('Key change request received.', file=results) + return ContinueProcessing.no def _cmd_revoke(pgp_list, mlist, msg, msgdata, arguments, results): diff --git a/src/mailman_pgp/commands/tests/test_key.py b/src/mailman_pgp/commands/tests/test_key.py index 48ca3c9..fe75a6a 100644 --- a/src/mailman_pgp/commands/tests/test_key.py +++ b/src/mailman_pgp/commands/tests/test_key.py @@ -365,3 +365,46 @@ class TestPreSubscription(unittest.TestCase): self.assertIn('Message not signed, ignoring.', results_msg.get_payload()) + + +class TestAfterSubscription(unittest.TestCase): + layer = PGPConfigLayer + + def setUp(self): + self.mlist = create_list('test@example.com', style_name='pgp-default') + self.pgp_list = PGPMailingList.for_list(self.mlist) + self.pgp_list.key = load_key('ecc_p256.priv.asc') + + def test_key_change(self): + bart = getUtility(IUserManager).create_address('bart@example.com', + 'Bart Person') + bart_key = load_key('rsa_1024.priv.asc') + bart_new_key = load_key('ecc_p256.priv.asc') + + with transaction() as t: + pgp_address = PGPAddress(bart) + pgp_address.key = bart_key.pubkey + pgp_address.key_confirmed = True + t.add(pgp_address) + + message = _create_mixed('bart@example.com', 'test@example.com', + 'key change') + wrapped_message = MIMEWrapper(message) + message = wrapped_message.attach_key(bart_new_key.pubkey) + + mm_config.switchboards['command'].enqueue(message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + + items = get_queue_messages('virgin', expected_count=2) + if items[0].msg['Subject'] == 'The results of your email commands': + results = items[0].msg + confirm_request = items[1].msg + else: + results = items[1].msg + confirm_request = items[0].msg + + self.assertIn('Key change request received.', results.get_payload()) + + confirm_wrapped = PGPWrapper(confirm_request) + self.assertTrue(confirm_wrapped.is_encrypted()) diff --git a/src/mailman_pgp/workflows/base.py b/src/mailman_pgp/workflows/base.py index 20db5f9..a8679bd 100644 --- a/src/mailman_pgp/workflows/base.py +++ b/src/mailman_pgp/workflows/base.py @@ -18,7 +18,6 @@ """""" from mailman.email.message import UserNotification from mailman.interfaces.subscriptions import TokenOwner -from mailman.workflows.common import WhichSubscriber from pgpy import PGPKey from mailman_pgp.model.address import PGPAddress diff --git a/src/mailman_pgp/workflows/key_change.py b/src/mailman_pgp/workflows/key_change.py new file mode 100644 index 0000000..0098b19 --- /dev/null +++ b/src/mailman_pgp/workflows/key_change.py @@ -0,0 +1,117 @@ +# Copyright (C) 2017 Jan Jancar +# +# This file is a part of the Mailman PGP plugin. +# +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) +# any later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. +# +# You should have received a copy of the GNU General Public License along with +# this program. If not, see <http://www.gnu.org/licenses/>. + +"""""" +from mailman.email.message import UserNotification +from mailman.interfaces.pending import IPendable, IPendings +from mailman.interfaces.workflows import IWorkflow +from mailman.workflows.base import Workflow +from pgpy import PGPKey +from public import public +from zope.component import getUtility +from zope.interface import implementer + +from mailman_pgp.model.address import PGPAddress +from mailman_pgp.pgp.utils import copy_headers +from mailman_pgp.pgp.wrapper import PGPWrapper + +CHANGE_CONFIRM_REQUEST = """\ +---------- +TODO: this is a pgp enabled list. +You requested to change your key. +Reply to this message with this whole text +signed with your supplied key, either inline or PGP/MIME. + +Fingerprint: {} +Token: {} +---------- +""" + + +@public +@implementer(IWorkflow) +class KeyChangeWorkflow(Workflow): + name = 'pgp-key-change-workflow' + description = '' + initial_state = 'send_key_confirm_request' + save_attributes = ( + 'address_key', + 'pubkey_key' + ) + + def __init__(self, mlist, pgp_address=None, pubkey=None): + super().__init__() + self.mlist = mlist + self.pgp_address = pgp_address + self.pubkey = pubkey + + @property + def address_key(self): + return self.pgp_address.email + + @address_key.setter + def address_key(self, value): + self.pgp_address = PGPAddress.for_email(value) + + @property + def pubkey_key(self): + if self.pubkey is None: + return None + return str(self.pubkey) + + @pubkey_key.setter + def pubkey_key(self, value): + if value is not None: + self.pubkey, _ = PGPKey.from_blob(value) + else: + self.pubkey = None + + def _step_send_key_confirm_request(self): + pendings = getUtility(IPendings) + pendable = KeyChangeWorkflow.pendable_class()( + email=self.pgp_address.email, + pubkey=str(self.pubkey) + ) + self.token = pendings.add(pendable) + + self.push('receive_key_confirmation') + self.save() + request_address = self.mlist.request_address + email_address = self.pgp_address.email + msg = UserNotification(email_address, request_address, + 'key confirm {}'.format(self.token), + CHANGE_CONFIRM_REQUEST.format( + self.pubkey.fingerprint, + self.token)) + wrapped = PGPWrapper(msg) + encrypted = wrapped.encrypt(self.pubkey) + + msg.set_payload(encrypted.get_payload()) + copy_headers(encrypted, msg, True) + msg.send(self.mlist) + raise StopIteration + + def _step_receive_confirmation(self): + pass + + @classmethod + def pendable_class(cls): + @implementer(IPendable) + class Pendable(dict): + PEND_TYPE = KeyChangeWorkflow.name + + return Pendable |
