From 0ba497780cbe1abe9b91321993e31456bfcf1cd0 Mon Sep 17 00:00:00 2001 From: J08nY Date: Fri, 7 Jul 2017 23:09:39 +0200 Subject: WIP: Sketch out key management on subscription. --- src/mailman_pgp/commands/eml_key.py | 88 ++++++++++++- src/mailman_pgp/workflows/__init__.py | 0 src/mailman_pgp/workflows/base.py | 97 +++++++++++++++ src/mailman_pgp/workflows/subscription.py | 137 +++++++++++++++++++++ src/mailman_pgp/workflows/tests/__init__.py | 0 src/mailman_pgp/workflows/tests/test_base.py | 16 +++ .../workflows/tests/test_subscription.py | 16 +++ 7 files changed, 351 insertions(+), 3 deletions(-) create mode 100644 src/mailman_pgp/workflows/__init__.py create mode 100644 src/mailman_pgp/workflows/base.py create mode 100644 src/mailman_pgp/workflows/subscription.py create mode 100644 src/mailman_pgp/workflows/tests/__init__.py create mode 100644 src/mailman_pgp/workflows/tests/test_base.py create mode 100644 src/mailman_pgp/workflows/tests/test_subscription.py diff --git a/src/mailman_pgp/commands/eml_key.py b/src/mailman_pgp/commands/eml_key.py index 9514f48..b280603 100644 --- a/src/mailman_pgp/commands/eml_key.py +++ b/src/mailman_pgp/commands/eml_key.py @@ -16,11 +16,21 @@ # this program. If not, see . """The key email command.""" +from email.utils import parseaddr from mailman.interfaces.command import ContinueProcessing, IEmailCommand +from mailman.interfaces.subscriptions import ISubscriptionManager +from mailman.interfaces.usermanager import IUserManager from public import public +from zope.component import getUtility from zope.interface import implementer +from mailman_pgp.database import transaction +from mailman_pgp.model.address import PGPAddress +from mailman_pgp.model.list import PGPMailingList +from mailman_pgp.pgp.wrapper import PGPWrapper +from mailman_pgp.workflows.base import CONFIRM_REQUEST + @public @implementer(IEmailCommand) @@ -28,7 +38,7 @@ class KeyCommand: """The `key` command.""" name = 'key' - argument_description = '' + argument_description = '' short_description = '' description = '' @@ -38,7 +48,78 @@ class KeyCommand: print('No sub-command specified,' ' must be one of .', file=results) return ContinueProcessing.no - if arguments[0] == 'change': + + pgp_list = PGPMailingList.for_list(mlist) + if pgp_list is None: + print('This mailing list doesnt have pgp enabled.', file=results) + return ContinueProcessing.yes # XXX: What does this even mean? + + if arguments[0] == 'set': + if len(arguments) != 2: + print('Missing token', file=results) + return ContinueProcessing.no + + wrapped = PGPWrapper(msg) + if not wrapped.has_keys(): + print('No keys here?') + return ContinueProcessing.no + + keys = list(wrapped.keys()) + if len(keys) != 1: + print('More than one key! Which one is yours?') + return ContinueProcessing.no + + with transaction() as t: + pgp_address = PGPAddress(self.address) + pgp_address.key = keys.pop() + t.add(pgp_address) + + token = arguments[1] + ISubscriptionManager(mlist).confirm(token) + # XXX finish this + elif arguments[0] == 'confirm': + if len(arguments) != 2: + print('Missing token', file=results) + return ContinueProcessing.no + + display_name, email = parseaddr(msg['from']) + # Address could be None or the empty string. + if not email: + email = msg.sender + if not email: + print('No email to subscribe with.', file=results) + return ContinueProcessing.no + um = getUtility(IUserManager) + + pgp_address = PGPAddress.for_address(um.get_address(email)) + if pgp_address is None: + # TODO + pass + + wrapped = PGPWrapper(msg) + if wrapped.is_encrypted(): + decrypted = wrapped.decrypt(pgp_list.key) + wrapped = PGPWrapper(decrypted) + + if not wrapped.is_signed(): + print('Message not signed, ignoring.', file=results) + return ContinueProcessing.no + + if not wrapped.verifies(pgp_address.key): + print('Message failed to verify.', file=results) + return ContinueProcessing.no + + token = arguments[1] + + expecting = CONFIRM_REQUEST.format(pgp_address.key.pubkey, + token) + if wrapped.get_signed() == expecting: + ISubscriptionManager(mlist).confirm(token) + else: + # TODO + pass + + elif arguments[0] == 'change': # New public key in attachment, requires to be signed with current # key pass @@ -52,4 +133,5 @@ class KeyCommand: else: print('Wrong sub-command specified,' ' must be one of .', file=results) - return ContinueProcessing.no + + return ContinueProcessing.no diff --git a/src/mailman_pgp/workflows/__init__.py b/src/mailman_pgp/workflows/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/mailman_pgp/workflows/base.py b/src/mailman_pgp/workflows/base.py new file mode 100644 index 0000000..40ff4d7 --- /dev/null +++ b/src/mailman_pgp/workflows/base.py @@ -0,0 +1,97 @@ +# Copyright (C) 2017 Jan Jancar +# +# This file is a part of the Mailman PGP plugin. +# +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) +# any later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. +# +# You should have received a copy of the GNU General Public License along with +# this program. If not, see . + +"""""" +from mailman.email.message import UserNotification +from mailman.interfaces.subscriptions import TokenOwner + +from mailman_pgp.model.address import PGPAddress +from mailman_pgp.model.list import PGPMailingList +from mailman_pgp.pgp.wrapper import PGPWrapper + +KEY_REQUEST = """\ +---------- +TODO: this is a pgp enabled list. +We need your pubkey. +Reply to this message with it as a PGP/MIME(preferred) or inline. +----------""" + +CONFIRM_REQUEST = """\ +---------- +TODO: this is a pgp enabled list. +Reply to this message with this whole text +signed with your supplied key, either inline or PGP/MIME. + +Fingerprint: {} +Token: {} +---------- +""" + + +class PubkeyMixin: + def __init__(self, pubkey=None): + self.pubkey = pubkey + + def _step_pubkey_checks(self): + if not self.pubkey: + self.push('send_key_request') + + def _step_send_key_request(self): + self._set_token(TokenOwner.subscriber) + self.push('receive_key') + self.save() + request_address = self.mlist.request_address + email_address = self.address.email + msg = UserNotification(email_address, request_address, + 'key set {}'.format(self.token), + KEY_REQUEST) + msg.send(self.mlist, add_precedence=False) + # Now we wait for the confirmation. + raise StopIteration + + def _step_receive_key(self): + pgp_address = PGPAddress.for_address(self.address) + if pgp_address is None or pgp_address.key: + # The workflow was confirmed but we still dont have an address + # or the pubkey. So resend request and wait. + self.push('send_key_request') + return + else: + self.pubkey = pgp_address.key + self.push('send_confirm_request') + + def _step_send_confirm_request(self): + self._set_token(TokenOwner.subscriber) + self.push('receive_confirmation') + self.save() + request_address = self.mlist.request_address + email_address = self.address.email + msg = UserNotification(email_address, request_address, + 'key confirm {}'.format(self.token), + CONFIRM_REQUEST.format(self.pubkey.fingerprint, + self.token)) + pgp_list = PGPMailingList.for_list(self.mlist) + wrapped = PGPWrapper(msg) + encrypted = wrapped.encrypt(self.pubkey, pgp_list.pubkey) + + # XXX: This is not good: + msg.set_payload(encrypted) + msg.send(self.mlist) + raise StopIteration + + def _step_receive_confirmation(self): + pass diff --git a/src/mailman_pgp/workflows/subscription.py b/src/mailman_pgp/workflows/subscription.py new file mode 100644 index 0000000..a571b44 --- /dev/null +++ b/src/mailman_pgp/workflows/subscription.py @@ -0,0 +1,137 @@ +# Copyright (C) 2017 Jan Jancar +# +# This file is a part of the Mailman PGP plugin. +# +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) +# any later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. +# +# You should have received a copy of the GNU General Public License along with +# this program. If not, see . + +"""""" + +from mailman.core.i18n import _ +from mailman.interfaces.workflows import ISubscriptionWorkflow +from mailman.workflows.common import (ConfirmationMixin, ModerationMixin, + SubscriptionBase, VerificationMixin) +from public import public +from zope.interface import implementer + +from mailman_pgp.workflows.base import PubkeyMixin + + +@public +@implementer(ISubscriptionWorkflow) +class ConfimSubscriptionPolicy(SubscriptionBase, VerificationMixin, + ConfirmationMixin, PubkeyMixin): + """""" + + name = 'pgp-policy-confirm' + description = _('An subscription policy, for a PGP-enabled mailing list ' + 'that requires confirmation.') + initial_state = 'prepare' + save_attributes = ( + 'verified', + 'confirmed', + 'pubkey', + 'address_key', + 'subscriber_key', + 'user_key', + 'token_owner_key', + ) + + def __init__(self, mlist, subscriber=None, *, + pre_verified=False, pre_confirmed=False, pubkey=None): + SubscriptionBase.__init__(self, mlist, subscriber) + VerificationMixin.__init__(self, pre_verified=pre_verified) + ConfirmationMixin.__init__(self, pre_confirmed=pre_confirmed) + PubkeyMixin.__init__(self, pubkey=pubkey) + + def _step_prepare(self): + self.push('do_subscription') + self.push('pubkey_checks') + self.push('confirmation_checks') + self.push('verification_checks') + self.push('sanity_checks') + + +@public +@implementer(ISubscriptionWorkflow) +class ModerationSubscriptionPolicy(SubscriptionBase, VerificationMixin, + ModerationMixin, PubkeyMixin): + """""" + + name = 'pgp-policy-moderate' + description = _('An subscription policy, for a PGP-enabled mailing list ' + 'that requires moderation.') + initial_state = 'prepare' + save_attributes = ( + 'verified', + 'approved', + 'pubkey', + 'address_key', + 'subscriber_key', + 'user_key', + 'token_owner_key', + ) + + def __init__(self, mlist, subscriber=None, *, + pre_verified=False, pre_approved=False, pubkey=None): + SubscriptionBase.__init__(self, mlist, subscriber) + VerificationMixin.__init__(self, pre_verified=pre_verified) + ModerationMixin.__init__(self, pre_approved=pre_approved) + PubkeyMixin.__init__(self, pubkey=pubkey) + + def _step_prepare(self): + self.push('do_subscription') + self.push('moderation_checks') + self.push('pubkey_checks') + self.push('verification_checks') + self.push('sanity_checks') + + +@public +@implementer(ISubscriptionWorkflow) +class ConfirmModerationSubscriptionPolicy(SubscriptionBase, VerificationMixin, + ConfirmationMixin, ModerationMixin, + PubkeyMixin): + """""" + + name = 'pgp-policy-confirm-moderate' + description = _('An subscription policy, for a PGP-enabled mailing list ' + 'that requires moderation after confirmation.') + initial_state = 'prepare' + save_attributes = ( + 'verified', + 'confirmed', + 'approved', + 'pubkey', + 'address_key', + 'subscriber_key', + 'user_key', + 'token_owner_key', + ) + + def __init__(self, mlist, subscriber=None, *, + pre_verified=False, pre_confirmed=False, pre_approved=False, + pubkey=None): + SubscriptionBase.__init__(self, mlist, subscriber) + VerificationMixin.__init__(self, pre_verified=pre_verified) + ConfirmationMixin.__init__(self, pre_confirmed=pre_confirmed) + ModerationMixin.__init__(self, pre_approved=pre_approved) + PubkeyMixin.__init__(self, pubkey=pubkey) + + def _step_prepare(self): + self.push('do_subscription') + self.push('moderation_checks') + self.push('pubkey_checks') + self.push('confirmation_checks') + self.push('verification_checks') + self.push('sanity_checks') diff --git a/src/mailman_pgp/workflows/tests/__init__.py b/src/mailman_pgp/workflows/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/mailman_pgp/workflows/tests/test_base.py b/src/mailman_pgp/workflows/tests/test_base.py new file mode 100644 index 0000000..8b6b4d1 --- /dev/null +++ b/src/mailman_pgp/workflows/tests/test_base.py @@ -0,0 +1,16 @@ +# Copyright (C) 2017 Jan Jancar +# +# This file is a part of the Mailman PGP plugin. +# +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) +# any later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. +# +# You should have received a copy of the GNU General Public License along with +# this program. If not, see . diff --git a/src/mailman_pgp/workflows/tests/test_subscription.py b/src/mailman_pgp/workflows/tests/test_subscription.py new file mode 100644 index 0000000..8b6b4d1 --- /dev/null +++ b/src/mailman_pgp/workflows/tests/test_subscription.py @@ -0,0 +1,16 @@ +# Copyright (C) 2017 Jan Jancar +# +# This file is a part of the Mailman PGP plugin. +# +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) +# any later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. +# +# You should have received a copy of the GNU General Public License along with +# this program. If not, see . -- cgit v1.2.3-70-g09d2 From 589cfab420d1fa9343956be1a31e28e3524f5f69 Mon Sep 17 00:00:00 2001 From: J08nY Date: Mon, 10 Jul 2017 23:32:24 +0200 Subject: Add an overwrite boolean argument to copy_headers util function. --- src/mailman_pgp/pgp/utils.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/mailman_pgp/pgp/utils.py b/src/mailman_pgp/pgp/utils.py index 0946e3f..a824138 100644 --- a/src/mailman_pgp/pgp/utils.py +++ b/src/mailman_pgp/pgp/utils.py @@ -18,7 +18,7 @@ """Various pgp and email utilities.""" -def copy_headers(from_msg, to_msg): +def copy_headers(from_msg, to_msg, overwrite=False): """ Copy the headers and unixfrom from a message to another one. @@ -28,6 +28,8 @@ def copy_headers(from_msg, to_msg): :type to_msg: email.message.Message """ for key, value in from_msg.items(): + if overwrite: + del to_msg[key] if key not in to_msg: to_msg[key] = value if to_msg.get_unixfrom() is None: -- cgit v1.2.3-70-g09d2 From 7b136683c76afae000c07f5dc54f3785b038695f Mon Sep 17 00:00:00 2001 From: J08nY Date: Mon, 10 Jul 2017 23:32:50 +0200 Subject: Make MIMEWrapper create messages with proper header casing. --- src/mailman_pgp/pgp/mime.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/mailman_pgp/pgp/mime.py b/src/mailman_pgp/pgp/mime.py index afb1c35..1ecbe76 100644 --- a/src/mailman_pgp/pgp/mime.py +++ b/src/mailman_pgp/pgp/mime.py @@ -181,9 +181,9 @@ class MIMEWrapper: _subtype=MIMEWrapper._signature_subtype, _encoder=encode_7or8bit, name='signature.asc') - second_part.add_header('content-description', + second_part.add_header('Content-Description', 'OpenPGP digital signature') - second_part.add_header('content-disposition', 'attachment', + second_part.add_header('Content-Disposition', 'attachment', filename='signature.asc') out.attach(copy.deepcopy(msg)) @@ -249,16 +249,16 @@ class MIMEWrapper: first_part = MIMEApplication(_data='Version: 1', _subtype=MIMEWrapper._encryption_subtype, _encoder=encode_7or8bit) - first_part.add_header('content-description', + first_part.add_header('Content-Description', 'PGP/MIME version identification') second_part = MIMEApplication(_data=str(payload), _subtype='octet-stream', _encoder=encode_7or8bit, name='encrypted.asc') - second_part.add_header('content-description', + second_part.add_header('Content-Description', 'OpenPGP encrypted message') - second_part.add_header('content-disposition', 'inline', + second_part.add_header('Content-Disposition', 'inline', filename='encrypted.asc') out.attach(first_part) out.attach(second_part) -- cgit v1.2.3-70-g09d2 From 4d98c0bcc1ee4eb7d2de38a7ae21f97f1b0c9943 Mon Sep 17 00:00:00 2001 From: J08nY Date: Mon, 10 Jul 2017 23:43:44 +0200 Subject: Add pubkey_confirmed attribute to subscription workflows. --- src/mailman_pgp/workflows/base.py | 32 ++++++++++++++++++++++++++----- src/mailman_pgp/workflows/subscription.py | 30 ++++++++++++++++++----------- 2 files changed, 46 insertions(+), 16 deletions(-) diff --git a/src/mailman_pgp/workflows/base.py b/src/mailman_pgp/workflows/base.py index 40ff4d7..3f4ba8d 100644 --- a/src/mailman_pgp/workflows/base.py +++ b/src/mailman_pgp/workflows/base.py @@ -18,9 +18,11 @@ """""" from mailman.email.message import UserNotification from mailman.interfaces.subscriptions import TokenOwner +from pgpy import PGPKey from mailman_pgp.model.address import PGPAddress from mailman_pgp.model.list import PGPMailingList +from mailman_pgp.pgp.utils import copy_headers from mailman_pgp.pgp.wrapper import PGPWrapper KEY_REQUEST = """\ @@ -43,12 +45,29 @@ Token: {} class PubkeyMixin: - def __init__(self, pubkey=None): + def __init__(self, pubkey=None, pre_confirmed=False): self.pubkey = pubkey + self.pubkey_confirmed = pre_confirmed + + @property + def pubkey_key(self): + if self.pubkey is None: + return None + return str(self.pubkey) + + @pubkey_key.setter + def pubkey_key(self, value): + if value is not None: + self.pubkey, _ = PGPKey.from_blob(value) + else: + self.pubkey = None def _step_pubkey_checks(self): if not self.pubkey: self.push('send_key_request') + else: + if not self.pubkey_confirmed: + self.push('send_confirm_request') def _step_send_key_request(self): self._set_token(TokenOwner.subscriber) @@ -65,14 +84,15 @@ class PubkeyMixin: def _step_receive_key(self): pgp_address = PGPAddress.for_address(self.address) - if pgp_address is None or pgp_address.key: + if pgp_address is None or pgp_address.key is None: # The workflow was confirmed but we still dont have an address # or the pubkey. So resend request and wait. self.push('send_key_request') return else: self.pubkey = pgp_address.key - self.push('send_confirm_request') + if not self.pubkey_confirmed: + self.push('send_confirm_request') def _step_send_confirm_request(self): self._set_token(TokenOwner.subscriber) @@ -86,10 +106,12 @@ class PubkeyMixin: self.token)) pgp_list = PGPMailingList.for_list(self.mlist) wrapped = PGPWrapper(msg) - encrypted = wrapped.encrypt(self.pubkey, pgp_list.pubkey) + encrypted = wrapped.sign_encrypt(pgp_list.key, self.pubkey, + pgp_list.pubkey) # XXX: This is not good: - msg.set_payload(encrypted) + msg.set_payload(encrypted.get_payload()) + copy_headers(encrypted, msg, True) msg.send(self.mlist) raise StopIteration diff --git a/src/mailman_pgp/workflows/subscription.py b/src/mailman_pgp/workflows/subscription.py index a571b44..6b8240c 100644 --- a/src/mailman_pgp/workflows/subscription.py +++ b/src/mailman_pgp/workflows/subscription.py @@ -29,8 +29,8 @@ from mailman_pgp.workflows.base import PubkeyMixin @public @implementer(ISubscriptionWorkflow) -class ConfimSubscriptionPolicy(SubscriptionBase, VerificationMixin, - ConfirmationMixin, PubkeyMixin): +class ConfirmSubscriptionPolicy(SubscriptionBase, VerificationMixin, + ConfirmationMixin, PubkeyMixin): """""" name = 'pgp-policy-confirm' @@ -40,7 +40,8 @@ class ConfimSubscriptionPolicy(SubscriptionBase, VerificationMixin, save_attributes = ( 'verified', 'confirmed', - 'pubkey', + 'pubkey_key', + 'pubkey_confirmed', 'address_key', 'subscriber_key', 'user_key', @@ -48,11 +49,13 @@ class ConfimSubscriptionPolicy(SubscriptionBase, VerificationMixin, ) def __init__(self, mlist, subscriber=None, *, - pre_verified=False, pre_confirmed=False, pubkey=None): + pre_verified=False, pre_confirmed=False, pubkey=None, + pubkey_pre_confirmed=False): SubscriptionBase.__init__(self, mlist, subscriber) VerificationMixin.__init__(self, pre_verified=pre_verified) ConfirmationMixin.__init__(self, pre_confirmed=pre_confirmed) - PubkeyMixin.__init__(self, pubkey=pubkey) + PubkeyMixin.__init__(self, pubkey=pubkey, + pre_confirmed=pubkey_pre_confirmed) def _step_prepare(self): self.push('do_subscription') @@ -75,7 +78,8 @@ class ModerationSubscriptionPolicy(SubscriptionBase, VerificationMixin, save_attributes = ( 'verified', 'approved', - 'pubkey', + 'pubkey_key', + 'pubkey_confirmed', 'address_key', 'subscriber_key', 'user_key', @@ -83,11 +87,13 @@ class ModerationSubscriptionPolicy(SubscriptionBase, VerificationMixin, ) def __init__(self, mlist, subscriber=None, *, - pre_verified=False, pre_approved=False, pubkey=None): + pre_verified=False, pre_approved=False, pubkey=None, + pubkey_pre_confirmed=False): SubscriptionBase.__init__(self, mlist, subscriber) VerificationMixin.__init__(self, pre_verified=pre_verified) ModerationMixin.__init__(self, pre_approved=pre_approved) - PubkeyMixin.__init__(self, pubkey=pubkey) + PubkeyMixin.__init__(self, pubkey=pubkey, + pre_confirmed=pubkey_pre_confirmed) def _step_prepare(self): self.push('do_subscription') @@ -112,7 +118,8 @@ class ConfirmModerationSubscriptionPolicy(SubscriptionBase, VerificationMixin, 'verified', 'confirmed', 'approved', - 'pubkey', + 'pubkey_key', + 'pubkey_confirmed', 'address_key', 'subscriber_key', 'user_key', @@ -121,12 +128,13 @@ class ConfirmModerationSubscriptionPolicy(SubscriptionBase, VerificationMixin, def __init__(self, mlist, subscriber=None, *, pre_verified=False, pre_confirmed=False, pre_approved=False, - pubkey=None): + pubkey=None, pubkey_pre_confirmed=False): SubscriptionBase.__init__(self, mlist, subscriber) VerificationMixin.__init__(self, pre_verified=pre_verified) ConfirmationMixin.__init__(self, pre_confirmed=pre_confirmed) ModerationMixin.__init__(self, pre_approved=pre_approved) - PubkeyMixin.__init__(self, pubkey=pubkey) + PubkeyMixin.__init__(self, pubkey=pubkey, + pre_confirmed=pubkey_pre_confirmed) def _step_prepare(self): self.push('do_subscription') -- cgit v1.2.3-70-g09d2 From 3fdc4dfc1f1d2a2f8c9ca1781d54a11166894890 Mon Sep 17 00:00:00 2001 From: J08nY Date: Mon, 10 Jul 2017 23:45:25 +0200 Subject: Ignore warnings from PGPy. --- src/mailman_pgp/testing/start.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/mailman_pgp/testing/start.py b/src/mailman_pgp/testing/start.py index 37f3de4..128855e 100644 --- a/src/mailman_pgp/testing/start.py +++ b/src/mailman_pgp/testing/start.py @@ -25,3 +25,5 @@ from public import public def start_run(plugin): warnings.filterwarnings(action='ignore', category=UserWarning, module='.*mailman_pgp.*') + warnings.filterwarnings(action='ignore', category=UserWarning, + module='.*PGPy.*') -- cgit v1.2.3-70-g09d2 From ed5fc4825fd56c4ec9b3e4c07554977363c55bc7 Mon Sep 17 00:00:00 2001 From: J08nY Date: Tue, 11 Jul 2017 00:10:11 +0200 Subject: Check signature expiration in PGPWrapper.verifies. --- src/mailman_pgp/pgp/wrapper.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/mailman_pgp/pgp/wrapper.py b/src/mailman_pgp/pgp/wrapper.py index fcbec0e..6e8a8f9 100644 --- a/src/mailman_pgp/pgp/wrapper.py +++ b/src/mailman_pgp/pgp/wrapper.py @@ -80,7 +80,10 @@ class PGPWrapper(): yield from self.inline.verify(key) def verifies(self, key): - return all(bool(verification) for verification in self.verify(key)) + return all(bool(verification) and + all(not sigsubj.signature.is_expired + for sigsubj in verification.good_signatures) for + verification in self.verify(key)) def is_encrypted(self): return self.mime.is_encrypted() or self.inline.is_encrypted() -- cgit v1.2.3-70-g09d2 From f8793bcc2e00c4ee639e4f0e75cfc2b19ea1849f Mon Sep 17 00:00:00 2001 From: J08nY Date: Tue, 11 Jul 2017 00:36:57 +0200 Subject: Add tests for subscription workflows and their pubkey mixin. --- src/mailman_pgp/styles/tests/test_base.py | 3 +- src/mailman_pgp/workflows/base.py | 3 +- src/mailman_pgp/workflows/tests/test_base.py | 174 +++++++++++++++++++++ .../workflows/tests/test_subscription.py | 36 +++++ 4 files changed, 212 insertions(+), 4 deletions(-) diff --git a/src/mailman_pgp/styles/tests/test_base.py b/src/mailman_pgp/styles/tests/test_base.py index 159a242..d343fe4 100644 --- a/src/mailman_pgp/styles/tests/test_base.py +++ b/src/mailman_pgp/styles/tests/test_base.py @@ -33,8 +33,7 @@ class TestBaseStyle(TestCase): base_style = PGPStyle() base_style.apply(mlist) - pgp_list = PGPMailingList.query().filter_by( - list_id=mlist.list_id).first() + pgp_list = PGPMailingList.for_list(mlist) # Test that we have our PGPMailingList self.assertIsNotNone(pgp_list) diff --git a/src/mailman_pgp/workflows/base.py b/src/mailman_pgp/workflows/base.py index 3f4ba8d..52dd879 100644 --- a/src/mailman_pgp/workflows/base.py +++ b/src/mailman_pgp/workflows/base.py @@ -109,11 +109,10 @@ class PubkeyMixin: encrypted = wrapped.sign_encrypt(pgp_list.key, self.pubkey, pgp_list.pubkey) - # XXX: This is not good: msg.set_payload(encrypted.get_payload()) copy_headers(encrypted, msg, True) msg.send(self.mlist) raise StopIteration def _step_receive_confirmation(self): - pass + pass \ No newline at end of file diff --git a/src/mailman_pgp/workflows/tests/test_base.py b/src/mailman_pgp/workflows/tests/test_base.py index 8b6b4d1..19b2e58 100644 --- a/src/mailman_pgp/workflows/tests/test_base.py +++ b/src/mailman_pgp/workflows/tests/test_base.py @@ -14,3 +14,177 @@ # # You should have received a copy of the GNU General Public License along with # this program. If not, see . + +"""""" + +import unittest +from contextlib import suppress +from unittest.mock import patch + +from mailman.app.lifecycle import create_list +from mailman.interfaces.pending import IPendings +from mailman.interfaces.usermanager import IUserManager +from mailman.testing.helpers import get_queue_messages +from zope.component import getUtility + +from mailman_pgp.config import config +from mailman_pgp.database import mm_transaction, transaction +from mailman_pgp.model.address import PGPAddress +from mailman_pgp.model.list import PGPMailingList +from mailman_pgp.pgp.tests.base import load_key +from mailman_pgp.pgp.wrapper import PGPWrapper +from mailman_pgp.testing.layers import PGPConfigLayer +from mailman_pgp.workflows.base import KEY_REQUEST +from mailman_pgp.workflows.subscription import ConfirmSubscriptionPolicy + + +class TestPubkeyMixin(unittest.TestCase): + layer = PGPConfigLayer + + def setUp(self): + with mm_transaction(): + self.mlist = create_list('test@example.com', + style_name='pgp-default') + + self.list_key = load_key('ecc_p256.priv.asc') + + self.pgp_list = PGPMailingList.for_list(self.mlist) + self.pgp_list.key = self.list_key + + self.um = getUtility(IUserManager) + + self.sender_key = load_key('rsa_1024.priv.asc') + self.sender = self.um.create_address('rsa-1024b@example.org') + + def test_pended_data(self): + workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender, + pre_verified=True, + pre_confirmed=True) + with suppress(StopIteration): + workflow.run_thru('send_key_request') + self.assertIsNotNone(workflow.token) + pendable = getUtility(IPendings).confirm(workflow.token, expunge=False) + self.assertEqual(pendable['list_id'], 'test.example.com') + self.assertEqual(pendable['email'], 'rsa-1024b@example.org') + self.assertEqual(pendable['display_name'], '') + self.assertEqual(pendable['when'], '2005-08-01T07:49:23') + self.assertEqual(pendable['token_owner'], 'subscriber') + + def test_key_request_sent(self): + workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender, + pre_verified=True, + pre_confirmed=True) + list(workflow) + items = get_queue_messages('virgin', expected_count=1) + message = items[0].msg + token = workflow.token + + self.assertEqual(message['Subject'], 'key set {}'.format(token)) + self.assertEqual(message.get_payload(), KEY_REQUEST) + + def test_key_request_pubkey_set(self): + workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender, + pre_verified=True, + pre_confirmed=True, + pubkey=self.sender_key.pubkey, + pubkey_pre_confirmed=True) + workflow.run_thru('pubkey_checks') + with patch.object(workflow, '_step_do_subscription') as step: + next(workflow) + step.assert_called_once_with() + + def test_receive_key(self): + workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender, + pre_verified=True, + pre_confirmed=True) + list(workflow) + with transaction() as t: + pgp_address = PGPAddress(self.sender) + pgp_address.key = self.sender_key.pubkey + t.add(pgp_address) + + receive_workflow = ConfirmSubscriptionPolicy(self.mlist) + receive_workflow.token = workflow.token + receive_workflow.restore() + receive_workflow.run_thru('receive_key') + + self.assertIsNotNone(receive_workflow.pubkey) + self.assertEqual(receive_workflow.pubkey.fingerprint, + self.sender_key.pubkey.fingerprint) + + with patch.object(receive_workflow, + '_step_send_confirm_request') as step: + next(receive_workflow) + step.assert_called_once_with() + + def test_receive_key_no_address(self): + workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender, + pre_verified=True, + pre_confirmed=True) + list(workflow) + receive_workflow = ConfirmSubscriptionPolicy(self.mlist) + receive_workflow.token = workflow.token + receive_workflow.restore() + receive_workflow.run_thru('receive_key') + + self.assertIsNone(receive_workflow.pubkey) + with patch.object(receive_workflow, + '_step_send_key_request') as step: + next(receive_workflow) + step.assert_called_once_with() + + def test_receive_key_pubkey_confirmed(self): + workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender, + pre_verified=True, + pre_confirmed=True, + pubkey_pre_confirmed=True) + list(workflow) + with transaction() as t: + pgp_address = PGPAddress(self.sender) + pgp_address.key = self.sender_key.pubkey + t.add(pgp_address) + + receive_workflow = ConfirmSubscriptionPolicy(self.mlist) + receive_workflow.token = workflow.token + receive_workflow.restore() + receive_workflow.run_thru('receive_key') + with patch.object(receive_workflow, '_step_do_subscription') as step: + next(receive_workflow) + step.assert_called_once_with() + + def test_send_key_confirm_request(self): + workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender, + pre_verified=True, + pre_confirmed=True, + pubkey=self.sender_key.pubkey, + pubkey_pre_confirmed=False) + list(workflow) + items = get_queue_messages('virgin', expected_count=1) + message = items[0].msg + token = workflow.token + + self.assertEqual(message['Subject'], 'key confirm {}'.format(token)) + wrapped = PGPWrapper(message) + self.assertTrue(wrapped.is_encrypted()) + + def test_receive_confirmation(self): + workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender, + pre_verified=True, + pre_confirmed=True, + pubkey=self.sender_key.pubkey, + pubkey_pre_confirmed=False) + list(workflow) + + with transaction() as t: + pgp_address = PGPAddress(self.sender) + pgp_address.key = self.sender_key.pubkey + t.add(pgp_address) + + receive_workflow = ConfirmSubscriptionPolicy(self.mlist) + receive_workflow.token = workflow.token + receive_workflow.restore() + receive_workflow.run_thru('receive_confirmation') + + with patch.object(receive_workflow, '_step_do_subscription') as step: + next(receive_workflow) + step.assert_called_once_with() diff --git a/src/mailman_pgp/workflows/tests/test_subscription.py b/src/mailman_pgp/workflows/tests/test_subscription.py index 8b6b4d1..da59bee 100644 --- a/src/mailman_pgp/workflows/tests/test_subscription.py +++ b/src/mailman_pgp/workflows/tests/test_subscription.py @@ -14,3 +14,39 @@ # # You should have received a copy of the GNU General Public License along with # this program. If not, see . + +"""""" +import unittest + +from mailman.app.lifecycle import create_list +from mailman.interfaces.usermanager import IUserManager +from zope.component import getUtility + +from mailman_pgp.database import mm_transaction +from mailman_pgp.testing.layers import PGPConfigLayer +from mailman_pgp.workflows.subscription import (ConfirmSubscriptionPolicy, + ModerationSubscriptionPolicy, + ConfirmModerationSubscriptionPolicy) + + +class TestSubscriptionWorkflows(unittest.TestCase): + layer = PGPConfigLayer + + def setUp(self): + with mm_transaction(): + self.mlist = create_list('test@example.com', + style_name='pgp-default') + self.sender = getUtility(IUserManager).create_address( + 'rsa-1024b@example.org') + + def test_confirm_policy(self): + workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender) + next(workflow) + + def test_moderation_policy(self): + workflow = ModerationSubscriptionPolicy(self.mlist, self.sender) + next(workflow) + + def test_confirm_moderation_policy(self): + workflow = ConfirmModerationSubscriptionPolicy(self.mlist, self.sender) + next(workflow) -- cgit v1.2.3-70-g09d2 From 383ce5f4cc6cb81f91f90f3408742c4c06bca301 Mon Sep 17 00:00:00 2001 From: J08nY Date: Tue, 11 Jul 2017 01:16:23 +0200 Subject: qa. --- src/mailman_pgp/workflows/base.py | 2 +- src/mailman_pgp/workflows/tests/test_base.py | 1 - src/mailman_pgp/workflows/tests/test_subscription.py | 6 +++--- 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/src/mailman_pgp/workflows/base.py b/src/mailman_pgp/workflows/base.py index 52dd879..9bf4db8 100644 --- a/src/mailman_pgp/workflows/base.py +++ b/src/mailman_pgp/workflows/base.py @@ -115,4 +115,4 @@ class PubkeyMixin: raise StopIteration def _step_receive_confirmation(self): - pass \ No newline at end of file + pass diff --git a/src/mailman_pgp/workflows/tests/test_base.py b/src/mailman_pgp/workflows/tests/test_base.py index 19b2e58..a52571c 100644 --- a/src/mailman_pgp/workflows/tests/test_base.py +++ b/src/mailman_pgp/workflows/tests/test_base.py @@ -27,7 +27,6 @@ from mailman.interfaces.usermanager import IUserManager from mailman.testing.helpers import get_queue_messages from zope.component import getUtility -from mailman_pgp.config import config from mailman_pgp.database import mm_transaction, transaction from mailman_pgp.model.address import PGPAddress from mailman_pgp.model.list import PGPMailingList diff --git a/src/mailman_pgp/workflows/tests/test_subscription.py b/src/mailman_pgp/workflows/tests/test_subscription.py index da59bee..f8a67f3 100644 --- a/src/mailman_pgp/workflows/tests/test_subscription.py +++ b/src/mailman_pgp/workflows/tests/test_subscription.py @@ -24,9 +24,9 @@ from zope.component import getUtility from mailman_pgp.database import mm_transaction from mailman_pgp.testing.layers import PGPConfigLayer -from mailman_pgp.workflows.subscription import (ConfirmSubscriptionPolicy, - ModerationSubscriptionPolicy, - ConfirmModerationSubscriptionPolicy) +from mailman_pgp.workflows.subscription import ( + ConfirmModerationSubscriptionPolicy, ConfirmSubscriptionPolicy, + ModerationSubscriptionPolicy) class TestSubscriptionWorkflows(unittest.TestCase): -- cgit v1.2.3-70-g09d2 From 288d5c07825f67e6fd92e93fc3d8369310e268d1 Mon Sep 17 00:00:00 2001 From: J08nY Date: Tue, 11 Jul 2017 15:13:15 +0200 Subject: Add OpenSubscriptionPolicy. --- src/mailman_pgp/workflows/subscription.py | 40 ++++++++++++++++++++-- .../workflows/tests/test_subscription.py | 6 +++- 2 files changed, 42 insertions(+), 4 deletions(-) diff --git a/src/mailman_pgp/workflows/subscription.py b/src/mailman_pgp/workflows/subscription.py index 6b8240c..a7565f3 100644 --- a/src/mailman_pgp/workflows/subscription.py +++ b/src/mailman_pgp/workflows/subscription.py @@ -27,6 +27,40 @@ from zope.interface import implementer from mailman_pgp.workflows.base import PubkeyMixin +@public +@implementer(ISubscriptionWorkflow) +class OpenSubscriptionPolicy(SubscriptionBase, VerificationMixin, PubkeyMixin): + """""" + + name = 'pgp-policy-open' + description = _('An open subscription policy, ' + 'for a PGP-enabled mailing list.') + initial_state = 'prepare' + save_attributes = ( + 'verified', + 'pubkey_key', + 'pubkey_confirmed', + 'address_key', + 'subscriber_key', + 'user_key', + 'token_owner_key', + ) + + def __init__(self, mlist, subscriber=None, *, + pre_verified=False, pubkey=None, + pubkey_pre_confirmed=False): + SubscriptionBase.__init__(self, mlist, subscriber) + VerificationMixin.__init__(self, pre_verified=pre_verified) + PubkeyMixin.__init__(self, pubkey=pubkey, + pre_confirmed=pubkey_pre_confirmed) + + def _step_prepare(self): + self.push('do_subscription') + self.push('pubkey_checks') + self.push('verification_checks') + self.push('sanity_checks') + + @public @implementer(ISubscriptionWorkflow) class ConfirmSubscriptionPolicy(SubscriptionBase, VerificationMixin, @@ -34,7 +68,7 @@ class ConfirmSubscriptionPolicy(SubscriptionBase, VerificationMixin, """""" name = 'pgp-policy-confirm' - description = _('An subscription policy, for a PGP-enabled mailing list ' + description = _('A subscription policy, for a PGP-enabled mailing list ' 'that requires confirmation.') initial_state = 'prepare' save_attributes = ( @@ -72,7 +106,7 @@ class ModerationSubscriptionPolicy(SubscriptionBase, VerificationMixin, """""" name = 'pgp-policy-moderate' - description = _('An subscription policy, for a PGP-enabled mailing list ' + description = _('A subscription policy, for a PGP-enabled mailing list ' 'that requires moderation.') initial_state = 'prepare' save_attributes = ( @@ -111,7 +145,7 @@ class ConfirmModerationSubscriptionPolicy(SubscriptionBase, VerificationMixin, """""" name = 'pgp-policy-confirm-moderate' - description = _('An subscription policy, for a PGP-enabled mailing list ' + description = _('A subscription policy, for a PGP-enabled mailing list ' 'that requires moderation after confirmation.') initial_state = 'prepare' save_attributes = ( diff --git a/src/mailman_pgp/workflows/tests/test_subscription.py b/src/mailman_pgp/workflows/tests/test_subscription.py index f8a67f3..9464a83 100644 --- a/src/mailman_pgp/workflows/tests/test_subscription.py +++ b/src/mailman_pgp/workflows/tests/test_subscription.py @@ -26,7 +26,7 @@ from mailman_pgp.database import mm_transaction from mailman_pgp.testing.layers import PGPConfigLayer from mailman_pgp.workflows.subscription import ( ConfirmModerationSubscriptionPolicy, ConfirmSubscriptionPolicy, - ModerationSubscriptionPolicy) + ModerationSubscriptionPolicy, OpenSubscriptionPolicy) class TestSubscriptionWorkflows(unittest.TestCase): @@ -39,6 +39,10 @@ class TestSubscriptionWorkflows(unittest.TestCase): self.sender = getUtility(IUserManager).create_address( 'rsa-1024b@example.org') + def test_open_policy(self): + workflow = OpenSubscriptionPolicy(self.mlist, self.sender) + next(workflow) + def test_confirm_policy(self): workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender) next(workflow) -- cgit v1.2.3-70-g09d2 From 90d8ab005fede36adf66cf1a042ef33a3ece4c8d Mon Sep 17 00:00:00 2001 From: J08nY Date: Tue, 11 Jul 2017 19:08:27 +0200 Subject: Reset token owner on key confirmation. --- src/mailman_pgp/workflows/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/mailman_pgp/workflows/base.py b/src/mailman_pgp/workflows/base.py index 9bf4db8..8a8126a 100644 --- a/src/mailman_pgp/workflows/base.py +++ b/src/mailman_pgp/workflows/base.py @@ -115,4 +115,4 @@ class PubkeyMixin: raise StopIteration def _step_receive_confirmation(self): - pass + self._set_token(TokenOwner.no_one) -- cgit v1.2.3-70-g09d2 From 97abc15b2e92fcf9109997043e4297f16d0bf5c7 Mon Sep 17 00:00:00 2001 From: J08nY Date: Tue, 11 Jul 2017 20:24:33 +0200 Subject: Restore subscriber in PubkeyMixins last step. --- src/mailman_pgp/workflows/base.py | 19 ++++++++++++++----- src/mailman_pgp/workflows/tests/test_base.py | 8 +++++--- 2 files changed, 19 insertions(+), 8 deletions(-) diff --git a/src/mailman_pgp/workflows/base.py b/src/mailman_pgp/workflows/base.py index 8a8126a..4ca1525 100644 --- a/src/mailman_pgp/workflows/base.py +++ b/src/mailman_pgp/workflows/base.py @@ -18,6 +18,7 @@ """""" from mailman.email.message import UserNotification from mailman.interfaces.subscriptions import TokenOwner +from mailman.workflows.common import WhichSubscriber from pgpy import PGPKey from mailman_pgp.model.address import PGPAddress @@ -63,11 +64,12 @@ class PubkeyMixin: self.pubkey = None def _step_pubkey_checks(self): + self.push('restore_subscriber') if not self.pubkey: self.push('send_key_request') else: if not self.pubkey_confirmed: - self.push('send_confirm_request') + self.push('send_key_confirm_request') def _step_send_key_request(self): self._set_token(TokenOwner.subscriber) @@ -92,11 +94,11 @@ class PubkeyMixin: else: self.pubkey = pgp_address.key if not self.pubkey_confirmed: - self.push('send_confirm_request') + self.push('send_key_confirm_request') - def _step_send_confirm_request(self): + def _step_send_key_confirm_request(self): self._set_token(TokenOwner.subscriber) - self.push('receive_confirmation') + self.push('receive_key_confirmation') self.save() request_address = self.mlist.request_address email_address = self.address.email @@ -114,5 +116,12 @@ class PubkeyMixin: msg.send(self.mlist) raise StopIteration - def _step_receive_confirmation(self): + def _step_receive_key_confirmation(self): self._set_token(TokenOwner.no_one) + + def _step_restore_subscriber(self): + if self.which is WhichSubscriber.address: + self.subscriber = self.address + else: + assert self.which is WhichSubscriber.user + self.subscriber = self.user diff --git a/src/mailman_pgp/workflows/tests/test_base.py b/src/mailman_pgp/workflows/tests/test_base.py index a52571c..c99c6d6 100644 --- a/src/mailman_pgp/workflows/tests/test_base.py +++ b/src/mailman_pgp/workflows/tests/test_base.py @@ -88,6 +88,7 @@ class TestPubkeyMixin(unittest.TestCase): pubkey=self.sender_key.pubkey, pubkey_pre_confirmed=True) workflow.run_thru('pubkey_checks') + next(workflow) with patch.object(workflow, '_step_do_subscription') as step: next(workflow) step.assert_called_once_with() @@ -112,7 +113,7 @@ class TestPubkeyMixin(unittest.TestCase): self.sender_key.pubkey.fingerprint) with patch.object(receive_workflow, - '_step_send_confirm_request') as step: + '_step_send_key_confirm_request') as step: next(receive_workflow) step.assert_called_once_with() @@ -147,6 +148,7 @@ class TestPubkeyMixin(unittest.TestCase): receive_workflow.token = workflow.token receive_workflow.restore() receive_workflow.run_thru('receive_key') + next(receive_workflow) with patch.object(receive_workflow, '_step_do_subscription') as step: next(receive_workflow) step.assert_called_once_with() @@ -182,8 +184,8 @@ class TestPubkeyMixin(unittest.TestCase): receive_workflow = ConfirmSubscriptionPolicy(self.mlist) receive_workflow.token = workflow.token receive_workflow.restore() - receive_workflow.run_thru('receive_confirmation') - + receive_workflow.run_thru('receive_key_confirmation') + next(receive_workflow) with patch.object(receive_workflow, '_step_do_subscription') as step: next(receive_workflow) step.assert_called_once_with() -- cgit v1.2.3-70-g09d2 From 77b63eaae46697b24fe4cc604a36b869619b4638 Mon Sep 17 00:00:00 2001 From: J08nY Date: Tue, 11 Jul 2017 20:57:29 +0200 Subject: More work on key command. Added key_confirmed attribute to PGPAddress. --- src/mailman_pgp/commands/eml_key.py | 220 ++++++++++++++++++----------- src/mailman_pgp/commands/tests/__init__.py | 0 src/mailman_pgp/commands/tests/test_key.py | 123 ++++++++++++++++ src/mailman_pgp/model/address.py | 10 +- src/mailman_pgp/pgp/mime.py | 22 ++- 5 files changed, 287 insertions(+), 88 deletions(-) create mode 100644 src/mailman_pgp/commands/tests/__init__.py create mode 100644 src/mailman_pgp/commands/tests/test_key.py diff --git a/src/mailman_pgp/commands/eml_key.py b/src/mailman_pgp/commands/eml_key.py index b280603..e9b5e19 100644 --- a/src/mailman_pgp/commands/eml_key.py +++ b/src/mailman_pgp/commands/eml_key.py @@ -32,106 +32,154 @@ from mailman_pgp.pgp.wrapper import PGPWrapper from mailman_pgp.workflows.base import CONFIRM_REQUEST +def _get_email(msg): + display_name, email = parseaddr(msg['from']) + # Address could be None or the empty string. + if not email: + email = msg.sender + return email + + +def _cmd_set(pgp_list, mlist, msg, msgdata, arguments, results): + if len(arguments) != 2: + print('Missing token.', file=results) + return ContinueProcessing.no + + wrapped = PGPWrapper(msg) + if not wrapped.has_keys(): + print('No keys attached? Send a key.', file=results) + return ContinueProcessing.no + + keys = list(wrapped.keys()) + if len(keys) != 1: + print('More than one key! Send only one key.', file=results) + return ContinueProcessing.no + + email = _get_email(msg) + if not email: + print('No email to subscribe with.', file=results) + return ContinueProcessing.no + + address = getUtility(IUserManager).get_address(email) + if not address: + print('No adddress to subscribe with.', file=results) + return ContinueProcessing.no + + with transaction() as t: + pgp_address = PGPAddress(address) + pgp_address.key = keys.pop() + t.add(pgp_address) + + token = arguments[1] + try: + ISubscriptionManager(mlist).confirm(token) + print('Key succesfully set.', file=results) + print('Key fingerprint: {}'.format(pgp_address.key.fingerprint), file=results) + except LookupError: + print('Wrong token.', file=results) + + return ContinueProcessing.no + + +def _cmd_confirm(pgp_list, mlist, msg, msgdata, arguments, results): + if len(arguments) != 2: + print('Missing token', file=results) + return ContinueProcessing.no + + email = _get_email(msg) + if not email: + print('No email to subscribe with.', file=results) + return ContinueProcessing.no + + pgp_address = PGPAddress.for_email(email) + if pgp_address is None: + print('A pgp enabled address not found.', file=results) + return ContinueProcessing.no + + wrapped = PGPWrapper(msg) + if wrapped.is_encrypted(): + decrypted = wrapped.decrypt(pgp_list.key) + wrapped = PGPWrapper(decrypted) + + if not wrapped.is_signed(): + print('Message not signed, ignoring.', file=results) + return ContinueProcessing.no + + if not wrapped.verifies(pgp_address.key): + print('Message failed to verify.', file=results) + return ContinueProcessing.no + + token = arguments[1] + + expecting = CONFIRM_REQUEST.format(pgp_address.key_fingerprint, + token) + for sig_subject in wrapped.get_signed(): + if expecting in sig_subject: + with transaction(): + pgp_address.key_confirmed = True + ISubscriptionManager(mlist).confirm(token) + break + else: + print("Message doesn't contain the expected statement.", file=results) + return ContinueProcessing.no + + +def _cmd_change(pgp_list, mlist, msg, msgdata, arguments, results): + # New public key in attachment, requires to be signed with current + # key + pass + + +def _cmd_revoke(pgp_list, mlist, msg, msgdata, arguments, results): + # Current key revocation certificate in attachment, restarts the + # subscription process, or rather only it's key setup part. + pass + + +def _cmd_sign(pgp_list, mlist, msg, msgdata, arguments, results): + # List public key attached, signed by the users current key. + pass + + +SUBCOMMANDS = { + 'set': _cmd_set, + 'confirm': _cmd_confirm, + 'change': _cmd_change, + 'revoke': _cmd_revoke, + 'sign': _cmd_sign +} + +ARGUMENTS = '<' + '|'.join(SUBCOMMANDS.keys()) + '>' + + @public @implementer(IEmailCommand) class KeyCommand: """The `key` command.""" name = 'key' - argument_description = '' + argument_description = ARGUMENTS short_description = '' - description = '' + description = """\ + + """ def process(self, mlist, msg, msgdata, arguments, results): """See `IEmailCommand`.""" if len(arguments) == 0: print('No sub-command specified,' - ' must be one of .', file=results) + ' must be one of {}.'.format(ARGUMENTS), file=results) + return ContinueProcessing.no + + if arguments[0] not in SUBCOMMANDS: + print('Wrong sub-command specified,' + ' must be one of {}.'.format(ARGUMENTS), file=results) return ContinueProcessing.no pgp_list = PGPMailingList.for_list(mlist) if pgp_list is None: - print('This mailing list doesnt have pgp enabled.', file=results) - return ContinueProcessing.yes # XXX: What does this even mean? - - if arguments[0] == 'set': - if len(arguments) != 2: - print('Missing token', file=results) - return ContinueProcessing.no - - wrapped = PGPWrapper(msg) - if not wrapped.has_keys(): - print('No keys here?') - return ContinueProcessing.no - - keys = list(wrapped.keys()) - if len(keys) != 1: - print('More than one key! Which one is yours?') - return ContinueProcessing.no - - with transaction() as t: - pgp_address = PGPAddress(self.address) - pgp_address.key = keys.pop() - t.add(pgp_address) - - token = arguments[1] - ISubscriptionManager(mlist).confirm(token) - # XXX finish this - elif arguments[0] == 'confirm': - if len(arguments) != 2: - print('Missing token', file=results) - return ContinueProcessing.no - - display_name, email = parseaddr(msg['from']) - # Address could be None or the empty string. - if not email: - email = msg.sender - if not email: - print('No email to subscribe with.', file=results) - return ContinueProcessing.no - um = getUtility(IUserManager) - - pgp_address = PGPAddress.for_address(um.get_address(email)) - if pgp_address is None: - # TODO - pass - - wrapped = PGPWrapper(msg) - if wrapped.is_encrypted(): - decrypted = wrapped.decrypt(pgp_list.key) - wrapped = PGPWrapper(decrypted) - - if not wrapped.is_signed(): - print('Message not signed, ignoring.', file=results) - return ContinueProcessing.no - - if not wrapped.verifies(pgp_address.key): - print('Message failed to verify.', file=results) - return ContinueProcessing.no - - token = arguments[1] - - expecting = CONFIRM_REQUEST.format(pgp_address.key.pubkey, - token) - if wrapped.get_signed() == expecting: - ISubscriptionManager(mlist).confirm(token) - else: - # TODO - pass - - elif arguments[0] == 'change': - # New public key in attachment, requires to be signed with current - # key - pass - elif arguments[0] == 'revoke': - # Current key revocation certificate in attachment, restarts the - # subscription process, or rather only it's key setup part. - pass - elif arguments[0] == 'sign': - # List public key attached, signed by the users current key. - pass - else: - print('Wrong sub-command specified,' - ' must be one of .', file=results) + print("This mailing list doesn't have pgp enabled.", file=results) + return ContinueProcessing.no - return ContinueProcessing.no + command = SUBCOMMANDS[arguments[0]] + return command(pgp_list, mlist, msg, msgdata, arguments, results) diff --git a/src/mailman_pgp/commands/tests/__init__.py b/src/mailman_pgp/commands/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/mailman_pgp/commands/tests/test_key.py b/src/mailman_pgp/commands/tests/test_key.py new file mode 100644 index 0000000..8b44011 --- /dev/null +++ b/src/mailman_pgp/commands/tests/test_key.py @@ -0,0 +1,123 @@ +# Copyright (C) 2017 Jan Jancar +# +# This file is a part of the Mailman PGP plugin. +# +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) +# any later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. +# +# You should have received a copy of the GNU General Public License along with +# this program. If not, see . + +"""""" + +import unittest + +from mailman.app.lifecycle import create_list +from mailman.email.message import Message +from mailman.interfaces.subscriptions import ISubscriptionManager +from mailman.interfaces.usermanager import IUserManager +from mailman.runners.command import CommandRunner +from mailman.testing.helpers import make_testable_runner, get_queue_messages +from mailman.utilities.datetime import now +from zope.component import getUtility + +from mailman_pgp.config import mm_config +from mailman_pgp.database import transaction +from mailman_pgp.model.address import PGPAddress +from mailman_pgp.model.list import PGPMailingList +from mailman_pgp.pgp.mime import MIMEWrapper +from mailman_pgp.pgp.tests.base import load_key +from mailman_pgp.pgp.wrapper import PGPWrapper +from mailman_pgp.testing.layers import PGPConfigLayer +from mailman_pgp.workflows.base import CONFIRM_REQUEST +from mailman_pgp.workflows.subscription import OpenSubscriptionPolicy + + +class TestPreSubscription(unittest.TestCase): + layer = PGPConfigLayer + + def setUp(self): + self.mlist = create_list('test@example.com', style_name='pgp-default') + self.pgp_list = PGPMailingList.for_list(self.mlist) + self.pgp_list.generate_key(True) + + def test_set(self): + self.mlist.subscription_policy = OpenSubscriptionPolicy + bart = getUtility(IUserManager).create_address('bart@example.com', + 'Bart Person') + bart.verified_on = now() + token, token_owner, member = ISubscriptionManager(self.mlist).register( + bart) + + get_queue_messages('virgin') + + bart_key = load_key('rsa_1024.priv.asc') + + set_message = Message() + set_message['From'] = 'bart@example.com' + set_message['To'] = 'test@example.com' + set_message['Subject'] = 'Re: key set {}'.format(token) + set_message.set_type('multipart/mixed') + wrapped_set_message = MIMEWrapper(set_message) + set_message = wrapped_set_message.attach_key(bart_key.pubkey) + + mm_config.switchboards['command'].enqueue(set_message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + + pgp_address = PGPAddress.for_address(bart) + self.assertIsNotNone(pgp_address) + self.assertEqual(pgp_address.key.fingerprint, bart_key.fingerprint) + self.assertEqual(pgp_address.key_fingerprint, bart_key.fingerprint) + self.assertFalse(pgp_address.key_confirmed) + + items = get_queue_messages('virgin', expected_count=2) + if items[0].msg['Subject'] == 'The results of your email commands': + confirm_request = items[1].msg + else: + confirm_request = items[0].msg + + confirm_wrapped = PGPWrapper(confirm_request) + self.assertTrue(confirm_wrapped.is_encrypted()) + + def test_confirm(self): + self.mlist.subscription_policy = OpenSubscriptionPolicy + bart = getUtility(IUserManager).create_address('bart@example.com', + 'Bart Person') + bart.verified_on = now() + + bart_key = load_key('rsa_1024.priv.asc') + + token, token_owner, member = ISubscriptionManager(self.mlist).register( + bart, pubkey=bart_key.pubkey) + + get_queue_messages('virgin') + + with transaction() as t: + pgp_address = PGPAddress(bart) + pgp_address.key =bart_key.pubkey + t.add(pgp_address) + + confirm_message = Message() + confirm_message['From'] = 'bart@example.com' + confirm_message['To'] = 'test@example.com' + confirm_message['Subject'] = 'Re: key confirm {}'.format(token) + confirm_message.set_payload( + CONFIRM_REQUEST.format(bart_key.fingerprint, token)) + wrapped_confirm_message = MIMEWrapper(confirm_message) + confirm_message = wrapped_confirm_message.sign(bart_key) + + mm_config.switchboards['command'].enqueue(confirm_message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + + pgp_address = PGPAddress.for_address(bart) + self.assertTrue(pgp_address.key_confirmed) + self.assertTrue(self.mlist.is_subscribed(bart)) diff --git a/src/mailman_pgp/model/address.py b/src/mailman_pgp/model/address.py index 1d07486..6fc3050 100644 --- a/src/mailman_pgp/model/address.py +++ b/src/mailman_pgp/model/address.py @@ -22,7 +22,7 @@ from os.path import exists, isfile, join from mailman.database.types import SAUnicode from mailman.interfaces.usermanager import IUserManager from pgpy import PGPKey -from sqlalchemy import Column, Integer +from sqlalchemy import Column, Integer, Boolean from sqlalchemy.orm import reconstructor from zope.component import getUtility @@ -38,6 +38,7 @@ class PGPAddress(Base): id = Column(Integer, primary_key=True) email = Column(SAUnicode, index=True) key_fingerprint = Column(SAUnicode) + key_confirmed = Column(Boolean) def __init__(self, address): super().__init__() @@ -49,6 +50,7 @@ class PGPAddress(Base): def _init(self): self._address = None self._key = None + self.key_confirmed = False @property def key(self): @@ -119,3 +121,9 @@ class PGPAddress(Base): if address is None: return None return PGPAddress.query().filter_by(email=address.email).first() + + @staticmethod + def for_email(email): + if email is None: + return None + return PGPAddress.query().filter_by(email=email).first() diff --git a/src/mailman_pgp/pgp/mime.py b/src/mailman_pgp/pgp/mime.py index 1ecbe76..975b8bd 100644 --- a/src/mailman_pgp/pgp/mime.py +++ b/src/mailman_pgp/pgp/mime.py @@ -87,7 +87,7 @@ class MIMEWrapper: return self.is_signed() def get_signed(self): - yield self.msg.get_payload(0).as_string(0) + yield self.msg.get_payload(0).as_string() def is_encrypted(self): """ @@ -145,6 +145,26 @@ class MIMEWrapper: key, _ = PGPKey.from_blob(part.get_payload()) yield key + def attach_key(self, key): + """ + + :param key: + :type key: pgpy.PGPKey + :return: + """ + filename = '0x' + key.fingerprint.keyid + '.asc' + key_part = MIMEApplication(_data=str(key), + _subtype=MIMEWrapper._keys_subtype, + _encoder=encode_7or8bit, + name=filename) + key_part.add_header('Content-Description', + 'OpenPGP key') + key_part.add_header('Content-Disposition', 'attachment', + filename=filename) + out = copy.deepcopy(self.msg) + out.attach(key_part) + return out + def verify(self, key): """ Verify the signature of this message with key. -- cgit v1.2.3-70-g09d2 From 32cbc0b7da7e62fb4acdd4ce9e484a7109a32317 Mon Sep 17 00:00:00 2001 From: J08nY Date: Tue, 11 Jul 2017 21:03:01 +0200 Subject: qa. --- src/mailman_pgp/commands/eml_key.py | 4 ++-- src/mailman_pgp/commands/tests/test_key.py | 4 ++-- src/mailman_pgp/model/address.py | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/mailman_pgp/commands/eml_key.py b/src/mailman_pgp/commands/eml_key.py index e9b5e19..15db0b1 100644 --- a/src/mailman_pgp/commands/eml_key.py +++ b/src/mailman_pgp/commands/eml_key.py @@ -74,7 +74,8 @@ def _cmd_set(pgp_list, mlist, msg, msgdata, arguments, results): try: ISubscriptionManager(mlist).confirm(token) print('Key succesfully set.', file=results) - print('Key fingerprint: {}'.format(pgp_address.key.fingerprint), file=results) + print('Key fingerprint: {}'.format(pgp_address.key.fingerprint), + file=results) except LookupError: print('Wrong token.', file=results) @@ -161,7 +162,6 @@ class KeyCommand: argument_description = ARGUMENTS short_description = '' description = """\ - """ def process(self, mlist, msg, msgdata, arguments, results): diff --git a/src/mailman_pgp/commands/tests/test_key.py b/src/mailman_pgp/commands/tests/test_key.py index 8b44011..95f9766 100644 --- a/src/mailman_pgp/commands/tests/test_key.py +++ b/src/mailman_pgp/commands/tests/test_key.py @@ -24,7 +24,7 @@ from mailman.email.message import Message from mailman.interfaces.subscriptions import ISubscriptionManager from mailman.interfaces.usermanager import IUserManager from mailman.runners.command import CommandRunner -from mailman.testing.helpers import make_testable_runner, get_queue_messages +from mailman.testing.helpers import get_queue_messages, make_testable_runner from mailman.utilities.datetime import now from zope.component import getUtility @@ -102,7 +102,7 @@ class TestPreSubscription(unittest.TestCase): with transaction() as t: pgp_address = PGPAddress(bart) - pgp_address.key =bart_key.pubkey + pgp_address.key = bart_key.pubkey t.add(pgp_address) confirm_message = Message() diff --git a/src/mailman_pgp/model/address.py b/src/mailman_pgp/model/address.py index 6fc3050..f102bf7 100644 --- a/src/mailman_pgp/model/address.py +++ b/src/mailman_pgp/model/address.py @@ -22,7 +22,7 @@ from os.path import exists, isfile, join from mailman.database.types import SAUnicode from mailman.interfaces.usermanager import IUserManager from pgpy import PGPKey -from sqlalchemy import Column, Integer, Boolean +from sqlalchemy import Boolean, Column, Integer from sqlalchemy.orm import reconstructor from zope.component import getUtility -- cgit v1.2.3-70-g09d2 From beff82c480fa4f9acaede60a5f789e880721cc81 Mon Sep 17 00:00:00 2001 From: J08nY Date: Tue, 11 Jul 2017 21:31:40 +0200 Subject: Don't recheck addresses with keys and key confirmations. --- src/mailman_pgp/commands/eml_key.py | 5 ++++- src/mailman_pgp/workflows/base.py | 18 ++++++++++++++---- 2 files changed, 18 insertions(+), 5 deletions(-) diff --git a/src/mailman_pgp/commands/eml_key.py b/src/mailman_pgp/commands/eml_key.py index 15db0b1..fbae25e 100644 --- a/src/mailman_pgp/commands/eml_key.py +++ b/src/mailman_pgp/commands/eml_key.py @@ -65,8 +65,11 @@ def _cmd_set(pgp_list, mlist, msg, msgdata, arguments, results): print('No adddress to subscribe with.', file=results) return ContinueProcessing.no + with transaction() as t: - pgp_address = PGPAddress(address) + pgp_address = PGPAddress.for_address(address) + if pgp_address is None: + pgp_address = PGPAddress(address) pgp_address.key = keys.pop() t.add(pgp_address) diff --git a/src/mailman_pgp/workflows/base.py b/src/mailman_pgp/workflows/base.py index 4ca1525..ace8f91 100644 --- a/src/mailman_pgp/workflows/base.py +++ b/src/mailman_pgp/workflows/base.py @@ -65,11 +65,21 @@ class PubkeyMixin: def _step_pubkey_checks(self): self.push('restore_subscriber') - if not self.pubkey: - self.push('send_key_request') + + pgp_address = PGPAddress.for_address(self.address) + + if pgp_address is not None: + if not pgp_address.key: + self.push('send_key_request') + else: + if not pgp_address.key_confirmed: + self.push('send_key_confirm_request') else: - if not self.pubkey_confirmed: - self.push('send_key_confirm_request') + if not self.pubkey: + self.push('send_key_request') + else: + if not self.pubkey_confirmed: + self.push('send_key_confirm_request') def _step_send_key_request(self): self._set_token(TokenOwner.subscriber) -- cgit v1.2.3-70-g09d2 From 4bab3532e38f6fd4826e0af2f96b6060286e3ed7 Mon Sep 17 00:00:00 2001 From: J08nY Date: Tue, 11 Jul 2017 22:07:06 +0200 Subject: Some more tests for the key command. --- src/mailman_pgp/commands/eml_key.py | 2 +- src/mailman_pgp/commands/tests/test_key.py | 153 +++++++++++++++++++++++++++++ 2 files changed, 154 insertions(+), 1 deletion(-) diff --git a/src/mailman_pgp/commands/eml_key.py b/src/mailman_pgp/commands/eml_key.py index fbae25e..e2a2443 100644 --- a/src/mailman_pgp/commands/eml_key.py +++ b/src/mailman_pgp/commands/eml_key.py @@ -87,7 +87,7 @@ def _cmd_set(pgp_list, mlist, msg, msgdata, arguments, results): def _cmd_confirm(pgp_list, mlist, msg, msgdata, arguments, results): if len(arguments) != 2: - print('Missing token', file=results) + print('Missing token.', file=results) return ContinueProcessing.no email = _get_email(msg) diff --git a/src/mailman_pgp/commands/tests/test_key.py b/src/mailman_pgp/commands/tests/test_key.py index 95f9766..b3d8c95 100644 --- a/src/mailman_pgp/commands/tests/test_key.py +++ b/src/mailman_pgp/commands/tests/test_key.py @@ -40,6 +40,59 @@ from mailman_pgp.workflows.base import CONFIRM_REQUEST from mailman_pgp.workflows.subscription import OpenSubscriptionPolicy +class TestPreDispatch(unittest.TestCase): + layer = PGPConfigLayer + + def setUp(self): + self.mlist = create_list('test@example.com') + + def test_no_arguments(self): + message = Message() + message['From'] = 'bart@example.com' + message['To'] = 'test@example.com' + message['Subject'] = 'key' + message.set_payload('') + + mm_config.switchboards['command'].enqueue(message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + items = get_queue_messages('virgin', expected_count=1) + results_msg = items[0].msg + + self.assertIn('No sub-command specified', results_msg.get_payload()) + + def test_wrong_subcommand(self): + message = Message() + message['From'] = 'bart@example.com' + message['To'] = 'test@example.com' + message['Subject'] = 'key wroooooong' + message.set_payload('') + + mm_config.switchboards['command'].enqueue(message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + items = get_queue_messages('virgin', expected_count=1) + results_msg = items[0].msg + + self.assertIn('Wrong sub-command specified', results_msg.get_payload()) + + def test_no_pgp_list(self): + message = Message() + message['From'] = 'bart@example.com' + message['To'] = 'test@example.com' + message['Subject'] = 'key set' + message.set_payload('') + + mm_config.switchboards['command'].enqueue(message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + items = get_queue_messages('virgin', expected_count=1) + results_msg = items[0].msg + + self.assertIn("This mailing list doesn't have pgp enabled.", + results_msg.get_payload()) + + class TestPreSubscription(unittest.TestCase): layer = PGPConfigLayer @@ -87,6 +140,60 @@ class TestPreSubscription(unittest.TestCase): confirm_wrapped = PGPWrapper(confirm_request) self.assertTrue(confirm_wrapped.is_encrypted()) + def test_set_no_token(self): + message = Message() + message['From'] = 'bart@example.com' + message['To'] = 'test@example.com' + message['Subject'] = 'key set' + message.set_payload('') + + mm_config.switchboards['command'].enqueue(message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + items = get_queue_messages('virgin', expected_count=1) + results_msg = items[0].msg + + self.assertIn('Missing token.', results_msg.get_payload()) + + def test_set_no_key(self): + message = Message() + message['From'] = 'bart@example.com' + message['To'] = 'test@example.com' + message['Subject'] = 'key set token' + message.set_payload('') + + mm_config.switchboards['command'].enqueue(message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + items = get_queue_messages('virgin', expected_count=1) + results_msg = items[0].msg + + self.assertIn('No keys attached? Send a key.', + results_msg.get_payload()) + + def test_set_multiple_keys(self): + bart_key = load_key('rsa_1024.priv.asc') + anne_key = load_key('ecc_p256.priv.asc') + + set_message = Message() + set_message['From'] = 'bart@example.com' + set_message['To'] = 'test@example.com' + set_message['Subject'] = 'Re: key set token' + set_message.set_type('multipart/mixed') + wrapped_set_message = MIMEWrapper(set_message) + set_message = wrapped_set_message.attach_key(bart_key.pubkey) + wrapped_set_message = MIMEWrapper(set_message) + set_message = wrapped_set_message.attach_key(anne_key.pubkey) + + mm_config.switchboards['command'].enqueue(set_message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + items = get_queue_messages('virgin', expected_count=1) + results_msg = items[0].msg + + self.assertIn('More than one key! Send only one key.', + results_msg.get_payload()) + def test_confirm(self): self.mlist.subscription_policy = OpenSubscriptionPolicy bart = getUtility(IUserManager).create_address('bart@example.com', @@ -121,3 +228,49 @@ class TestPreSubscription(unittest.TestCase): pgp_address = PGPAddress.for_address(bart) self.assertTrue(pgp_address.key_confirmed) self.assertTrue(self.mlist.is_subscribed(bart)) + + def test_confirm_no_token(self): + message = Message() + message['From'] = 'bart@example.com' + message['To'] = 'test@example.com' + message['Subject'] = 'key confirm' + message.set_payload('') + + mm_config.switchboards['command'].enqueue(message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + items = get_queue_messages('virgin', expected_count=1) + results_msg = items[0].msg + + self.assertIn('Missing token.', results_msg.get_payload()) + + def test_confirm_no_email(self): + message = Message() + message['From'] = '' + message['To'] = 'test@example.com' + message['Subject'] = 'key confirm token' + message.set_payload('') + + mm_config.switchboards['command'].enqueue(message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + items = get_queue_messages('virgin', expected_count=1) + results_msg = items[0].msg + + self.assertIn('No email to subscribe with.', results_msg.get_payload()) + + def test_confirm_no_pgp_address(self): + message = Message() + message['From'] = 'bart@example.com' + message['To'] = 'test@example.com' + message['Subject'] = 'key confirm token' + message.set_payload('') + + mm_config.switchboards['command'].enqueue(message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + items = get_queue_messages('virgin', expected_count=1) + results_msg = items[0].msg + + self.assertIn('A pgp enabled address not found.', + results_msg.get_payload()) -- cgit v1.2.3-70-g09d2 From 44bde427fdcc78344892f3413597c7be66eca8e9 Mon Sep 17 00:00:00 2001 From: J08nY Date: Tue, 11 Jul 2017 22:38:20 +0200 Subject: Add some more REST to addresses. --- src/mailman_pgp/rest/addresses.py | 37 ++++++++++++++--- src/mailman_pgp/rest/lists.py | 2 +- src/mailman_pgp/rest/tests/test_addresses.py | 60 ++++++++++++++++++++++++++++ src/mailman_pgp/rest/tests/test_lists.py | 2 +- 4 files changed, 93 insertions(+), 8 deletions(-) create mode 100644 src/mailman_pgp/rest/tests/test_addresses.py diff --git a/src/mailman_pgp/rest/addresses.py b/src/mailman_pgp/rest/addresses.py index 4f7a04d..93be6d3 100644 --- a/src/mailman_pgp/rest/addresses.py +++ b/src/mailman_pgp/rest/addresses.py @@ -17,19 +17,44 @@ """""" -from mailman.rest.helpers import CollectionMixin +from mailman.rest.helpers import CollectionMixin, okay, etag, not_found from public.public import public +from mailman_pgp.config import config +from mailman_pgp.model.address import PGPAddress + class _EncryptedBase(CollectionMixin): - pass + def _resource_as_dict(self, address): + """See `CollectionMixin`.""" + return dict(email=address.email, + key_fingerprint=address.key_fingerprint, + key_confirmed=address.key_confirmed, + self_link=self.api.path_to( + '/plugins/{}/addresses/{}'.format(config.name, + address.email) + )) + + def _get_collection(self, request): + """See `CollectionMixin`.""" + return PGPAddress.query().all() @public -class AllAddresses: - pass +class AllAddresses(_EncryptedBase): + def on_get(self, request, response): + """/addresses""" + resource = self._make_collection(request) + return okay(response, etag(resource)) @public -class AnAddress: - pass +class AnAddress(_EncryptedBase): + def __init__(self, email): + self._address = PGPAddress.for_email(email) + + def on_get(self, request, response): + if self._address is None: + return not_found(response) + else: + okay(response, self._resource_as_json(self._address)) diff --git a/src/mailman_pgp/rest/lists.py b/src/mailman_pgp/rest/lists.py index 9141bce..37b8d28 100644 --- a/src/mailman_pgp/rest/lists.py +++ b/src/mailman_pgp/rest/lists.py @@ -73,7 +73,7 @@ class AnEncryptedList(_EncryptedBase): okay(response, self._resource_as_json(self._mlist)) @child() - def key(self, context, segments): + def pubkey(self, context, segments): return AListPubkey(self._mlist), [] diff --git a/src/mailman_pgp/rest/tests/test_addresses.py b/src/mailman_pgp/rest/tests/test_addresses.py new file mode 100644 index 0000000..1730618 --- /dev/null +++ b/src/mailman_pgp/rest/tests/test_addresses.py @@ -0,0 +1,60 @@ +# Copyright (C) 2017 Jan Jancar +# +# This file is a part of the Mailman PGP plugin. +# +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) +# any later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. +# +# You should have received a copy of the GNU General Public License along with +# this program. If not, see . + +"""""" +import unittest +from urllib.error import HTTPError + +from mailman.interfaces.usermanager import IUserManager +from mailman.testing.helpers import call_api +from zope.component import getUtility + +from mailman_pgp.database import transaction +from mailman_pgp.model.address import PGPAddress +from mailman_pgp.testing.layers import PGPRESTLayer + + +class TestAddresses(unittest.TestCase): + layer = PGPRESTLayer + + def setUp(self): + self.mm_address = getUtility(IUserManager).create_address( + 'anne@example.com') + with transaction() as t: + self.address = PGPAddress(self.mm_address) + t.add(self.address) + + def test_missing_address(self): + with self.assertRaises(HTTPError) as cm: + call_api('http://localhost:9001/3.1/plugins/pgp/addresses/' + 'bart@example.com') + self.assertEqual(cm.exception.code, 404) + + def test_all_addresses(self): + json, response = call_api( + 'http://localhost:9001/3.1/plugins/pgp/addresses/') + self.assertEqual(json['total_size'], 1) + self.assertEqual(len(json['entries']), 1) + addresses = json['entries'] + address = addresses[0] + self.assertEqual(address['email'], self.address.email) + + def test_get_address(self): + json, response = call_api( + 'http://localhost:9001/3.1/plugins/pgp/addresses/' + 'anne@example.com') + self.assertEqual(json['email'], self.address.email) diff --git a/src/mailman_pgp/rest/tests/test_lists.py b/src/mailman_pgp/rest/tests/test_lists.py index a674cc9..c5c9854 100644 --- a/src/mailman_pgp/rest/tests/test_lists.py +++ b/src/mailman_pgp/rest/tests/test_lists.py @@ -68,7 +68,7 @@ class TestLists(TestCase): json, response = call_api( 'http://localhost:9001/3.1/plugins/pgp/lists/' - 'another.example.com/key') + 'another.example.com/pubkey') json.pop('http_etag') self.assertEqual(len(json.keys()), 2) -- cgit v1.2.3-70-g09d2 From 329532bc05f83edb4cd23e2ae82da777511c9857 Mon Sep 17 00:00:00 2001 From: J08nY Date: Tue, 11 Jul 2017 23:00:53 +0200 Subject: Add more tests for workflows. --- src/mailman_pgp/workflows/base.py | 1 - src/mailman_pgp/workflows/tests/test_base.py | 17 +++++++++++++++++ 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/src/mailman_pgp/workflows/base.py b/src/mailman_pgp/workflows/base.py index ace8f91..1e6b867 100644 --- a/src/mailman_pgp/workflows/base.py +++ b/src/mailman_pgp/workflows/base.py @@ -100,7 +100,6 @@ class PubkeyMixin: # The workflow was confirmed but we still dont have an address # or the pubkey. So resend request and wait. self.push('send_key_request') - return else: self.pubkey = pgp_address.key if not self.pubkey_confirmed: diff --git a/src/mailman_pgp/workflows/tests/test_base.py b/src/mailman_pgp/workflows/tests/test_base.py index c99c6d6..4f39296 100644 --- a/src/mailman_pgp/workflows/tests/test_base.py +++ b/src/mailman_pgp/workflows/tests/test_base.py @@ -189,3 +189,20 @@ class TestPubkeyMixin(unittest.TestCase): with patch.object(receive_workflow, '_step_do_subscription') as step: next(receive_workflow) step.assert_called_once_with() + + def test_exisitng_pgp_address(self): + workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender, + pre_verified=True, + pre_confirmed=True) + + with transaction() as t: + pgp_address = PGPAddress(self.sender) + pgp_address.key = self.sender_key.pubkey + pgp_address.key_confirmed = True + t.add(pgp_address) + + workflow.run_thru('pubkey_checks') + next(workflow) + with patch.object(workflow, '_step_do_subscription') as step: + next(workflow) + step.assert_called_once_with() -- cgit v1.2.3-70-g09d2 From 65b25b13ba5a3831c43204da9358eb42b1b373af Mon Sep 17 00:00:00 2001 From: J08nY Date: Tue, 11 Jul 2017 23:16:23 +0200 Subject: Change default subscription policies to pgp enabled ones for pgp lists. --- src/mailman_pgp/styles/base.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/mailman_pgp/styles/base.py b/src/mailman_pgp/styles/base.py index 6b5271b..a7c3366 100644 --- a/src/mailman_pgp/styles/base.py +++ b/src/mailman_pgp/styles/base.py @@ -19,7 +19,7 @@ from lazr.config import as_boolean from public import public -from mailman_pgp.config import config +from mailman_pgp.config import config, mm_config from mailman_pgp.database import transaction from mailman_pgp.model.list import PGPMailingList @@ -32,6 +32,11 @@ class PGPStyle: """ mailing_list.posting_chain = 'pgp-posting-chain' + old_policy = mailing_list.subscription_policy.name + new_policy_name = 'pgp-' + old_policy[4:] + if new_policy_name in mm_config.workflows: + mailing_list.subscription_policy = new_policy_name + pgp_list = PGPMailingList.for_list(mailing_list) if pgp_list: return -- cgit v1.2.3-70-g09d2 From 2fd2f28f4389e3d0cad4fdc9c6067b5baff291e6 Mon Sep 17 00:00:00 2001 From: J08nY Date: Wed, 12 Jul 2017 02:41:48 +0200 Subject: Some more tests for key command. PEP8. --- src/mailman_pgp/commands/eml_key.py | 1 - src/mailman_pgp/commands/tests/test_key.py | 211 +++++++++++++++++++++-------- src/mailman_pgp/rest/addresses.py | 2 +- 3 files changed, 152 insertions(+), 62 deletions(-) diff --git a/src/mailman_pgp/commands/eml_key.py b/src/mailman_pgp/commands/eml_key.py index e2a2443..a2fb4d5 100644 --- a/src/mailman_pgp/commands/eml_key.py +++ b/src/mailman_pgp/commands/eml_key.py @@ -65,7 +65,6 @@ def _cmd_set(pgp_list, mlist, msg, msgdata, arguments, results): print('No adddress to subscribe with.', file=results) return ContinueProcessing.no - with transaction() as t: pgp_address = PGPAddress.for_address(address) if pgp_address is None: diff --git a/src/mailman_pgp/commands/tests/test_key.py b/src/mailman_pgp/commands/tests/test_key.py index b3d8c95..48ca3c9 100644 --- a/src/mailman_pgp/commands/tests/test_key.py +++ b/src/mailman_pgp/commands/tests/test_key.py @@ -40,6 +40,24 @@ from mailman_pgp.workflows.base import CONFIRM_REQUEST from mailman_pgp.workflows.subscription import OpenSubscriptionPolicy +def _create_plain(from_hdr, to_hdr, subject_hdr, payload): + message = Message() + message['From'] = from_hdr + message['To'] = to_hdr + message['Subject'] = subject_hdr + message.set_payload(payload) + return message + + +def _create_mixed(from_hdr, to_hdr, subject_hdr): + message = Message() + message['From'] = from_hdr + message['To'] = to_hdr + message['Subject'] = subject_hdr + message.set_type('multipart/mixed') + return message + + class TestPreDispatch(unittest.TestCase): layer = PGPConfigLayer @@ -47,11 +65,8 @@ class TestPreDispatch(unittest.TestCase): self.mlist = create_list('test@example.com') def test_no_arguments(self): - message = Message() - message['From'] = 'bart@example.com' - message['To'] = 'test@example.com' - message['Subject'] = 'key' - message.set_payload('') + message = _create_plain('bart@example.com', 'test@example.com', + 'key', '') mm_config.switchboards['command'].enqueue(message, listid='test.example.com') @@ -62,11 +77,8 @@ class TestPreDispatch(unittest.TestCase): self.assertIn('No sub-command specified', results_msg.get_payload()) def test_wrong_subcommand(self): - message = Message() - message['From'] = 'bart@example.com' - message['To'] = 'test@example.com' - message['Subject'] = 'key wroooooong' - message.set_payload('') + message = _create_plain('bart@example.com', 'test@example.com', + 'key wrooooooong', '') mm_config.switchboards['command'].enqueue(message, listid='test.example.com') @@ -77,11 +89,8 @@ class TestPreDispatch(unittest.TestCase): self.assertIn('Wrong sub-command specified', results_msg.get_payload()) def test_no_pgp_list(self): - message = Message() - message['From'] = 'bart@example.com' - message['To'] = 'test@example.com' - message['Subject'] = 'key set' - message.set_payload('') + message = _create_plain('bart@example.com', 'test@example.com', + 'key set', '') mm_config.switchboards['command'].enqueue(message, listid='test.example.com') @@ -99,7 +108,7 @@ class TestPreSubscription(unittest.TestCase): def setUp(self): self.mlist = create_list('test@example.com', style_name='pgp-default') self.pgp_list = PGPMailingList.for_list(self.mlist) - self.pgp_list.generate_key(True) + self.pgp_list.key = load_key('ecc_p256.priv.asc') def test_set(self): self.mlist.subscription_policy = OpenSubscriptionPolicy @@ -113,11 +122,8 @@ class TestPreSubscription(unittest.TestCase): bart_key = load_key('rsa_1024.priv.asc') - set_message = Message() - set_message['From'] = 'bart@example.com' - set_message['To'] = 'test@example.com' - set_message['Subject'] = 'Re: key set {}'.format(token) - set_message.set_type('multipart/mixed') + set_message = _create_mixed('bart@example.com', 'test@example.com', + 'Re: key set {}'.format(token)) wrapped_set_message = MIMEWrapper(set_message) set_message = wrapped_set_message.attach_key(bart_key.pubkey) @@ -133,19 +139,22 @@ class TestPreSubscription(unittest.TestCase): items = get_queue_messages('virgin', expected_count=2) if items[0].msg['Subject'] == 'The results of your email commands': + results = items[0].msg confirm_request = items[1].msg else: + results = items[1].msg confirm_request = items[0].msg + self.assertIn('Key succesfully set.', results.get_payload()) + self.assertIn('Key fingerprint: {}'.format(bart_key.fingerprint), + results.get_payload()) + confirm_wrapped = PGPWrapper(confirm_request) self.assertTrue(confirm_wrapped.is_encrypted()) def test_set_no_token(self): - message = Message() - message['From'] = 'bart@example.com' - message['To'] = 'test@example.com' - message['Subject'] = 'key set' - message.set_payload('') + message = _create_plain('bart@example.com', 'test@example.com', + 'key set', '') mm_config.switchboards['command'].enqueue(message, listid='test.example.com') @@ -156,11 +165,8 @@ class TestPreSubscription(unittest.TestCase): self.assertIn('Missing token.', results_msg.get_payload()) def test_set_no_key(self): - message = Message() - message['From'] = 'bart@example.com' - message['To'] = 'test@example.com' - message['Subject'] = 'key set token' - message.set_payload('') + message = _create_plain('bart@example.com', 'test@example.com', + 'key set token', '') mm_config.switchboards['command'].enqueue(message, listid='test.example.com') @@ -175,11 +181,8 @@ class TestPreSubscription(unittest.TestCase): bart_key = load_key('rsa_1024.priv.asc') anne_key = load_key('ecc_p256.priv.asc') - set_message = Message() - set_message['From'] = 'bart@example.com' - set_message['To'] = 'test@example.com' - set_message['Subject'] = 'Re: key set token' - set_message.set_type('multipart/mixed') + set_message = _create_mixed('bart@example.com', 'test@example.com', + 'Re: key set token') wrapped_set_message = MIMEWrapper(set_message) set_message = wrapped_set_message.attach_key(bart_key.pubkey) wrapped_set_message = MIMEWrapper(set_message) @@ -194,6 +197,38 @@ class TestPreSubscription(unittest.TestCase): self.assertIn('More than one key! Send only one key.', results_msg.get_payload()) + def test_set_no_email(self): + bart_key = load_key('rsa_1024.priv.asc') + + message = _create_mixed('', 'test@example.com', 'key set token') + wrapped_message = MIMEWrapper(message) + message = wrapped_message.attach_key(bart_key.pubkey) + + mm_config.switchboards['command'].enqueue(message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + items = get_queue_messages('virgin', expected_count=1) + results_msg = items[0].msg + + self.assertIn('No email to subscribe with.', results_msg.get_payload()) + + def test_set_no_address(self): + bart_key = load_key('rsa_1024.priv.asc') + + set_message = _create_mixed('bart@example.com', 'test@example.com', + 'key set token') + wrapped_set_message = MIMEWrapper(set_message) + set_message = wrapped_set_message.attach_key(bart_key.pubkey) + + mm_config.switchboards['command'].enqueue(set_message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + items = get_queue_messages('virgin', expected_count=1) + results_msg = items[0].msg + + self.assertIn('No adddress to subscribe with.', + results_msg.get_payload()) + def test_confirm(self): self.mlist.subscription_policy = OpenSubscriptionPolicy bart = getUtility(IUserManager).create_address('bart@example.com', @@ -212,16 +247,48 @@ class TestPreSubscription(unittest.TestCase): pgp_address.key = bart_key.pubkey t.add(pgp_address) - confirm_message = Message() - confirm_message['From'] = 'bart@example.com' - confirm_message['To'] = 'test@example.com' - confirm_message['Subject'] = 'Re: key confirm {}'.format(token) - confirm_message.set_payload( - CONFIRM_REQUEST.format(bart_key.fingerprint, token)) - wrapped_confirm_message = MIMEWrapper(confirm_message) - confirm_message = wrapped_confirm_message.sign(bart_key) + message = _create_plain('bart@example.com', 'test@example.com', + 'Re: key confirm {}'.format(token), + CONFIRM_REQUEST.format(bart_key.fingerprint, + token)) + wrapped_message = MIMEWrapper(message) + message = wrapped_message.sign(bart_key) - mm_config.switchboards['command'].enqueue(confirm_message, + mm_config.switchboards['command'].enqueue(message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + + pgp_address = PGPAddress.for_address(bart) + self.assertTrue(pgp_address.key_confirmed) + self.assertTrue(self.mlist.is_subscribed(bart)) + + def test_confirm_encrypted(self): + self.mlist.subscription_policy = OpenSubscriptionPolicy + bart = getUtility(IUserManager).create_address('bart@example.com', + 'Bart Person') + bart.verified_on = now() + + bart_key = load_key('rsa_1024.priv.asc') + + token, token_owner, member = ISubscriptionManager(self.mlist).register( + bart, pubkey=bart_key.pubkey) + + get_queue_messages('virgin') + + with transaction() as t: + pgp_address = PGPAddress(bart) + pgp_address.key = bart_key.pubkey + t.add(pgp_address) + + message = _create_plain('bart@example.com', 'test@example.com', + 'Re: key confirm {}'.format(token), + CONFIRM_REQUEST.format(bart_key.fingerprint, + token)) + wrapped_message = MIMEWrapper(message) + message = wrapped_message.sign_encrypt(bart_key, self.pgp_list.pubkey, + bart_key.pubkey) + + mm_config.switchboards['command'].enqueue(message, listid='test.example.com') make_testable_runner(CommandRunner, 'command').run() @@ -230,11 +297,8 @@ class TestPreSubscription(unittest.TestCase): self.assertTrue(self.mlist.is_subscribed(bart)) def test_confirm_no_token(self): - message = Message() - message['From'] = 'bart@example.com' - message['To'] = 'test@example.com' - message['Subject'] = 'key confirm' - message.set_payload('') + message = _create_plain('bart@example.com', 'test@example.com', + 'key confirm', '') mm_config.switchboards['command'].enqueue(message, listid='test.example.com') @@ -245,11 +309,8 @@ class TestPreSubscription(unittest.TestCase): self.assertIn('Missing token.', results_msg.get_payload()) def test_confirm_no_email(self): - message = Message() - message['From'] = '' - message['To'] = 'test@example.com' - message['Subject'] = 'key confirm token' - message.set_payload('') + message = _create_plain('', 'test@example.com', + 'key confirm token', '') mm_config.switchboards['command'].enqueue(message, listid='test.example.com') @@ -260,11 +321,8 @@ class TestPreSubscription(unittest.TestCase): self.assertIn('No email to subscribe with.', results_msg.get_payload()) def test_confirm_no_pgp_address(self): - message = Message() - message['From'] = 'bart@example.com' - message['To'] = 'test@example.com' - message['Subject'] = 'key confirm token' - message.set_payload('') + message = _create_plain('bart@example.com', 'test@example.com', + 'key confirm token', '') mm_config.switchboards['command'].enqueue(message, listid='test.example.com') @@ -274,3 +332,36 @@ class TestPreSubscription(unittest.TestCase): self.assertIn('A pgp enabled address not found.', results_msg.get_payload()) + + def test_confirm_not_signed(self): + self.mlist.subscription_policy = OpenSubscriptionPolicy + bart = getUtility(IUserManager).create_address('bart@example.com', + 'Bart Person') + bart.verified_on = now() + + bart_key = load_key('rsa_1024.priv.asc') + + token, token_owner, member = ISubscriptionManager(self.mlist).register( + bart, pubkey=bart_key.pubkey) + + get_queue_messages('virgin') + + with transaction() as t: + pgp_address = PGPAddress(bart) + pgp_address.key = bart_key.pubkey + t.add(pgp_address) + + message = _create_plain('bart@example.com', 'test@example.com', + 'Re: key confirm {}'.format(token), + CONFIRM_REQUEST.format(bart_key.fingerprint, + token)) + + mm_config.switchboards['command'].enqueue(message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + + items = get_queue_messages('virgin', expected_count=1) + results_msg = items[0].msg + + self.assertIn('Message not signed, ignoring.', + results_msg.get_payload()) diff --git a/src/mailman_pgp/rest/addresses.py b/src/mailman_pgp/rest/addresses.py index 93be6d3..6d99a04 100644 --- a/src/mailman_pgp/rest/addresses.py +++ b/src/mailman_pgp/rest/addresses.py @@ -17,7 +17,7 @@ """""" -from mailman.rest.helpers import CollectionMixin, okay, etag, not_found +from mailman.rest.helpers import CollectionMixin, etag, not_found, okay from public.public import public from mailman_pgp.config import config -- cgit v1.2.3-70-g09d2 From d01183610f3df226da76b3f853666e520cf22461 Mon Sep 17 00:00:00 2001 From: J08nY Date: Wed, 12 Jul 2017 19:48:22 +0200 Subject: Some more workflow tests. --- src/mailman_pgp/workflows/tests/test_base.py | 29 ++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/src/mailman_pgp/workflows/tests/test_base.py b/src/mailman_pgp/workflows/tests/test_base.py index 4f39296..5b07e5b 100644 --- a/src/mailman_pgp/workflows/tests/test_base.py +++ b/src/mailman_pgp/workflows/tests/test_base.py @@ -206,3 +206,32 @@ class TestPubkeyMixin(unittest.TestCase): with patch.object(workflow, '_step_do_subscription') as step: next(workflow) step.assert_called_once_with() + + def test_exisitng_pgp_address_not_confirmed(self): + workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender, + pre_verified=True, + pre_confirmed=True) + + with transaction() as t: + pgp_address = PGPAddress(self.sender) + pgp_address.key = self.sender_key.pubkey + t.add(pgp_address) + + workflow.run_thru('pubkey_checks') + with patch.object(workflow, '_step_send_key_confirm_request') as step: + next(workflow) + step.assert_called_once_with() + + def test_exisitng_pgp_address_no_key(self): + workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender, + pre_verified=True, + pre_confirmed=True) + + with transaction() as t: + pgp_address = PGPAddress(self.sender) + t.add(pgp_address) + + workflow.run_thru('pubkey_checks') + with patch.object(workflow, '_step_send_key_request') as step: + next(workflow) + step.assert_called_once_with() -- cgit v1.2.3-70-g09d2 From ce02d5e98feb6dab1a4a8189d109495a41718260 Mon Sep 17 00:00:00 2001 From: J08nY Date: Wed, 12 Jul 2017 22:52:57 +0200 Subject: Use restore_subscriber. --- src/mailman_pgp/workflows/base.py | 13 ++++--------- src/mailman_pgp/workflows/tests/test_base.py | 4 ---- 2 files changed, 4 insertions(+), 13 deletions(-) diff --git a/src/mailman_pgp/workflows/base.py b/src/mailman_pgp/workflows/base.py index 1e6b867..3c276cf 100644 --- a/src/mailman_pgp/workflows/base.py +++ b/src/mailman_pgp/workflows/base.py @@ -64,8 +64,6 @@ class PubkeyMixin: self.pubkey = None def _step_pubkey_checks(self): - self.push('restore_subscriber') - pgp_address = PGPAddress.for_address(self.address) if pgp_address is not None: @@ -95,6 +93,9 @@ class PubkeyMixin: raise StopIteration def _step_receive_key(self): + self._restore_subscriber() + self._set_token(TokenOwner.no_one) + pgp_address = PGPAddress.for_address(self.address) if pgp_address is None or pgp_address.key is None: # The workflow was confirmed but we still dont have an address @@ -126,11 +127,5 @@ class PubkeyMixin: raise StopIteration def _step_receive_key_confirmation(self): + self._restore_subscriber() self._set_token(TokenOwner.no_one) - - def _step_restore_subscriber(self): - if self.which is WhichSubscriber.address: - self.subscriber = self.address - else: - assert self.which is WhichSubscriber.user - self.subscriber = self.user diff --git a/src/mailman_pgp/workflows/tests/test_base.py b/src/mailman_pgp/workflows/tests/test_base.py index 5b07e5b..ef3a009 100644 --- a/src/mailman_pgp/workflows/tests/test_base.py +++ b/src/mailman_pgp/workflows/tests/test_base.py @@ -88,7 +88,6 @@ class TestPubkeyMixin(unittest.TestCase): pubkey=self.sender_key.pubkey, pubkey_pre_confirmed=True) workflow.run_thru('pubkey_checks') - next(workflow) with patch.object(workflow, '_step_do_subscription') as step: next(workflow) step.assert_called_once_with() @@ -148,7 +147,6 @@ class TestPubkeyMixin(unittest.TestCase): receive_workflow.token = workflow.token receive_workflow.restore() receive_workflow.run_thru('receive_key') - next(receive_workflow) with patch.object(receive_workflow, '_step_do_subscription') as step: next(receive_workflow) step.assert_called_once_with() @@ -185,7 +183,6 @@ class TestPubkeyMixin(unittest.TestCase): receive_workflow.token = workflow.token receive_workflow.restore() receive_workflow.run_thru('receive_key_confirmation') - next(receive_workflow) with patch.object(receive_workflow, '_step_do_subscription') as step: next(receive_workflow) step.assert_called_once_with() @@ -202,7 +199,6 @@ class TestPubkeyMixin(unittest.TestCase): t.add(pgp_address) workflow.run_thru('pubkey_checks') - next(workflow) with patch.object(workflow, '_step_do_subscription') as step: next(workflow) step.assert_called_once_with() -- cgit v1.2.3-70-g09d2 From a4e412d40162e35c54704793938e1a5cbf196086 Mon Sep 17 00:00:00 2001 From: J08nY Date: Thu, 13 Jul 2017 00:04:53 +0200 Subject: Split PubkeyMixin into two. --- src/mailman_pgp/workflows/base.py | 21 ++++--- src/mailman_pgp/workflows/subscription.py | 33 ++++++----- src/mailman_pgp/workflows/tests/test_base.py | 85 ++++++++++++++++++---------- 3 files changed, 90 insertions(+), 49 deletions(-) diff --git a/src/mailman_pgp/workflows/base.py b/src/mailman_pgp/workflows/base.py index 3c276cf..20db5f9 100644 --- a/src/mailman_pgp/workflows/base.py +++ b/src/mailman_pgp/workflows/base.py @@ -45,7 +45,7 @@ Token: {} """ -class PubkeyMixin: +class SetPubkeyMixin: def __init__(self, pubkey=None, pre_confirmed=False): self.pubkey = pubkey self.pubkey_confirmed = pre_confirmed @@ -69,15 +69,9 @@ class PubkeyMixin: if pgp_address is not None: if not pgp_address.key: self.push('send_key_request') - else: - if not pgp_address.key_confirmed: - self.push('send_key_confirm_request') else: if not self.pubkey: self.push('send_key_request') - else: - if not self.pubkey_confirmed: - self.push('send_key_confirm_request') def _step_send_key_request(self): self._set_token(TokenOwner.subscriber) @@ -103,6 +97,19 @@ class PubkeyMixin: self.push('send_key_request') else: self.pubkey = pgp_address.key + + +class ConfirmPubkeyMixin: + def __init__(self, pre_confirmed=False): + self.pubkey_confirmed = pre_confirmed + + def _step_pubkey_confirmation(self): + pgp_address = PGPAddress.for_address(self.address) + + if pgp_address is not None: + if not pgp_address.key_confirmed and not self.pubkey_confirmed: + self.push('send_key_confirm_request') + else: if not self.pubkey_confirmed: self.push('send_key_confirm_request') diff --git a/src/mailman_pgp/workflows/subscription.py b/src/mailman_pgp/workflows/subscription.py index a7565f3..c4138e6 100644 --- a/src/mailman_pgp/workflows/subscription.py +++ b/src/mailman_pgp/workflows/subscription.py @@ -24,12 +24,13 @@ from mailman.workflows.common import (ConfirmationMixin, ModerationMixin, from public import public from zope.interface import implementer -from mailman_pgp.workflows.base import PubkeyMixin +from mailman_pgp.workflows.base import ConfirmPubkeyMixin, SetPubkeyMixin @public @implementer(ISubscriptionWorkflow) -class OpenSubscriptionPolicy(SubscriptionBase, VerificationMixin, PubkeyMixin): +class OpenSubscriptionPolicy(SubscriptionBase, VerificationMixin, + SetPubkeyMixin, ConfirmPubkeyMixin): """""" name = 'pgp-policy-open' @@ -51,11 +52,12 @@ class OpenSubscriptionPolicy(SubscriptionBase, VerificationMixin, PubkeyMixin): pubkey_pre_confirmed=False): SubscriptionBase.__init__(self, mlist, subscriber) VerificationMixin.__init__(self, pre_verified=pre_verified) - PubkeyMixin.__init__(self, pubkey=pubkey, - pre_confirmed=pubkey_pre_confirmed) + SetPubkeyMixin.__init__(self, pubkey=pubkey) + ConfirmPubkeyMixin.__init__(self, pre_confirmed=pubkey_pre_confirmed) def _step_prepare(self): self.push('do_subscription') + self.push('pubkey_confirmation') self.push('pubkey_checks') self.push('verification_checks') self.push('sanity_checks') @@ -64,7 +66,8 @@ class OpenSubscriptionPolicy(SubscriptionBase, VerificationMixin, PubkeyMixin): @public @implementer(ISubscriptionWorkflow) class ConfirmSubscriptionPolicy(SubscriptionBase, VerificationMixin, - ConfirmationMixin, PubkeyMixin): + ConfirmationMixin, SetPubkeyMixin, + ConfirmPubkeyMixin): """""" name = 'pgp-policy-confirm' @@ -88,11 +91,12 @@ class ConfirmSubscriptionPolicy(SubscriptionBase, VerificationMixin, SubscriptionBase.__init__(self, mlist, subscriber) VerificationMixin.__init__(self, pre_verified=pre_verified) ConfirmationMixin.__init__(self, pre_confirmed=pre_confirmed) - PubkeyMixin.__init__(self, pubkey=pubkey, - pre_confirmed=pubkey_pre_confirmed) + SetPubkeyMixin.__init__(self, pubkey=pubkey) + ConfirmPubkeyMixin.__init__(self, pre_confirmed=pubkey_pre_confirmed) def _step_prepare(self): self.push('do_subscription') + self.push('pubkey_confirmation') self.push('pubkey_checks') self.push('confirmation_checks') self.push('verification_checks') @@ -102,7 +106,8 @@ class ConfirmSubscriptionPolicy(SubscriptionBase, VerificationMixin, @public @implementer(ISubscriptionWorkflow) class ModerationSubscriptionPolicy(SubscriptionBase, VerificationMixin, - ModerationMixin, PubkeyMixin): + ModerationMixin, SetPubkeyMixin, + ConfirmPubkeyMixin): """""" name = 'pgp-policy-moderate' @@ -126,12 +131,13 @@ class ModerationSubscriptionPolicy(SubscriptionBase, VerificationMixin, SubscriptionBase.__init__(self, mlist, subscriber) VerificationMixin.__init__(self, pre_verified=pre_verified) ModerationMixin.__init__(self, pre_approved=pre_approved) - PubkeyMixin.__init__(self, pubkey=pubkey, - pre_confirmed=pubkey_pre_confirmed) + SetPubkeyMixin.__init__(self, pubkey=pubkey) + ConfirmPubkeyMixin.__init__(self, pre_confirmed=pubkey_pre_confirmed) def _step_prepare(self): self.push('do_subscription') self.push('moderation_checks') + self.push('pubkey_confirmation') self.push('pubkey_checks') self.push('verification_checks') self.push('sanity_checks') @@ -141,7 +147,7 @@ class ModerationSubscriptionPolicy(SubscriptionBase, VerificationMixin, @implementer(ISubscriptionWorkflow) class ConfirmModerationSubscriptionPolicy(SubscriptionBase, VerificationMixin, ConfirmationMixin, ModerationMixin, - PubkeyMixin): + SetPubkeyMixin, ConfirmPubkeyMixin): """""" name = 'pgp-policy-confirm-moderate' @@ -167,12 +173,13 @@ class ConfirmModerationSubscriptionPolicy(SubscriptionBase, VerificationMixin, VerificationMixin.__init__(self, pre_verified=pre_verified) ConfirmationMixin.__init__(self, pre_confirmed=pre_confirmed) ModerationMixin.__init__(self, pre_approved=pre_approved) - PubkeyMixin.__init__(self, pubkey=pubkey, - pre_confirmed=pubkey_pre_confirmed) + SetPubkeyMixin.__init__(self, pubkey=pubkey) + ConfirmPubkeyMixin.__init__(self, pre_confirmed=pubkey_pre_confirmed) def _step_prepare(self): self.push('do_subscription') self.push('moderation_checks') + self.push('pubkey_confirmation') self.push('pubkey_checks') self.push('confirmation_checks') self.push('verification_checks') diff --git a/src/mailman_pgp/workflows/tests/test_base.py b/src/mailman_pgp/workflows/tests/test_base.py index ef3a009..913ab67 100644 --- a/src/mailman_pgp/workflows/tests/test_base.py +++ b/src/mailman_pgp/workflows/tests/test_base.py @@ -37,8 +37,7 @@ from mailman_pgp.workflows.base import KEY_REQUEST from mailman_pgp.workflows.subscription import ConfirmSubscriptionPolicy -class TestPubkeyMixin(unittest.TestCase): - layer = PGPConfigLayer +class PubkeyMixinSetup(): def setUp(self): with mm_transaction(): @@ -55,19 +54,9 @@ class TestPubkeyMixin(unittest.TestCase): self.sender_key = load_key('rsa_1024.priv.asc') self.sender = self.um.create_address('rsa-1024b@example.org') - def test_pended_data(self): - workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender, - pre_verified=True, - pre_confirmed=True) - with suppress(StopIteration): - workflow.run_thru('send_key_request') - self.assertIsNotNone(workflow.token) - pendable = getUtility(IPendings).confirm(workflow.token, expunge=False) - self.assertEqual(pendable['list_id'], 'test.example.com') - self.assertEqual(pendable['email'], 'rsa-1024b@example.org') - self.assertEqual(pendable['display_name'], '') - self.assertEqual(pendable['when'], '2005-08-01T07:49:23') - self.assertEqual(pendable['token_owner'], 'subscriber') + +class TestSetPubkeyMixin(PubkeyMixinSetup, unittest.TestCase): + layer = PGPConfigLayer def test_key_request_sent(self): workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender, @@ -81,17 +70,6 @@ class TestPubkeyMixin(unittest.TestCase): self.assertEqual(message['Subject'], 'key set {}'.format(token)) self.assertEqual(message.get_payload(), KEY_REQUEST) - def test_key_request_pubkey_set(self): - workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender, - pre_verified=True, - pre_confirmed=True, - pubkey=self.sender_key.pubkey, - pubkey_pre_confirmed=True) - workflow.run_thru('pubkey_checks') - with patch.object(workflow, '_step_do_subscription') as step: - next(workflow) - step.assert_called_once_with() - def test_receive_key(self): workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender, pre_verified=True, @@ -111,6 +89,7 @@ class TestPubkeyMixin(unittest.TestCase): self.assertEqual(receive_workflow.pubkey.fingerprint, self.sender_key.pubkey.fingerprint) + receive_workflow.run_thru('pubkey_confirmation') with patch.object(receive_workflow, '_step_send_key_confirm_request') as step: next(receive_workflow) @@ -146,11 +125,26 @@ class TestPubkeyMixin(unittest.TestCase): receive_workflow = ConfirmSubscriptionPolicy(self.mlist) receive_workflow.token = workflow.token receive_workflow.restore() - receive_workflow.run_thru('receive_key') + receive_workflow.run_thru('pubkey_confirmation') with patch.object(receive_workflow, '_step_do_subscription') as step: next(receive_workflow) step.assert_called_once_with() + +class TestConfirmPubkeyMixin(PubkeyMixinSetup, unittest.TestCase): + layer = PGPConfigLayer + + def test_key_request_pubkey_set(self): + workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender, + pre_verified=True, + pre_confirmed=True, + pubkey=self.sender_key.pubkey, + pubkey_pre_confirmed=True) + workflow.run_thru('pubkey_confirmation') + with patch.object(workflow, '_step_do_subscription') as step: + next(workflow) + step.assert_called_once_with() + def test_send_key_confirm_request(self): workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender, pre_verified=True, @@ -187,6 +181,39 @@ class TestPubkeyMixin(unittest.TestCase): next(receive_workflow) step.assert_called_once_with() + +class TestBothPubkeyMixins(PubkeyMixinSetup, unittest.TestCase): + layer = PGPConfigLayer + + def test_pended_data_key_request(self): + workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender, + pre_verified=True, + pre_confirmed=True) + with suppress(StopIteration): + workflow.run_thru('send_key_request') + self.assertIsNotNone(workflow.token) + pendable = getUtility(IPendings).confirm(workflow.token, expunge=False) + self.assertEqual(pendable['list_id'], 'test.example.com') + self.assertEqual(pendable['email'], 'rsa-1024b@example.org') + self.assertEqual(pendable['display_name'], '') + self.assertEqual(pendable['when'], '2005-08-01T07:49:23') + self.assertEqual(pendable['token_owner'], 'subscriber') + + def test_pended_data_key_confirmation(self): + workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender, + pre_verified=True, + pre_confirmed=True, + pubkey=self.sender_key.pubkey) + with suppress(StopIteration): + workflow.run_thru('send_key_confirm_request') + self.assertIsNotNone(workflow.token) + pendable = getUtility(IPendings).confirm(workflow.token, expunge=False) + self.assertEqual(pendable['list_id'], 'test.example.com') + self.assertEqual(pendable['email'], 'rsa-1024b@example.org') + self.assertEqual(pendable['display_name'], '') + self.assertEqual(pendable['when'], '2005-08-01T07:49:23') + self.assertEqual(pendable['token_owner'], 'subscriber') + def test_exisitng_pgp_address(self): workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender, pre_verified=True, @@ -198,7 +225,7 @@ class TestPubkeyMixin(unittest.TestCase): pgp_address.key_confirmed = True t.add(pgp_address) - workflow.run_thru('pubkey_checks') + workflow.run_thru('pubkey_confirmation') with patch.object(workflow, '_step_do_subscription') as step: next(workflow) step.assert_called_once_with() @@ -213,7 +240,7 @@ class TestPubkeyMixin(unittest.TestCase): pgp_address.key = self.sender_key.pubkey t.add(pgp_address) - workflow.run_thru('pubkey_checks') + workflow.run_thru('pubkey_confirmation') with patch.object(workflow, '_step_send_key_confirm_request') as step: next(workflow) step.assert_called_once_with() -- cgit v1.2.3-70-g09d2 From af5b7950923022c0476cbc576cd8536d18c39ef6 Mon Sep 17 00:00:00 2001 From: J08nY Date: Thu, 13 Jul 2017 01:50:57 +0200 Subject: Add key change command and some tests. --- src/mailman_pgp/commands/eml_key.py | 26 ++++++- src/mailman_pgp/commands/tests/test_key.py | 43 +++++++++++ src/mailman_pgp/workflows/base.py | 1 - src/mailman_pgp/workflows/key_change.py | 117 +++++++++++++++++++++++++++++ 4 files changed, 185 insertions(+), 2 deletions(-) create mode 100644 src/mailman_pgp/workflows/key_change.py diff --git a/src/mailman_pgp/commands/eml_key.py b/src/mailman_pgp/commands/eml_key.py index a2fb4d5..9df6065 100644 --- a/src/mailman_pgp/commands/eml_key.py +++ b/src/mailman_pgp/commands/eml_key.py @@ -30,6 +30,7 @@ from mailman_pgp.model.address import PGPAddress from mailman_pgp.model.list import PGPMailingList from mailman_pgp.pgp.wrapper import PGPWrapper from mailman_pgp.workflows.base import CONFIRM_REQUEST +from mailman_pgp.workflows.key_change import KeyChangeWorkflow def _get_email(msg): @@ -130,7 +131,30 @@ def _cmd_confirm(pgp_list, mlist, msg, msgdata, arguments, results): def _cmd_change(pgp_list, mlist, msg, msgdata, arguments, results): # New public key in attachment, requires to be signed with current # key - pass + wrapped = PGPWrapper(msg) + if not wrapped.has_keys(): + print('No keys attached? Send a key.', file=results) + return ContinueProcessing.no + + keys = list(wrapped.keys()) + if len(keys) != 1: + print('More than one key! Send only one key.', file=results) + return ContinueProcessing.no + + email = _get_email(msg) + if not email: + print('No email to change key of.', file=results) + return ContinueProcessing.no + + pgp_address = PGPAddress.for_email(email) + if pgp_address is None: + print('A pgp enabled address not found.', file=results) + return ContinueProcessing.no + + workflow = KeyChangeWorkflow(mlist, pgp_address, keys.pop()) + list(workflow) + print('Key change request received.', file=results) + return ContinueProcessing.no def _cmd_revoke(pgp_list, mlist, msg, msgdata, arguments, results): diff --git a/src/mailman_pgp/commands/tests/test_key.py b/src/mailman_pgp/commands/tests/test_key.py index 48ca3c9..fe75a6a 100644 --- a/src/mailman_pgp/commands/tests/test_key.py +++ b/src/mailman_pgp/commands/tests/test_key.py @@ -365,3 +365,46 @@ class TestPreSubscription(unittest.TestCase): self.assertIn('Message not signed, ignoring.', results_msg.get_payload()) + + +class TestAfterSubscription(unittest.TestCase): + layer = PGPConfigLayer + + def setUp(self): + self.mlist = create_list('test@example.com', style_name='pgp-default') + self.pgp_list = PGPMailingList.for_list(self.mlist) + self.pgp_list.key = load_key('ecc_p256.priv.asc') + + def test_key_change(self): + bart = getUtility(IUserManager).create_address('bart@example.com', + 'Bart Person') + bart_key = load_key('rsa_1024.priv.asc') + bart_new_key = load_key('ecc_p256.priv.asc') + + with transaction() as t: + pgp_address = PGPAddress(bart) + pgp_address.key = bart_key.pubkey + pgp_address.key_confirmed = True + t.add(pgp_address) + + message = _create_mixed('bart@example.com', 'test@example.com', + 'key change') + wrapped_message = MIMEWrapper(message) + message = wrapped_message.attach_key(bart_new_key.pubkey) + + mm_config.switchboards['command'].enqueue(message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + + items = get_queue_messages('virgin', expected_count=2) + if items[0].msg['Subject'] == 'The results of your email commands': + results = items[0].msg + confirm_request = items[1].msg + else: + results = items[1].msg + confirm_request = items[0].msg + + self.assertIn('Key change request received.', results.get_payload()) + + confirm_wrapped = PGPWrapper(confirm_request) + self.assertTrue(confirm_wrapped.is_encrypted()) diff --git a/src/mailman_pgp/workflows/base.py b/src/mailman_pgp/workflows/base.py index 20db5f9..a8679bd 100644 --- a/src/mailman_pgp/workflows/base.py +++ b/src/mailman_pgp/workflows/base.py @@ -18,7 +18,6 @@ """""" from mailman.email.message import UserNotification from mailman.interfaces.subscriptions import TokenOwner -from mailman.workflows.common import WhichSubscriber from pgpy import PGPKey from mailman_pgp.model.address import PGPAddress diff --git a/src/mailman_pgp/workflows/key_change.py b/src/mailman_pgp/workflows/key_change.py new file mode 100644 index 0000000..0098b19 --- /dev/null +++ b/src/mailman_pgp/workflows/key_change.py @@ -0,0 +1,117 @@ +# Copyright (C) 2017 Jan Jancar +# +# This file is a part of the Mailman PGP plugin. +# +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) +# any later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. +# +# You should have received a copy of the GNU General Public License along with +# this program. If not, see . + +"""""" +from mailman.email.message import UserNotification +from mailman.interfaces.pending import IPendable, IPendings +from mailman.interfaces.workflows import IWorkflow +from mailman.workflows.base import Workflow +from pgpy import PGPKey +from public import public +from zope.component import getUtility +from zope.interface import implementer + +from mailman_pgp.model.address import PGPAddress +from mailman_pgp.pgp.utils import copy_headers +from mailman_pgp.pgp.wrapper import PGPWrapper + +CHANGE_CONFIRM_REQUEST = """\ +---------- +TODO: this is a pgp enabled list. +You requested to change your key. +Reply to this message with this whole text +signed with your supplied key, either inline or PGP/MIME. + +Fingerprint: {} +Token: {} +---------- +""" + + +@public +@implementer(IWorkflow) +class KeyChangeWorkflow(Workflow): + name = 'pgp-key-change-workflow' + description = '' + initial_state = 'send_key_confirm_request' + save_attributes = ( + 'address_key', + 'pubkey_key' + ) + + def __init__(self, mlist, pgp_address=None, pubkey=None): + super().__init__() + self.mlist = mlist + self.pgp_address = pgp_address + self.pubkey = pubkey + + @property + def address_key(self): + return self.pgp_address.email + + @address_key.setter + def address_key(self, value): + self.pgp_address = PGPAddress.for_email(value) + + @property + def pubkey_key(self): + if self.pubkey is None: + return None + return str(self.pubkey) + + @pubkey_key.setter + def pubkey_key(self, value): + if value is not None: + self.pubkey, _ = PGPKey.from_blob(value) + else: + self.pubkey = None + + def _step_send_key_confirm_request(self): + pendings = getUtility(IPendings) + pendable = KeyChangeWorkflow.pendable_class()( + email=self.pgp_address.email, + pubkey=str(self.pubkey) + ) + self.token = pendings.add(pendable) + + self.push('receive_key_confirmation') + self.save() + request_address = self.mlist.request_address + email_address = self.pgp_address.email + msg = UserNotification(email_address, request_address, + 'key confirm {}'.format(self.token), + CHANGE_CONFIRM_REQUEST.format( + self.pubkey.fingerprint, + self.token)) + wrapped = PGPWrapper(msg) + encrypted = wrapped.encrypt(self.pubkey) + + msg.set_payload(encrypted.get_payload()) + copy_headers(encrypted, msg, True) + msg.send(self.mlist) + raise StopIteration + + def _step_receive_confirmation(self): + pass + + @classmethod + def pendable_class(cls): + @implementer(IPendable) + class Pendable(dict): + PEND_TYPE = KeyChangeWorkflow.name + + return Pendable -- cgit v1.2.3-70-g09d2 From 03314a50895615623f5f0c80d77076657348ad05 Mon Sep 17 00:00:00 2001 From: J08nY Date: Thu, 13 Jul 2017 23:51:42 +0200 Subject: Fix TypeError in MIMEWrapper on empty protocol param. --- src/mailman_pgp/pgp/mime.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/mailman_pgp/pgp/mime.py b/src/mailman_pgp/pgp/mime.py index 975b8bd..a7e31b8 100644 --- a/src/mailman_pgp/pgp/mime.py +++ b/src/mailman_pgp/pgp/mime.py @@ -76,7 +76,8 @@ class MIMEWrapper: if not self._is_mime(): return False second_type = self.msg.get_payload(1).get_content_type() - protocol_param = collapse_rfc2231_value(self.msg.get_param('protocol')) + protocol_param = collapse_rfc2231_value(self.msg.get_param('protocol', + '')) content_subtype = self.msg.get_content_subtype() return (second_type == MIMEWrapper._signed_type and @@ -102,7 +103,8 @@ class MIMEWrapper: first_type = self.msg.get_payload(0).get_content_type() second_type = self.msg.get_payload(1).get_content_type() content_subtype = self.msg.get_content_subtype() - protocol_param = collapse_rfc2231_value(self.msg.get_param('protocol')) + protocol_param = collapse_rfc2231_value(self.msg.get_param('protocol', + '')) return ('Version: 1' in first_part and first_type == MIMEWrapper._encrypted_type and -- cgit v1.2.3-70-g09d2 From f921aeeb2156f81ac2dd1a3db9dd4380b0dcb6da Mon Sep 17 00:00:00 2001 From: J08nY Date: Thu, 13 Jul 2017 23:52:07 +0200 Subject: Fix setting of key_confirmed in PGPAddress reconstructor. --- src/mailman_pgp/model/address.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/mailman_pgp/model/address.py b/src/mailman_pgp/model/address.py index f102bf7..25a87dc 100644 --- a/src/mailman_pgp/model/address.py +++ b/src/mailman_pgp/model/address.py @@ -36,9 +36,9 @@ class PGPAddress(Base): __tablename__ = 'pgp_addresses' id = Column(Integer, primary_key=True) - email = Column(SAUnicode, index=True) + email = Column(SAUnicode, index=True, unique=True) key_fingerprint = Column(SAUnicode) - key_confirmed = Column(Boolean) + key_confirmed = Column(Boolean, default=False) def __init__(self, address): super().__init__() @@ -50,7 +50,6 @@ class PGPAddress(Base): def _init(self): self._address = None self._key = None - self.key_confirmed = False @property def key(self): @@ -120,7 +119,7 @@ class PGPAddress(Base): """ if address is None: return None - return PGPAddress.query().filter_by(email=address.email).first() + return PGPAddress.for_email(address.email) @staticmethod def for_email(email): -- cgit v1.2.3-70-g09d2 From 08389caf276e1b866cae2f6afc1d47b9c1876af5 Mon Sep 17 00:00:00 2001 From: J08nY Date: Thu, 13 Jul 2017 23:52:32 +0200 Subject: Rollback db when resetting for tests. --- src/mailman_pgp/testing/layers.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/mailman_pgp/testing/layers.py b/src/mailman_pgp/testing/layers.py index 5e0d8b3..aabf216 100644 --- a/src/mailman_pgp/testing/layers.py +++ b/src/mailman_pgp/testing/layers.py @@ -30,6 +30,7 @@ def reset_pgp_world(): full_path = os.path.join(keydir, path) if isfile(full_path): os.remove(full_path) + config.db.session.rollback() with transaction(): Base.metadata.drop_all(config.db.engine) Base.metadata.create_all(config.db.engine) -- cgit v1.2.3-70-g09d2 From 57f8d97c696913beeba8467aa550804422336d9c Mon Sep 17 00:00:00 2001 From: J08nY Date: Thu, 13 Jul 2017 23:57:18 +0200 Subject: Split PGPMixin from subscription mixins. And bunch of other things. --- src/mailman_pgp/commands/eml_key.py | 46 ++++++++++------ src/mailman_pgp/commands/tests/test_key.py | 49 +++++++++++------ src/mailman_pgp/workflows/base.py | 48 ++++++++++------- src/mailman_pgp/workflows/key_change.py | 12 +++-- src/mailman_pgp/workflows/subscription.py | 21 ++++++-- src/mailman_pgp/workflows/tests/test_base.py | 81 +++++++++++++--------------- 6 files changed, 153 insertions(+), 104 deletions(-) diff --git a/src/mailman_pgp/commands/eml_key.py b/src/mailman_pgp/commands/eml_key.py index 9df6065..7b7782d 100644 --- a/src/mailman_pgp/commands/eml_key.py +++ b/src/mailman_pgp/commands/eml_key.py @@ -19,6 +19,7 @@ from email.utils import parseaddr from mailman.interfaces.command import ContinueProcessing, IEmailCommand +from mailman.interfaces.pending import IPendings from mailman.interfaces.subscriptions import ISubscriptionManager from mailman.interfaces.usermanager import IUserManager from public import public @@ -47,6 +48,10 @@ def _cmd_set(pgp_list, mlist, msg, msgdata, arguments, results): return ContinueProcessing.no wrapped = PGPWrapper(msg) + if wrapped.is_encrypted(): + decrypted = wrapped.decrypt(pgp_list.key) + wrapped = PGPWrapper(decrypted) + if not wrapped.has_keys(): print('No keys attached? Send a key.', file=results) return ContinueProcessing.no @@ -66,21 +71,24 @@ def _cmd_set(pgp_list, mlist, msg, msgdata, arguments, results): print('No adddress to subscribe with.', file=results) return ContinueProcessing.no - with transaction() as t: - pgp_address = PGPAddress.for_address(address) - if pgp_address is None: - pgp_address = PGPAddress(address) - pgp_address.key = keys.pop() - t.add(pgp_address) + pgp_address = PGPAddress.for_address(address) + if pgp_address is None: + print('A pgp enabled address not found.', file=results) + return ContinueProcessing.no token = arguments[1] - try: - ISubscriptionManager(mlist).confirm(token) - print('Key succesfully set.', file=results) - print('Key fingerprint: {}'.format(pgp_address.key.fingerprint), - file=results) - except LookupError: + pendable = getUtility(IPendings).confirm(token, expunge=False) + if pendable is None: print('Wrong token.', file=results) + return ContinueProcessing.no + + with transaction(): + pgp_address.key = keys.pop() + ISubscriptionManager(mlist).confirm(token) + + print('Key succesfully set.', file=results) + print('Key fingerprint: {}'.format(pgp_address.key.fingerprint), + file=results) return ContinueProcessing.no @@ -115,13 +123,17 @@ def _cmd_confirm(pgp_list, mlist, msg, msgdata, arguments, results): token = arguments[1] - expecting = CONFIRM_REQUEST.format(pgp_address.key_fingerprint, - token) + pendable = getUtility(IPendings).confirm(token, expunge=False) + if pendable is None: + print('Wrong token.', file=results) + return ContinueProcessing.no + + # TODO differentiate between key change and subscription here. + + expecting = CONFIRM_REQUEST.format(pgp_address.key_fingerprint, token) for sig_subject in wrapped.get_signed(): if expecting in sig_subject: - with transaction(): - pgp_address.key_confirmed = True - ISubscriptionManager(mlist).confirm(token) + ISubscriptionManager(mlist).confirm(token) break else: print("Message doesn't contain the expected statement.", file=results) diff --git a/src/mailman_pgp/commands/tests/test_key.py b/src/mailman_pgp/commands/tests/test_key.py index fe75a6a..64f8ae6 100644 --- a/src/mailman_pgp/commands/tests/test_key.py +++ b/src/mailman_pgp/commands/tests/test_key.py @@ -134,7 +134,6 @@ class TestPreSubscription(unittest.TestCase): pgp_address = PGPAddress.for_address(bart) self.assertIsNotNone(pgp_address) self.assertEqual(pgp_address.key.fingerprint, bart_key.fingerprint) - self.assertEqual(pgp_address.key_fingerprint, bart_key.fingerprint) self.assertFalse(pgp_address.key_confirmed) items = get_queue_messages('virgin', expected_count=2) @@ -242,11 +241,6 @@ class TestPreSubscription(unittest.TestCase): get_queue_messages('virgin') - with transaction() as t: - pgp_address = PGPAddress(bart) - pgp_address.key = bart_key.pubkey - t.add(pgp_address) - message = _create_plain('bart@example.com', 'test@example.com', 'Re: key confirm {}'.format(token), CONFIRM_REQUEST.format(bart_key.fingerprint, @@ -275,11 +269,6 @@ class TestPreSubscription(unittest.TestCase): get_queue_messages('virgin') - with transaction() as t: - pgp_address = PGPAddress(bart) - pgp_address.key = bart_key.pubkey - t.add(pgp_address) - message = _create_plain('bart@example.com', 'test@example.com', 'Re: key confirm {}'.format(token), CONFIRM_REQUEST.format(bart_key.fingerprint, @@ -290,6 +279,7 @@ class TestPreSubscription(unittest.TestCase): mm_config.switchboards['command'].enqueue(message, listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() pgp_address = PGPAddress.for_address(bart) @@ -346,11 +336,6 @@ class TestPreSubscription(unittest.TestCase): get_queue_messages('virgin') - with transaction() as t: - pgp_address = PGPAddress(bart) - pgp_address.key = bart_key.pubkey - t.add(pgp_address) - message = _create_plain('bart@example.com', 'test@example.com', 'Re: key confirm {}'.format(token), CONFIRM_REQUEST.format(bart_key.fingerprint, @@ -408,3 +393,35 @@ class TestAfterSubscription(unittest.TestCase): confirm_wrapped = PGPWrapper(confirm_request) self.assertTrue(confirm_wrapped.is_encrypted()) + decrypted = confirm_wrapped.decrypt(bart_new_key) + self.assertIn('key confirm', decrypted['subject']) + + def test_key_change_confirm(self): + bart = getUtility(IUserManager).create_address('bart@example.com', + 'Bart Person') + bart_key = load_key('rsa_1024.priv.asc') + bart_new_key = load_key('ecc_p256.priv.asc') + + with transaction() as t: + pgp_address = PGPAddress(bart) + pgp_address.key = bart_key.pubkey + pgp_address.key_confirmed = True + t.add(pgp_address) + + message = _create_mixed('bart@example.com', 'test@example.com', + 'key change') + wrapped_message = MIMEWrapper(message) + message = wrapped_message.attach_key(bart_new_key.pubkey) + + mm_config.switchboards['command'].enqueue(message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + + items = get_queue_messages('virgin', expected_count=2) + if items[0].msg['Subject'] == 'The results of your email commands': + confirm_request = items[1].msg + else: + confirm_request = items[0].msg + request_wrapped = PGPWrapper(confirm_request) + request_wrapped.decrypt(bart_new_key) + # TODO finish this diff --git a/src/mailman_pgp/workflows/base.py b/src/mailman_pgp/workflows/base.py index a8679bd..d05781d 100644 --- a/src/mailman_pgp/workflows/base.py +++ b/src/mailman_pgp/workflows/base.py @@ -20,6 +20,7 @@ from mailman.email.message import UserNotification from mailman.interfaces.subscriptions import TokenOwner from pgpy import PGPKey +from mailman_pgp.database import transaction from mailman_pgp.model.address import PGPAddress from mailman_pgp.model.list import PGPMailingList from mailman_pgp.pgp.utils import copy_headers @@ -44,6 +45,15 @@ Token: {} """ +class PGPMixin: + def _step_pgp_prepare(self): + pgp_address = PGPAddress.for_address(self.address) + if pgp_address is None: + with transaction() as t: + pgp_address = PGPAddress(self.address) + t.add(pgp_address) + + class SetPubkeyMixin: def __init__(self, pubkey=None, pre_confirmed=False): self.pubkey = pubkey @@ -64,13 +74,14 @@ class SetPubkeyMixin: def _step_pubkey_checks(self): pgp_address = PGPAddress.for_address(self.address) + assert pgp_address is not None - if pgp_address is not None: - if not pgp_address.key: + if self.pubkey is None: + if pgp_address.key is None: self.push('send_key_request') else: - if not self.pubkey: - self.push('send_key_request') + with transaction(): + pgp_address.key = self.pubkey def _step_send_key_request(self): self._set_token(TokenOwner.subscriber) @@ -89,14 +100,6 @@ class SetPubkeyMixin: self._restore_subscriber() self._set_token(TokenOwner.no_one) - pgp_address = PGPAddress.for_address(self.address) - if pgp_address is None or pgp_address.key is None: - # The workflow was confirmed but we still dont have an address - # or the pubkey. So resend request and wait. - self.push('send_key_request') - else: - self.pubkey = pgp_address.key - class ConfirmPubkeyMixin: def __init__(self, pre_confirmed=False): @@ -104,27 +107,31 @@ class ConfirmPubkeyMixin: def _step_pubkey_confirmation(self): pgp_address = PGPAddress.for_address(self.address) + assert pgp_address is not None - if pgp_address is not None: - if not pgp_address.key_confirmed and not self.pubkey_confirmed: - self.push('send_key_confirm_request') + if self.pubkey_confirmed: + with transaction(): + pgp_address.key_confirmed = True else: - if not self.pubkey_confirmed: + if not pgp_address.key_confirmed: self.push('send_key_confirm_request') def _step_send_key_confirm_request(self): self._set_token(TokenOwner.subscriber) self.push('receive_key_confirmation') self.save() + + pgp_address = PGPAddress.for_address(self.address) request_address = self.mlist.request_address email_address = self.address.email msg = UserNotification(email_address, request_address, 'key confirm {}'.format(self.token), - CONFIRM_REQUEST.format(self.pubkey.fingerprint, - self.token)) + CONFIRM_REQUEST.format( + pgp_address.key_fingerprint, + self.token)) pgp_list = PGPMailingList.for_list(self.mlist) wrapped = PGPWrapper(msg) - encrypted = wrapped.sign_encrypt(pgp_list.key, self.pubkey, + encrypted = wrapped.sign_encrypt(pgp_list.key, pgp_address.key, pgp_list.pubkey) msg.set_payload(encrypted.get_payload()) @@ -135,3 +142,6 @@ class ConfirmPubkeyMixin: def _step_receive_key_confirmation(self): self._restore_subscriber() self._set_token(TokenOwner.no_one) + with transaction(): + pgp_address = PGPAddress.for_address(self.address) + pgp_address.key_confirmed = True diff --git a/src/mailman_pgp/workflows/key_change.py b/src/mailman_pgp/workflows/key_change.py index 0098b19..8831d28 100644 --- a/src/mailman_pgp/workflows/key_change.py +++ b/src/mailman_pgp/workflows/key_change.py @@ -95,8 +95,8 @@ class KeyChangeWorkflow(Workflow): msg = UserNotification(email_address, request_address, 'key confirm {}'.format(self.token), CHANGE_CONFIRM_REQUEST.format( - self.pubkey.fingerprint, - self.token)) + self.pubkey.fingerprint, + self.token)) wrapped = PGPWrapper(msg) encrypted = wrapped.encrypt(self.pubkey) @@ -106,7 +106,13 @@ class KeyChangeWorkflow(Workflow): raise StopIteration def _step_receive_confirmation(self): - pass + self.pgp_address.key = self.pubkey + self.pgp_address.key_confirmed = True + + pendings = getUtility(IPendings) + if self.token is not None: + pendings.confirm(self.token) + self.token = None @classmethod def pendable_class(cls): diff --git a/src/mailman_pgp/workflows/subscription.py b/src/mailman_pgp/workflows/subscription.py index c4138e6..ce02013 100644 --- a/src/mailman_pgp/workflows/subscription.py +++ b/src/mailman_pgp/workflows/subscription.py @@ -24,13 +24,15 @@ from mailman.workflows.common import (ConfirmationMixin, ModerationMixin, from public import public from zope.interface import implementer -from mailman_pgp.workflows.base import ConfirmPubkeyMixin, SetPubkeyMixin +from mailman_pgp.workflows.base import (ConfirmPubkeyMixin, PGPMixin, + SetPubkeyMixin) @public @implementer(ISubscriptionWorkflow) class OpenSubscriptionPolicy(SubscriptionBase, VerificationMixin, - SetPubkeyMixin, ConfirmPubkeyMixin): + SetPubkeyMixin, ConfirmPubkeyMixin, + PGPMixin): """""" name = 'pgp-policy-open' @@ -54,11 +56,13 @@ class OpenSubscriptionPolicy(SubscriptionBase, VerificationMixin, VerificationMixin.__init__(self, pre_verified=pre_verified) SetPubkeyMixin.__init__(self, pubkey=pubkey) ConfirmPubkeyMixin.__init__(self, pre_confirmed=pubkey_pre_confirmed) + PGPMixin.__init__(self) def _step_prepare(self): self.push('do_subscription') self.push('pubkey_confirmation') self.push('pubkey_checks') + self.push('pgp_prepare') self.push('verification_checks') self.push('sanity_checks') @@ -67,7 +71,7 @@ class OpenSubscriptionPolicy(SubscriptionBase, VerificationMixin, @implementer(ISubscriptionWorkflow) class ConfirmSubscriptionPolicy(SubscriptionBase, VerificationMixin, ConfirmationMixin, SetPubkeyMixin, - ConfirmPubkeyMixin): + ConfirmPubkeyMixin, PGPMixin): """""" name = 'pgp-policy-confirm' @@ -93,11 +97,13 @@ class ConfirmSubscriptionPolicy(SubscriptionBase, VerificationMixin, ConfirmationMixin.__init__(self, pre_confirmed=pre_confirmed) SetPubkeyMixin.__init__(self, pubkey=pubkey) ConfirmPubkeyMixin.__init__(self, pre_confirmed=pubkey_pre_confirmed) + PGPMixin.__init__(self) def _step_prepare(self): self.push('do_subscription') self.push('pubkey_confirmation') self.push('pubkey_checks') + self.push('pgp_prepare') self.push('confirmation_checks') self.push('verification_checks') self.push('sanity_checks') @@ -107,7 +113,7 @@ class ConfirmSubscriptionPolicy(SubscriptionBase, VerificationMixin, @implementer(ISubscriptionWorkflow) class ModerationSubscriptionPolicy(SubscriptionBase, VerificationMixin, ModerationMixin, SetPubkeyMixin, - ConfirmPubkeyMixin): + ConfirmPubkeyMixin, PGPMixin): """""" name = 'pgp-policy-moderate' @@ -133,12 +139,14 @@ class ModerationSubscriptionPolicy(SubscriptionBase, VerificationMixin, ModerationMixin.__init__(self, pre_approved=pre_approved) SetPubkeyMixin.__init__(self, pubkey=pubkey) ConfirmPubkeyMixin.__init__(self, pre_confirmed=pubkey_pre_confirmed) + PGPMixin.__init__(self) def _step_prepare(self): self.push('do_subscription') self.push('moderation_checks') self.push('pubkey_confirmation') self.push('pubkey_checks') + self.push('pgp_prepare') self.push('verification_checks') self.push('sanity_checks') @@ -147,7 +155,8 @@ class ModerationSubscriptionPolicy(SubscriptionBase, VerificationMixin, @implementer(ISubscriptionWorkflow) class ConfirmModerationSubscriptionPolicy(SubscriptionBase, VerificationMixin, ConfirmationMixin, ModerationMixin, - SetPubkeyMixin, ConfirmPubkeyMixin): + SetPubkeyMixin, ConfirmPubkeyMixin, + PGPMixin): """""" name = 'pgp-policy-confirm-moderate' @@ -175,12 +184,14 @@ class ConfirmModerationSubscriptionPolicy(SubscriptionBase, VerificationMixin, ModerationMixin.__init__(self, pre_approved=pre_approved) SetPubkeyMixin.__init__(self, pubkey=pubkey) ConfirmPubkeyMixin.__init__(self, pre_confirmed=pubkey_pre_confirmed) + PGPMixin.__init__(self) def _step_prepare(self): self.push('do_subscription') self.push('moderation_checks') self.push('pubkey_confirmation') self.push('pubkey_checks') + self.push('pgp_prepare') self.push('confirmation_checks') self.push('verification_checks') self.push('sanity_checks') diff --git a/src/mailman_pgp/workflows/tests/test_base.py b/src/mailman_pgp/workflows/tests/test_base.py index 913ab67..af00d01 100644 --- a/src/mailman_pgp/workflows/tests/test_base.py +++ b/src/mailman_pgp/workflows/tests/test_base.py @@ -38,7 +38,6 @@ from mailman_pgp.workflows.subscription import ConfirmSubscriptionPolicy class PubkeyMixinSetup(): - def setUp(self): with mm_transaction(): self.mlist = create_list('test@example.com', @@ -55,6 +54,27 @@ class PubkeyMixinSetup(): self.sender = self.um.create_address('rsa-1024b@example.org') +class TestPGPMixin(PubkeyMixinSetup, unittest.TestCase): + layer = PGPConfigLayer + + def test_create_address(self): + workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender, + pre_verified=True, + pre_confirmed=True) + workflow.run_thru('pgp_prepare') + pgp_address = PGPAddress.for_address(self.sender) + self.assertIsNotNone(pgp_address) + + def test_address_existing(self): + workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender, + pre_verified=True, + pre_confirmed=True) + with transaction() as t: + pgp_address = PGPAddress(self.sender) + t.add(pgp_address) + workflow.run_thru('pgp_prepare') + + class TestSetPubkeyMixin(PubkeyMixinSetup, unittest.TestCase): layer = PGPConfigLayer @@ -75,60 +95,38 @@ class TestSetPubkeyMixin(PubkeyMixinSetup, unittest.TestCase): pre_verified=True, pre_confirmed=True) list(workflow) - with transaction() as t: - pgp_address = PGPAddress(self.sender) + with transaction(): + pgp_address = PGPAddress.for_address(self.sender) pgp_address.key = self.sender_key.pubkey - t.add(pgp_address) receive_workflow = ConfirmSubscriptionPolicy(self.mlist) receive_workflow.token = workflow.token receive_workflow.restore() receive_workflow.run_thru('receive_key') - self.assertIsNotNone(receive_workflow.pubkey) - self.assertEqual(receive_workflow.pubkey.fingerprint, - self.sender_key.pubkey.fingerprint) - - receive_workflow.run_thru('pubkey_confirmation') - with patch.object(receive_workflow, - '_step_send_key_confirm_request') as step: - next(receive_workflow) - step.assert_called_once_with() - - def test_receive_key_no_address(self): + def test_set_pubkey(self): workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender, pre_verified=True, - pre_confirmed=True) - list(workflow) - receive_workflow = ConfirmSubscriptionPolicy(self.mlist) - receive_workflow.token = workflow.token - receive_workflow.restore() - receive_workflow.run_thru('receive_key') - - self.assertIsNone(receive_workflow.pubkey) - with patch.object(receive_workflow, - '_step_send_key_request') as step: - next(receive_workflow) - step.assert_called_once_with() + pre_confirmed=True, + pubkey=self.sender_key.pubkey) + workflow.run_thru('pubkey_checks') + pgp_address = PGPAddress.for_address(self.sender) + self.assertIsNotNone(pgp_address) + self.assertIsNotNone(pgp_address.key) + self.assertEqual(pgp_address.key_fingerprint, + self.sender_key.fingerprint) - def test_receive_key_pubkey_confirmed(self): + def test_pubkey_set(self): workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender, pre_verified=True, - pre_confirmed=True, - pubkey_pre_confirmed=True) - list(workflow) + pre_confirmed=True) with transaction() as t: pgp_address = PGPAddress(self.sender) pgp_address.key = self.sender_key.pubkey t.add(pgp_address) - - receive_workflow = ConfirmSubscriptionPolicy(self.mlist) - receive_workflow.token = workflow.token - receive_workflow.restore() - receive_workflow.run_thru('pubkey_confirmation') - with patch.object(receive_workflow, '_step_do_subscription') as step: - next(receive_workflow) - step.assert_called_once_with() + workflow.run_thru('pubkey_checks') + self.assertEqual(pgp_address.key_fingerprint, + self.sender_key.fingerprint) class TestConfirmPubkeyMixin(PubkeyMixinSetup, unittest.TestCase): @@ -168,11 +166,6 @@ class TestConfirmPubkeyMixin(PubkeyMixinSetup, unittest.TestCase): pubkey_pre_confirmed=False) list(workflow) - with transaction() as t: - pgp_address = PGPAddress(self.sender) - pgp_address.key = self.sender_key.pubkey - t.add(pgp_address) - receive_workflow = ConfirmSubscriptionPolicy(self.mlist) receive_workflow.token = workflow.token receive_workflow.restore() -- cgit v1.2.3-70-g09d2 From 8368cd832d21b404c01ab475ade6209b906ab422 Mon Sep 17 00:00:00 2001 From: J08nY Date: Fri, 14 Jul 2017 00:37:40 +0200 Subject: Make key change work. --- src/mailman_pgp/commands/eml_key.py | 15 ++++++++++++--- src/mailman_pgp/commands/tests/test_key.py | 23 +++++++++++++++++++++-- src/mailman_pgp/workflows/key_change.py | 11 +++++++---- 3 files changed, 40 insertions(+), 9 deletions(-) diff --git a/src/mailman_pgp/commands/eml_key.py b/src/mailman_pgp/commands/eml_key.py index 7b7782d..2f7a7e7 100644 --- a/src/mailman_pgp/commands/eml_key.py +++ b/src/mailman_pgp/commands/eml_key.py @@ -31,7 +31,8 @@ from mailman_pgp.model.address import PGPAddress from mailman_pgp.model.list import PGPMailingList from mailman_pgp.pgp.wrapper import PGPWrapper from mailman_pgp.workflows.base import CONFIRM_REQUEST -from mailman_pgp.workflows.key_change import KeyChangeWorkflow +from mailman_pgp.workflows.key_change import (CHANGE_CONFIRM_REQUEST, + KeyChangeWorkflow) def _get_email(msg): @@ -128,9 +129,12 @@ def _cmd_confirm(pgp_list, mlist, msg, msgdata, arguments, results): print('Wrong token.', file=results) return ContinueProcessing.no - # TODO differentiate between key change and subscription here. + if pendable.get('type') == KeyChangeWorkflow.pendable_class().PEND_TYPE: + expecting = CHANGE_CONFIRM_REQUEST.format(pendable.get('fingerprint'), + token) + else: + expecting = CONFIRM_REQUEST.format(pgp_address.key_fingerprint, token) - expecting = CONFIRM_REQUEST.format(pgp_address.key_fingerprint, token) for sig_subject in wrapped.get_signed(): if expecting in sig_subject: ISubscriptionManager(mlist).confirm(token) @@ -143,6 +147,11 @@ def _cmd_confirm(pgp_list, mlist, msg, msgdata, arguments, results): def _cmd_change(pgp_list, mlist, msg, msgdata, arguments, results): # New public key in attachment, requires to be signed with current # key + if len(arguments) != 1: + print('Extraneous argument/s: ' + ','.join(arguments[1:]), + file=results) + return ContinueProcessing.no + wrapped = PGPWrapper(msg) if not wrapped.has_keys(): print('No keys attached? Send a key.', file=results) diff --git a/src/mailman_pgp/commands/tests/test_key.py b/src/mailman_pgp/commands/tests/test_key.py index 64f8ae6..4f62b11 100644 --- a/src/mailman_pgp/commands/tests/test_key.py +++ b/src/mailman_pgp/commands/tests/test_key.py @@ -37,6 +37,7 @@ from mailman_pgp.pgp.tests.base import load_key from mailman_pgp.pgp.wrapper import PGPWrapper from mailman_pgp.testing.layers import PGPConfigLayer from mailman_pgp.workflows.base import CONFIRM_REQUEST +from mailman_pgp.workflows.key_change import CHANGE_CONFIRM_REQUEST from mailman_pgp.workflows.subscription import OpenSubscriptionPolicy @@ -423,5 +424,23 @@ class TestAfterSubscription(unittest.TestCase): else: confirm_request = items[0].msg request_wrapped = PGPWrapper(confirm_request) - request_wrapped.decrypt(bart_new_key) - # TODO finish this + decrypted = request_wrapped.decrypt(bart_new_key) + + subj = decrypted['subject'] + token = subj.split(' ')[-1] + + confirm_message = _create_plain('bart@example.com', 'test@example.com', + decrypted['subject'], + CHANGE_CONFIRM_REQUEST.format( + bart_new_key.fingerprint, + token)) + wrapped_confirm = MIMEWrapper(confirm_message) + confirm = wrapped_confirm.sign(bart_key) + + mm_config.switchboards['command'].enqueue(confirm, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + + pgp_address = PGPAddress.for_address(bart) + self.assertEqual(pgp_address.key_fingerprint, bart_new_key.fingerprint) + self.assertTrue(pgp_address.key_confirmed) diff --git a/src/mailman_pgp/workflows/key_change.py b/src/mailman_pgp/workflows/key_change.py index 8831d28..a67edbb 100644 --- a/src/mailman_pgp/workflows/key_change.py +++ b/src/mailman_pgp/workflows/key_change.py @@ -25,6 +25,7 @@ from public import public from zope.component import getUtility from zope.interface import implementer +from mailman_pgp.database import transaction from mailman_pgp.model.address import PGPAddress from mailman_pgp.pgp.utils import copy_headers from mailman_pgp.pgp.wrapper import PGPWrapper @@ -84,11 +85,12 @@ class KeyChangeWorkflow(Workflow): pendings = getUtility(IPendings) pendable = KeyChangeWorkflow.pendable_class()( email=self.pgp_address.email, - pubkey=str(self.pubkey) + pubkey=str(self.pubkey), + fingerprint=self.pubkey.fingerprint ) self.token = pendings.add(pendable) - self.push('receive_key_confirmation') + self.push('receive_confirmation') self.save() request_address = self.mlist.request_address email_address = self.pgp_address.email @@ -106,8 +108,9 @@ class KeyChangeWorkflow(Workflow): raise StopIteration def _step_receive_confirmation(self): - self.pgp_address.key = self.pubkey - self.pgp_address.key_confirmed = True + with transaction(): + self.pgp_address.key = self.pubkey + self.pgp_address.key_confirmed = True pendings = getUtility(IPendings) if self.token is not None: -- cgit v1.2.3-70-g09d2 From d8afe4bec9282254483ea1c7571298dcd9731508 Mon Sep 17 00:00:00 2001 From: J08nY Date: Fri, 14 Jul 2017 01:44:54 +0200 Subject: Workflow tests refactor. --- src/mailman_pgp/commands/tests/test_key.py | 65 +++++------ src/mailman_pgp/workflows/base.py | 3 +- src/mailman_pgp/workflows/tests/test_base.py | 130 +++++++++++---------- .../workflows/tests/test_subscription.py | 2 + 4 files changed, 105 insertions(+), 95 deletions(-) diff --git a/src/mailman_pgp/commands/tests/test_key.py b/src/mailman_pgp/commands/tests/test_key.py index 4f62b11..44d5b25 100644 --- a/src/mailman_pgp/commands/tests/test_key.py +++ b/src/mailman_pgp/commands/tests/test_key.py @@ -26,6 +26,7 @@ from mailman.interfaces.usermanager import IUserManager from mailman.runners.command import CommandRunner from mailman.testing.helpers import get_queue_messages, make_testable_runner from mailman.utilities.datetime import now +from public import public from zope.component import getUtility from mailman_pgp.config import mm_config @@ -59,6 +60,7 @@ def _create_mixed(from_hdr, to_hdr, subject_hdr): return message +@public class TestPreDispatch(unittest.TestCase): layer = PGPConfigLayer @@ -103,6 +105,7 @@ class TestPreDispatch(unittest.TestCase): results_msg.get_payload()) +@public class TestPreSubscription(unittest.TestCase): layer = PGPConfigLayer @@ -111,6 +114,9 @@ class TestPreSubscription(unittest.TestCase): self.pgp_list = PGPMailingList.for_list(self.mlist) self.pgp_list.key = load_key('ecc_p256.priv.asc') + self.bart_key = load_key('rsa_1024.priv.asc') + self.anne_key = load_key('ecc_p256.priv.asc') + def test_set(self): self.mlist.subscription_policy = OpenSubscriptionPolicy bart = getUtility(IUserManager).create_address('bart@example.com', @@ -121,12 +127,10 @@ class TestPreSubscription(unittest.TestCase): get_queue_messages('virgin') - bart_key = load_key('rsa_1024.priv.asc') - set_message = _create_mixed('bart@example.com', 'test@example.com', 'Re: key set {}'.format(token)) wrapped_set_message = MIMEWrapper(set_message) - set_message = wrapped_set_message.attach_key(bart_key.pubkey) + set_message = wrapped_set_message.attach_key(self.bart_key.pubkey) mm_config.switchboards['command'].enqueue(set_message, listid='test.example.com') @@ -134,7 +138,8 @@ class TestPreSubscription(unittest.TestCase): pgp_address = PGPAddress.for_address(bart) self.assertIsNotNone(pgp_address) - self.assertEqual(pgp_address.key.fingerprint, bart_key.fingerprint) + self.assertEqual(pgp_address.key.fingerprint, + self.bart_key.fingerprint) self.assertFalse(pgp_address.key_confirmed) items = get_queue_messages('virgin', expected_count=2) @@ -146,7 +151,7 @@ class TestPreSubscription(unittest.TestCase): confirm_request = items[0].msg self.assertIn('Key succesfully set.', results.get_payload()) - self.assertIn('Key fingerprint: {}'.format(bart_key.fingerprint), + self.assertIn('Key fingerprint: {}'.format(self.bart_key.fingerprint), results.get_payload()) confirm_wrapped = PGPWrapper(confirm_request) @@ -178,15 +183,12 @@ class TestPreSubscription(unittest.TestCase): results_msg.get_payload()) def test_set_multiple_keys(self): - bart_key = load_key('rsa_1024.priv.asc') - anne_key = load_key('ecc_p256.priv.asc') - set_message = _create_mixed('bart@example.com', 'test@example.com', 'Re: key set token') wrapped_set_message = MIMEWrapper(set_message) - set_message = wrapped_set_message.attach_key(bart_key.pubkey) + set_message = wrapped_set_message.attach_key(self.bart_key.pubkey) wrapped_set_message = MIMEWrapper(set_message) - set_message = wrapped_set_message.attach_key(anne_key.pubkey) + set_message = wrapped_set_message.attach_key(self.anne_key.pubkey) mm_config.switchboards['command'].enqueue(set_message, listid='test.example.com') @@ -198,11 +200,9 @@ class TestPreSubscription(unittest.TestCase): results_msg.get_payload()) def test_set_no_email(self): - bart_key = load_key('rsa_1024.priv.asc') - message = _create_mixed('', 'test@example.com', 'key set token') wrapped_message = MIMEWrapper(message) - message = wrapped_message.attach_key(bart_key.pubkey) + message = wrapped_message.attach_key(self.bart_key.pubkey) mm_config.switchboards['command'].enqueue(message, listid='test.example.com') @@ -213,12 +213,10 @@ class TestPreSubscription(unittest.TestCase): self.assertIn('No email to subscribe with.', results_msg.get_payload()) def test_set_no_address(self): - bart_key = load_key('rsa_1024.priv.asc') - set_message = _create_mixed('bart@example.com', 'test@example.com', 'key set token') wrapped_set_message = MIMEWrapper(set_message) - set_message = wrapped_set_message.attach_key(bart_key.pubkey) + set_message = wrapped_set_message.attach_key(self.bart_key.pubkey) mm_config.switchboards['command'].enqueue(set_message, listid='test.example.com') @@ -235,19 +233,18 @@ class TestPreSubscription(unittest.TestCase): 'Bart Person') bart.verified_on = now() - bart_key = load_key('rsa_1024.priv.asc') - token, token_owner, member = ISubscriptionManager(self.mlist).register( - bart, pubkey=bart_key.pubkey) + bart, pubkey=self.bart_key.pubkey) get_queue_messages('virgin') message = _create_plain('bart@example.com', 'test@example.com', 'Re: key confirm {}'.format(token), - CONFIRM_REQUEST.format(bart_key.fingerprint, - token)) + CONFIRM_REQUEST.format( + self.bart_key.fingerprint, + token)) wrapped_message = MIMEWrapper(message) - message = wrapped_message.sign(bart_key) + message = wrapped_message.sign(self.bart_key) mm_config.switchboards['command'].enqueue(message, listid='test.example.com') @@ -263,20 +260,20 @@ class TestPreSubscription(unittest.TestCase): 'Bart Person') bart.verified_on = now() - bart_key = load_key('rsa_1024.priv.asc') - token, token_owner, member = ISubscriptionManager(self.mlist).register( - bart, pubkey=bart_key.pubkey) + bart, pubkey=self.bart_key.pubkey) get_queue_messages('virgin') message = _create_plain('bart@example.com', 'test@example.com', 'Re: key confirm {}'.format(token), - CONFIRM_REQUEST.format(bart_key.fingerprint, - token)) + CONFIRM_REQUEST.format( + self.bart_key.fingerprint, + token)) wrapped_message = MIMEWrapper(message) - message = wrapped_message.sign_encrypt(bart_key, self.pgp_list.pubkey, - bart_key.pubkey) + message = wrapped_message.sign_encrypt(self.bart_key, + self.pgp_list.pubkey, + self.bart_key.pubkey) mm_config.switchboards['command'].enqueue(message, listid='test.example.com') @@ -330,17 +327,16 @@ class TestPreSubscription(unittest.TestCase): 'Bart Person') bart.verified_on = now() - bart_key = load_key('rsa_1024.priv.asc') - token, token_owner, member = ISubscriptionManager(self.mlist).register( - bart, pubkey=bart_key.pubkey) + bart, pubkey=self.bart_key.pubkey) get_queue_messages('virgin') message = _create_plain('bart@example.com', 'test@example.com', 'Re: key confirm {}'.format(token), - CONFIRM_REQUEST.format(bart_key.fingerprint, - token)) + CONFIRM_REQUEST.format( + self.bart_key.fingerprint, + token)) mm_config.switchboards['command'].enqueue(message, listid='test.example.com') @@ -353,6 +349,7 @@ class TestPreSubscription(unittest.TestCase): results_msg.get_payload()) +@public class TestAfterSubscription(unittest.TestCase): layer = PGPConfigLayer diff --git a/src/mailman_pgp/workflows/base.py b/src/mailman_pgp/workflows/base.py index d05781d..014f2dd 100644 --- a/src/mailman_pgp/workflows/base.py +++ b/src/mailman_pgp/workflows/base.py @@ -55,9 +55,8 @@ class PGPMixin: class SetPubkeyMixin: - def __init__(self, pubkey=None, pre_confirmed=False): + def __init__(self, pubkey=None): self.pubkey = pubkey - self.pubkey_confirmed = pre_confirmed @property def pubkey_key(self): diff --git a/src/mailman_pgp/workflows/tests/test_base.py b/src/mailman_pgp/workflows/tests/test_base.py index af00d01..c59cfe0 100644 --- a/src/mailman_pgp/workflows/tests/test_base.py +++ b/src/mailman_pgp/workflows/tests/test_base.py @@ -24,8 +24,12 @@ from unittest.mock import patch from mailman.app.lifecycle import create_list from mailman.interfaces.pending import IPendings from mailman.interfaces.usermanager import IUserManager +from mailman.interfaces.workflows import IWorkflow from mailman.testing.helpers import get_queue_messages +from mailman.workflows.common import SubscriptionBase +from public import public from zope.component import getUtility +from zope.interface import implementer from mailman_pgp.database import mm_transaction, transaction from mailman_pgp.model.address import PGPAddress @@ -33,11 +37,11 @@ from mailman_pgp.model.list import PGPMailingList from mailman_pgp.pgp.tests.base import load_key from mailman_pgp.pgp.wrapper import PGPWrapper from mailman_pgp.testing.layers import PGPConfigLayer -from mailman_pgp.workflows.base import KEY_REQUEST -from mailman_pgp.workflows.subscription import ConfirmSubscriptionPolicy +from mailman_pgp.workflows.base import (KEY_REQUEST, PGPMixin, SetPubkeyMixin, + ConfirmPubkeyMixin) -class PubkeyMixinSetup(): +class PubkeyMixinTestSetup(): def setUp(self): with mm_transaction(): self.mlist = create_list('test@example.com', @@ -54,34 +58,62 @@ class PubkeyMixinSetup(): self.sender = self.um.create_address('rsa-1024b@example.org') -class TestPGPMixin(PubkeyMixinSetup, unittest.TestCase): +@implementer(IWorkflow) +class PGPTestWorkflow(SubscriptionBase, PGPMixin, SetPubkeyMixin, + ConfirmPubkeyMixin): + name = 'test-workflow' + description = '' + initial_state = 'prepare' + save_attributes = ( + 'pubkey_key', + 'pubkey_confirmed', + 'address_key', + 'subscriber_key', + 'user_key', + 'token_owner_key' + ) + + def __init__(self, mlist, subscriber=None, *, pubkey=None, + pubkey_pre_confirmed=False): + SubscriptionBase.__init__(self, mlist, subscriber) + SetPubkeyMixin.__init__(self, pubkey=pubkey) + ConfirmPubkeyMixin.__init__(self, pre_confirmed=pubkey_pre_confirmed) + PGPMixin.__init__(self) + + def _step_prepare(self): + self.push('do_subscription') + self.push('pubkey_confirmation') + self.push('pubkey_checks') + self.push('pgp_prepare') + self.push('sanity_checks') + + +@public +class TestPGPMixin(PubkeyMixinTestSetup, unittest.TestCase): layer = PGPConfigLayer def test_create_address(self): - workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender, - pre_verified=True, - pre_confirmed=True) + workflow = PGPTestWorkflow(self.mlist, self.sender) workflow.run_thru('pgp_prepare') pgp_address = PGPAddress.for_address(self.sender) self.assertIsNotNone(pgp_address) def test_address_existing(self): - workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender, - pre_verified=True, - pre_confirmed=True) + workflow = PGPTestWorkflow(self.mlist, self.sender) with transaction() as t: pgp_address = PGPAddress(self.sender) t.add(pgp_address) workflow.run_thru('pgp_prepare') + still = PGPAddress.for_address(self.sender) + self.assertIsNotNone(still) -class TestSetPubkeyMixin(PubkeyMixinSetup, unittest.TestCase): +@public +class TestSetPubkeyMixin(PubkeyMixinTestSetup, unittest.TestCase): layer = PGPConfigLayer def test_key_request_sent(self): - workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender, - pre_verified=True, - pre_confirmed=True) + workflow = PGPTestWorkflow(self.mlist, self.sender) list(workflow) items = get_queue_messages('virgin', expected_count=1) message = items[0].msg @@ -91,24 +123,20 @@ class TestSetPubkeyMixin(PubkeyMixinSetup, unittest.TestCase): self.assertEqual(message.get_payload(), KEY_REQUEST) def test_receive_key(self): - workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender, - pre_verified=True, - pre_confirmed=True) + workflow = PGPTestWorkflow(self.mlist, self.sender) list(workflow) with transaction(): pgp_address = PGPAddress.for_address(self.sender) pgp_address.key = self.sender_key.pubkey - receive_workflow = ConfirmSubscriptionPolicy(self.mlist) + receive_workflow = PGPTestWorkflow(self.mlist) receive_workflow.token = workflow.token receive_workflow.restore() receive_workflow.run_thru('receive_key') def test_set_pubkey(self): - workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender, - pre_verified=True, - pre_confirmed=True, - pubkey=self.sender_key.pubkey) + workflow = PGPTestWorkflow(self.mlist, self.sender, + pubkey=self.sender_key.pubkey) workflow.run_thru('pubkey_checks') pgp_address = PGPAddress.for_address(self.sender) self.assertIsNotNone(pgp_address) @@ -117,9 +145,7 @@ class TestSetPubkeyMixin(PubkeyMixinSetup, unittest.TestCase): self.sender_key.fingerprint) def test_pubkey_set(self): - workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender, - pre_verified=True, - pre_confirmed=True) + workflow = PGPTestWorkflow(self.mlist, self.sender) with transaction() as t: pgp_address = PGPAddress(self.sender) pgp_address.key = self.sender_key.pubkey @@ -129,26 +155,23 @@ class TestSetPubkeyMixin(PubkeyMixinSetup, unittest.TestCase): self.sender_key.fingerprint) -class TestConfirmPubkeyMixin(PubkeyMixinSetup, unittest.TestCase): +@public +class TestConfirmPubkeyMixin(PubkeyMixinTestSetup, unittest.TestCase): layer = PGPConfigLayer def test_key_request_pubkey_set(self): - workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender, - pre_verified=True, - pre_confirmed=True, - pubkey=self.sender_key.pubkey, - pubkey_pre_confirmed=True) + workflow = PGPTestWorkflow(self.mlist, self.sender, + pubkey=self.sender_key.pubkey, + pubkey_pre_confirmed=True) workflow.run_thru('pubkey_confirmation') with patch.object(workflow, '_step_do_subscription') as step: next(workflow) step.assert_called_once_with() def test_send_key_confirm_request(self): - workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender, - pre_verified=True, - pre_confirmed=True, - pubkey=self.sender_key.pubkey, - pubkey_pre_confirmed=False) + workflow = PGPTestWorkflow(self.mlist, self.sender, + pubkey=self.sender_key.pubkey, + pubkey_pre_confirmed=False) list(workflow) items = get_queue_messages('virgin', expected_count=1) message = items[0].msg @@ -159,14 +182,12 @@ class TestConfirmPubkeyMixin(PubkeyMixinSetup, unittest.TestCase): self.assertTrue(wrapped.is_encrypted()) def test_receive_confirmation(self): - workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender, - pre_verified=True, - pre_confirmed=True, - pubkey=self.sender_key.pubkey, - pubkey_pre_confirmed=False) + workflow = PGPTestWorkflow(self.mlist, self.sender, + pubkey=self.sender_key.pubkey, + pubkey_pre_confirmed=False) list(workflow) - receive_workflow = ConfirmSubscriptionPolicy(self.mlist) + receive_workflow = PGPTestWorkflow(self.mlist) receive_workflow.token = workflow.token receive_workflow.restore() receive_workflow.run_thru('receive_key_confirmation') @@ -175,13 +196,12 @@ class TestConfirmPubkeyMixin(PubkeyMixinSetup, unittest.TestCase): step.assert_called_once_with() -class TestBothPubkeyMixins(PubkeyMixinSetup, unittest.TestCase): +@public +class TestBothPubkeyMixins(PubkeyMixinTestSetup, unittest.TestCase): layer = PGPConfigLayer def test_pended_data_key_request(self): - workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender, - pre_verified=True, - pre_confirmed=True) + workflow = PGPTestWorkflow(self.mlist, self.sender) with suppress(StopIteration): workflow.run_thru('send_key_request') self.assertIsNotNone(workflow.token) @@ -193,10 +213,8 @@ class TestBothPubkeyMixins(PubkeyMixinSetup, unittest.TestCase): self.assertEqual(pendable['token_owner'], 'subscriber') def test_pended_data_key_confirmation(self): - workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender, - pre_verified=True, - pre_confirmed=True, - pubkey=self.sender_key.pubkey) + workflow = PGPTestWorkflow(self.mlist, self.sender, + pubkey=self.sender_key.pubkey) with suppress(StopIteration): workflow.run_thru('send_key_confirm_request') self.assertIsNotNone(workflow.token) @@ -208,9 +226,7 @@ class TestBothPubkeyMixins(PubkeyMixinSetup, unittest.TestCase): self.assertEqual(pendable['token_owner'], 'subscriber') def test_exisitng_pgp_address(self): - workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender, - pre_verified=True, - pre_confirmed=True) + workflow = PGPTestWorkflow(self.mlist, self.sender) with transaction() as t: pgp_address = PGPAddress(self.sender) @@ -224,9 +240,7 @@ class TestBothPubkeyMixins(PubkeyMixinSetup, unittest.TestCase): step.assert_called_once_with() def test_exisitng_pgp_address_not_confirmed(self): - workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender, - pre_verified=True, - pre_confirmed=True) + workflow = PGPTestWorkflow(self.mlist, self.sender) with transaction() as t: pgp_address = PGPAddress(self.sender) @@ -239,9 +253,7 @@ class TestBothPubkeyMixins(PubkeyMixinSetup, unittest.TestCase): step.assert_called_once_with() def test_exisitng_pgp_address_no_key(self): - workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender, - pre_verified=True, - pre_confirmed=True) + workflow = PGPTestWorkflow(self.mlist, self.sender) with transaction() as t: pgp_address = PGPAddress(self.sender) diff --git a/src/mailman_pgp/workflows/tests/test_subscription.py b/src/mailman_pgp/workflows/tests/test_subscription.py index 9464a83..f9fa1e1 100644 --- a/src/mailman_pgp/workflows/tests/test_subscription.py +++ b/src/mailman_pgp/workflows/tests/test_subscription.py @@ -20,6 +20,7 @@ import unittest from mailman.app.lifecycle import create_list from mailman.interfaces.usermanager import IUserManager +from public import public from zope.component import getUtility from mailman_pgp.database import mm_transaction @@ -29,6 +30,7 @@ from mailman_pgp.workflows.subscription import ( ModerationSubscriptionPolicy, OpenSubscriptionPolicy) +@public class TestSubscriptionWorkflows(unittest.TestCase): layer = PGPConfigLayer -- cgit v1.2.3-70-g09d2 From 615a1e7c01a0710c5ba138d81358d80827bcf680 Mon Sep 17 00:00:00 2001 From: J08nY Date: Fri, 14 Jul 2017 02:01:33 +0200 Subject: Some more Key change workflow tests. --- src/mailman_pgp/workflows/key_change.py | 20 ++-- src/mailman_pgp/workflows/tests/test_key_change.py | 105 +++++++++++++++++++++ 2 files changed, 118 insertions(+), 7 deletions(-) create mode 100644 src/mailman_pgp/workflows/tests/test_key_change.py diff --git a/src/mailman_pgp/workflows/key_change.py b/src/mailman_pgp/workflows/key_change.py index a67edbb..9e204cf 100644 --- a/src/mailman_pgp/workflows/key_change.py +++ b/src/mailman_pgp/workflows/key_change.py @@ -18,6 +18,7 @@ """""" from mailman.email.message import UserNotification from mailman.interfaces.pending import IPendable, IPendings +from mailman.interfaces.subscriptions import TokenOwner from mailman.interfaces.workflows import IWorkflow from mailman.workflows.base import Workflow from pgpy import PGPKey @@ -30,6 +31,7 @@ from mailman_pgp.model.address import PGPAddress from mailman_pgp.pgp.utils import copy_headers from mailman_pgp.pgp.wrapper import PGPWrapper + CHANGE_CONFIRM_REQUEST = """\ ---------- TODO: this is a pgp enabled list. @@ -48,7 +50,7 @@ Token: {} class KeyChangeWorkflow(Workflow): name = 'pgp-key-change-workflow' description = '' - initial_state = 'send_key_confirm_request' + initial_state = 'change_key' save_attributes = ( 'address_key', 'pubkey_key' @@ -67,19 +69,21 @@ class KeyChangeWorkflow(Workflow): @address_key.setter def address_key(self, value): self.pgp_address = PGPAddress.for_email(value) + self.member = self.mlist.regular_members.get_member(value) @property def pubkey_key(self): - if self.pubkey is None: - return None return str(self.pubkey) @pubkey_key.setter def pubkey_key(self, value): - if value is not None: - self.pubkey, _ = PGPKey.from_blob(value) - else: - self.pubkey = None + self.pubkey, _ = PGPKey.from_blob(value) + + def _step_change_key(self): + if self.pgp_address is None or self.pubkey is None: + raise ValueError + + self.push('send_key_confirm_request') def _step_send_key_confirm_request(self): pendings = getUtility(IPendings) @@ -89,6 +93,7 @@ class KeyChangeWorkflow(Workflow): fingerprint=self.pubkey.fingerprint ) self.token = pendings.add(pendable) + self.token_owner = TokenOwner.subscriber self.push('receive_confirmation') self.save() @@ -116,6 +121,7 @@ class KeyChangeWorkflow(Workflow): if self.token is not None: pendings.confirm(self.token) self.token = None + self.token_owner = TokenOwner.no_one @classmethod def pendable_class(cls): diff --git a/src/mailman_pgp/workflows/tests/test_key_change.py b/src/mailman_pgp/workflows/tests/test_key_change.py new file mode 100644 index 0000000..5d61efd --- /dev/null +++ b/src/mailman_pgp/workflows/tests/test_key_change.py @@ -0,0 +1,105 @@ +# Copyright (C) 2017 Jan Jancar +# +# This file is a part of the Mailman PGP plugin. +# +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) +# any later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. +# +# You should have received a copy of the GNU General Public License along with +# this program. If not, see . + +"""""" + +import unittest + +from mailman.app.lifecycle import create_list +from mailman.interfaces.subscriptions import ISubscriptionManager, TokenOwner +from mailman.interfaces.usermanager import IUserManager +from mailman.testing.helpers import get_queue_messages +from public import public +from zope.component import getUtility + +from mailman_pgp.database import mm_transaction, transaction +from mailman_pgp.model.address import PGPAddress +from mailman_pgp.model.list import PGPMailingList +from mailman_pgp.pgp.tests.base import load_key +from mailman_pgp.pgp.wrapper import PGPWrapper +from mailman_pgp.testing.layers import PGPConfigLayer +from mailman_pgp.workflows.key_change import KeyChangeWorkflow + + +@public +class TestKeyChangeWorkflow(unittest.TestCase): + layer = PGPConfigLayer + + def setUp(self): + with mm_transaction(): + self.mlist = create_list('test@example.com', + style_name='pgp-default') + self.pgp_list = PGPMailingList.for_list(self.mlist) + self.pgp_list.key = load_key('ecc_p256.priv.asc') + + self.sender_key = load_key('rsa_1024.priv.asc') + self.sender_new_key = load_key('ecc_p256.priv.asc') + self.sender = getUtility(IUserManager).create_address( + 'rsa-1024b@example.org') + + def test_pgp_address_none(self): + workflow = KeyChangeWorkflow(self.mlist) + with self.assertRaises(ValueError): + list(workflow) + + def test_pubkey_none(self): + with transaction() as t: + pgp_address = PGPAddress(self.sender) + t.add(pgp_address) + + workflow = KeyChangeWorkflow(self.mlist, pgp_address) + with self.assertRaises(ValueError): + list(workflow) + + def test_send_key_confirm_request(self): + with transaction() as t: + pgp_address = PGPAddress(self.sender) + pgp_address.key = self.sender_key.pubkey + pgp_address.key_confirmed = True + t.add(pgp_address) + + workflow = KeyChangeWorkflow(self.mlist, pgp_address, + self.sender_new_key.pubkey) + list(workflow) + items = get_queue_messages('virgin', expected_count=1) + message = items[0].msg + token = workflow.token + + self.assertEqual(message['Subject'], 'key confirm {}'.format(token)) + wrapped = PGPWrapper(message) + self.assertTrue(wrapped.is_encrypted()) + + def test_confirm(self): + with transaction() as t: + pgp_address = PGPAddress(self.sender) + pgp_address.key = self.sender_key.pubkey + pgp_address.key_confirmed = True + t.add(pgp_address) + + workflow = KeyChangeWorkflow(self.mlist, pgp_address, + self.sender_new_key.pubkey) + list(workflow) + + token, token_owner, member = ISubscriptionManager(self.mlist).confirm( + workflow.token) + self.assertIsNone(token) + self.assertEqual(token_owner, TokenOwner.no_one) + + pgp_address = PGPAddress.for_address(self.sender) + self.assertEqual(pgp_address.key_fingerprint, + self.sender_new_key.fingerprint) + self.assertTrue(pgp_address.key_confirmed) -- cgit v1.2.3-70-g09d2 From 6b7b6bfe1044a5c7a548a091a49dcdd2422aaff5 Mon Sep 17 00:00:00 2001 From: J08nY Date: Fri, 14 Jul 2017 02:10:31 +0200 Subject: Speed up testing. Dont drop the whole db structure. --- src/mailman_pgp/testing/layers.py | 45 ++++++++++++++++++++++++++++++++------- 1 file changed, 37 insertions(+), 8 deletions(-) diff --git a/src/mailman_pgp/testing/layers.py b/src/mailman_pgp/testing/layers.py index aabf216..fb8a3ec 100644 --- a/src/mailman_pgp/testing/layers.py +++ b/src/mailman_pgp/testing/layers.py @@ -14,6 +14,7 @@ # # You should have received a copy of the GNU General Public License along with # this program. If not, see . +import contextlib import os from os.path import isfile @@ -24,18 +25,36 @@ from mailman_pgp.database import transaction from mailman_pgp.model.base import Base -def reset_pgp_world(): +def reset_rollback(): + config.db.session.rollback() + + +def reset_pgp_dirs(): for keydir in (config.pgp.keydir_config.values()): for path in os.listdir(keydir): full_path = os.path.join(keydir, path) if isfile(full_path): os.remove(full_path) - config.db.session.rollback() + + +def reset_pgp_hard(): + reset_rollback() + reset_pgp_dirs() with transaction(): Base.metadata.drop_all(config.db.engine) Base.metadata.create_all(config.db.engine) +def reset_pgp_soft(): + reset_rollback() + reset_pgp_dirs() + with contextlib.closing(config.db.engine.connect()) as con: + trans = con.begin() + for table in reversed(Base.metadata.sorted_tables): + con.execute(table.delete()) + trans.commit() + + # It's weird that ws have to do this, but for some reason nose2 test layers # don't work when ws create a mixin class with the two classmethods # and subclass both it and the respective Mailman Core test layer. @@ -46,28 +65,38 @@ class PGPConfigLayer(ConfigLayer): @classmethod def tearDown(cls): - reset_pgp_world() + reset_pgp_soft() + + @classmethod + def testTearDown(cls): + reset_pgp_soft() + + +class PGPMigrationLayer(ConfigLayer): + @classmethod + def tearDown(cls): + reset_pgp_hard() @classmethod def testTearDown(cls): - reset_pgp_world() + reset_pgp_hard() class PGPSMTPLayer(SMTPLayer): @classmethod def tearDown(cls): - reset_pgp_world() + reset_pgp_soft() @classmethod def testTearDown(cls): - reset_pgp_world() + reset_pgp_soft() class PGPRESTLayer(RESTLayer): @classmethod def tearDown(cls): - reset_pgp_world() + reset_pgp_soft() @classmethod def testTearDown(cls): - reset_pgp_world() + reset_pgp_soft() -- cgit v1.2.3-70-g09d2 From 0b9335e163791959390bf3c83928e5b61e912fa3 Mon Sep 17 00:00:00 2001 From: J08nY Date: Fri, 14 Jul 2017 02:16:23 +0200 Subject: qa. --- src/mailman_pgp/workflows/key_change.py | 3 +-- src/mailman_pgp/workflows/tests/test_base.py | 4 ++-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/src/mailman_pgp/workflows/key_change.py b/src/mailman_pgp/workflows/key_change.py index 9e204cf..c6d3ebc 100644 --- a/src/mailman_pgp/workflows/key_change.py +++ b/src/mailman_pgp/workflows/key_change.py @@ -31,7 +31,6 @@ from mailman_pgp.model.address import PGPAddress from mailman_pgp.pgp.utils import copy_headers from mailman_pgp.pgp.wrapper import PGPWrapper - CHANGE_CONFIRM_REQUEST = """\ ---------- TODO: this is a pgp enabled list. @@ -80,7 +79,7 @@ class KeyChangeWorkflow(Workflow): self.pubkey, _ = PGPKey.from_blob(value) def _step_change_key(self): - if self.pgp_address is None or self.pubkey is None: + if self.pgp_address is None or self.pubkey is None: raise ValueError self.push('send_key_confirm_request') diff --git a/src/mailman_pgp/workflows/tests/test_base.py b/src/mailman_pgp/workflows/tests/test_base.py index c59cfe0..e3cc833 100644 --- a/src/mailman_pgp/workflows/tests/test_base.py +++ b/src/mailman_pgp/workflows/tests/test_base.py @@ -37,8 +37,8 @@ from mailman_pgp.model.list import PGPMailingList from mailman_pgp.pgp.tests.base import load_key from mailman_pgp.pgp.wrapper import PGPWrapper from mailman_pgp.testing.layers import PGPConfigLayer -from mailman_pgp.workflows.base import (KEY_REQUEST, PGPMixin, SetPubkeyMixin, - ConfirmPubkeyMixin) +from mailman_pgp.workflows.base import (ConfirmPubkeyMixin, KEY_REQUEST, + PGPMixin, SetPubkeyMixin) class PubkeyMixinTestSetup(): -- cgit v1.2.3-70-g09d2 From 9f0ac2239af18f780c757f8cf6524c99de2dffe8 Mon Sep 17 00:00:00 2001 From: J08nY Date: Fri, 14 Jul 2017 15:13:47 +0200 Subject: More tests for key command. --- src/mailman_pgp/commands/eml_key.py | 4 + src/mailman_pgp/commands/tests/test_key.py | 295 +++++++++++++++++++++++++++-- src/mailman_pgp/workflows/base.py | 3 +- src/mailman_pgp/workflows/key_change.py | 4 +- 4 files changed, 288 insertions(+), 18 deletions(-) diff --git a/src/mailman_pgp/commands/eml_key.py b/src/mailman_pgp/commands/eml_key.py index 2f7a7e7..1b6dc9f 100644 --- a/src/mailman_pgp/commands/eml_key.py +++ b/src/mailman_pgp/commands/eml_key.py @@ -109,6 +109,10 @@ def _cmd_confirm(pgp_list, mlist, msg, msgdata, arguments, results): print('A pgp enabled address not found.', file=results) return ContinueProcessing.no + if pgp_address.key is None: + print('No key set.', file=results) + return ContinueProcessing.no + wrapped = PGPWrapper(msg) if wrapped.is_encrypted(): decrypted = wrapped.decrypt(pgp_list.key) diff --git a/src/mailman_pgp/commands/tests/test_key.py b/src/mailman_pgp/commands/tests/test_key.py index 44d5b25..d0ff7e9 100644 --- a/src/mailman_pgp/commands/tests/test_key.py +++ b/src/mailman_pgp/commands/tests/test_key.py @@ -157,6 +157,49 @@ class TestPreSubscription(unittest.TestCase): confirm_wrapped = PGPWrapper(confirm_request) self.assertTrue(confirm_wrapped.is_encrypted()) + def test_set_encrypted(self): + self.mlist.subscription_policy = OpenSubscriptionPolicy + bart = getUtility(IUserManager).create_address('bart@example.com', + 'Bart Person') + bart.verified_on = now() + token, token_owner, member = ISubscriptionManager(self.mlist).register( + bart) + + get_queue_messages('virgin') + + set_message = _create_mixed('bart@example.com', 'test@example.com', + 'Re: key set {}'.format(token)) + wrapped_set_message = MIMEWrapper(set_message) + set_message = wrapped_set_message.attach_key(self.bart_key.pubkey) + wrapped_set_message = MIMEWrapper(set_message) + set_message = wrapped_set_message.encrypt(self.pgp_list.pubkey, + self.bart_key.pubkey) + + mm_config.switchboards['command'].enqueue(set_message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + + pgp_address = PGPAddress.for_address(bart) + self.assertIsNotNone(pgp_address) + self.assertEqual(pgp_address.key.fingerprint, + self.bart_key.fingerprint) + self.assertFalse(pgp_address.key_confirmed) + + items = get_queue_messages('virgin', expected_count=2) + if items[0].msg['Subject'] == 'The results of your email commands': + results = items[0].msg + confirm_request = items[1].msg + else: + results = items[1].msg + confirm_request = items[0].msg + + self.assertIn('Key succesfully set.', results.get_payload()) + self.assertIn('Key fingerprint: {}'.format(self.bart_key.fingerprint), + results.get_payload()) + + confirm_wrapped = PGPWrapper(confirm_request) + self.assertTrue(confirm_wrapped.is_encrypted()) + def test_set_no_token(self): message = _create_plain('bart@example.com', 'test@example.com', 'key set', '') @@ -227,6 +270,47 @@ class TestPreSubscription(unittest.TestCase): self.assertIn('No adddress to subscribe with.', results_msg.get_payload()) + def test_set_no_pgp_address(self): + bart = getUtility(IUserManager).create_address('bart@example.com', + 'Bart Person') + bart.verified_on = now() + + set_message = _create_mixed('bart@example.com', 'test@example.com', + 'key set token') + wrapped_set_message = MIMEWrapper(set_message) + set_message = wrapped_set_message.attach_key(self.bart_key.pubkey) + + mm_config.switchboards['command'].enqueue(set_message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + items = get_queue_messages('virgin', expected_count=1) + results_msg = items[0].msg + + self.assertIn('A pgp enabled address not found.', + results_msg.get_payload()) + + def test_set_wrong_token(self): + bart = getUtility(IUserManager).create_address('bart@example.com', + 'Bart Person') + bart.verified_on = now() + + with transaction() as t: + pgp_address = PGPAddress(bart) + t.add(pgp_address) + + set_message = _create_mixed('bart@example.com', 'test@example.com', + 'key set token') + wrapped_set_message = MIMEWrapper(set_message) + set_message = wrapped_set_message.attach_key(self.bart_key.pubkey) + + mm_config.switchboards['command'].enqueue(set_message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + items = get_queue_messages('virgin', expected_count=1) + results_msg = items[0].msg + + self.assertIn('Wrong token.', results_msg.get_payload()) + def test_confirm(self): self.mlist.subscription_policy = OpenSubscriptionPolicy bart = getUtility(IUserManager).create_address('bart@example.com', @@ -321,6 +405,29 @@ class TestPreSubscription(unittest.TestCase): self.assertIn('A pgp enabled address not found.', results_msg.get_payload()) + def test_confirm_no_key(self): + bart = getUtility(IUserManager).create_address('bart@example.com', + 'Bart Person') + with transaction() as t: + pgp_address = PGPAddress(bart) + t.add(pgp_address) + + message = _create_plain('bart@example.com', 'test@example.com', + 'Re: key confirm token', + CONFIRM_REQUEST.format( + self.bart_key.fingerprint, + 'token')) + wrapped_message = MIMEWrapper(message) + message = wrapped_message.sign(self.bart_key) + + mm_config.switchboards['command'].enqueue(message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + items = get_queue_messages('virgin', expected_count=1) + results_msg = items[0].msg + + self.assertIn('No key set.', results_msg.get_payload()) + def test_confirm_not_signed(self): self.mlist.subscription_policy = OpenSubscriptionPolicy bart = getUtility(IUserManager).create_address('bart@example.com', @@ -348,6 +455,92 @@ class TestPreSubscription(unittest.TestCase): self.assertIn('Message not signed, ignoring.', results_msg.get_payload()) + def test_confirm_invalid_sig(self): + self.mlist.subscription_policy = OpenSubscriptionPolicy + bart = getUtility(IUserManager).create_address('bart@example.com', + 'Bart Person') + bart.verified_on = now() + + token, token_owner, member = ISubscriptionManager(self.mlist).register( + bart, pubkey=self.bart_key.pubkey) + + get_queue_messages('virgin') + + message = _create_plain('bart@example.com', 'test@example.com', + 'Re: key confirm {}'.format(token), + CONFIRM_REQUEST.format( + self.bart_key.fingerprint, + token)) + wrapped_message = MIMEWrapper(message) + message = wrapped_message.sign(self.bart_key) + message.get_payload(0).set_payload( + 'Something that was definitely not signed.') + + mm_config.switchboards['command'].enqueue(message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + + items = get_queue_messages('virgin', expected_count=1) + results_msg = items[0].msg + + self.assertIn('Message failed to verify.', + results_msg.get_payload()) + + def test_confirm_wrong_token(self): + bart = getUtility(IUserManager).create_address('bart@example.com', + 'Bart Person') + bart.verified_on = now() + + with transaction() as t: + pgp_address = PGPAddress(bart) + pgp_address.key = self.bart_key.pubkey + t.add(pgp_address) + + message = _create_plain('bart@example.com', 'test@example.com', + 'Re: key confirm token', + CONFIRM_REQUEST.format( + self.bart_key.fingerprint, + 'token')) + wrapped_message = MIMEWrapper(message) + message = wrapped_message.sign(self.bart_key) + + mm_config.switchboards['command'].enqueue(message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + + items = get_queue_messages('virgin', expected_count=1) + results_msg = items[0].msg + + self.assertIn('Wrong token.', results_msg.get_payload()) + + def test_confirm_no_signed_statement(self): + self.mlist.subscription_policy = OpenSubscriptionPolicy + bart = getUtility(IUserManager).create_address('bart@example.com', + 'Bart Person') + bart.verified_on = now() + + token, token_owner, member = ISubscriptionManager(self.mlist).register( + bart, pubkey=self.bart_key.pubkey) + + get_queue_messages('virgin') + + message = _create_plain('bart@example.com', 'test@example.com', + 'Re: key confirm {}'.format(token), + 'Some text, that definitely does not' + 'contain the required/expected statement.') + wrapped_message = MIMEWrapper(message) + message = wrapped_message.sign(self.bart_key) + + mm_config.switchboards['command'].enqueue(message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + + items = get_queue_messages('virgin', expected_count=1) + results_msg = items[0].msg + + self.assertIn("Message doesn't contain the expected statement.", + results_msg.get_payload()) + @public class TestAfterSubscription(unittest.TestCase): @@ -358,22 +551,23 @@ class TestAfterSubscription(unittest.TestCase): self.pgp_list = PGPMailingList.for_list(self.mlist) self.pgp_list.key = load_key('ecc_p256.priv.asc') - def test_key_change(self): + self.bart_key = load_key('rsa_1024.priv.asc') + self.bart_new_key = load_key('ecc_p256.priv.asc') + + def test_change(self): bart = getUtility(IUserManager).create_address('bart@example.com', 'Bart Person') - bart_key = load_key('rsa_1024.priv.asc') - bart_new_key = load_key('ecc_p256.priv.asc') with transaction() as t: pgp_address = PGPAddress(bart) - pgp_address.key = bart_key.pubkey + pgp_address.key = self.bart_key.pubkey pgp_address.key_confirmed = True t.add(pgp_address) message = _create_mixed('bart@example.com', 'test@example.com', 'key change') wrapped_message = MIMEWrapper(message) - message = wrapped_message.attach_key(bart_new_key.pubkey) + message = wrapped_message.attach_key(self.bart_new_key.pubkey) mm_config.switchboards['command'].enqueue(message, listid='test.example.com') @@ -391,25 +585,23 @@ class TestAfterSubscription(unittest.TestCase): confirm_wrapped = PGPWrapper(confirm_request) self.assertTrue(confirm_wrapped.is_encrypted()) - decrypted = confirm_wrapped.decrypt(bart_new_key) + decrypted = confirm_wrapped.decrypt(self.bart_new_key) self.assertIn('key confirm', decrypted['subject']) - def test_key_change_confirm(self): + def test_change_confirm(self): bart = getUtility(IUserManager).create_address('bart@example.com', 'Bart Person') - bart_key = load_key('rsa_1024.priv.asc') - bart_new_key = load_key('ecc_p256.priv.asc') with transaction() as t: pgp_address = PGPAddress(bart) - pgp_address.key = bart_key.pubkey + pgp_address.key = self.bart_key.pubkey pgp_address.key_confirmed = True t.add(pgp_address) message = _create_mixed('bart@example.com', 'test@example.com', 'key change') wrapped_message = MIMEWrapper(message) - message = wrapped_message.attach_key(bart_new_key.pubkey) + message = wrapped_message.attach_key(self.bart_new_key.pubkey) mm_config.switchboards['command'].enqueue(message, listid='test.example.com') @@ -421,7 +613,7 @@ class TestAfterSubscription(unittest.TestCase): else: confirm_request = items[0].msg request_wrapped = PGPWrapper(confirm_request) - decrypted = request_wrapped.decrypt(bart_new_key) + decrypted = request_wrapped.decrypt(self.bart_new_key) subj = decrypted['subject'] token = subj.split(' ')[-1] @@ -429,15 +621,88 @@ class TestAfterSubscription(unittest.TestCase): confirm_message = _create_plain('bart@example.com', 'test@example.com', decrypted['subject'], CHANGE_CONFIRM_REQUEST.format( - bart_new_key.fingerprint, + self.bart_new_key.fingerprint, token)) wrapped_confirm = MIMEWrapper(confirm_message) - confirm = wrapped_confirm.sign(bart_key) + confirm = wrapped_confirm.sign(self.bart_key) mm_config.switchboards['command'].enqueue(confirm, listid='test.example.com') make_testable_runner(CommandRunner, 'command').run() pgp_address = PGPAddress.for_address(bart) - self.assertEqual(pgp_address.key_fingerprint, bart_new_key.fingerprint) + self.assertEqual(pgp_address.key_fingerprint, + self.bart_new_key.fingerprint) self.assertTrue(pgp_address.key_confirmed) + + def test_change_extra_arg(self): + message = _create_plain('bart@example.com', 'test@example.com', + 'key change extra arguments', '') + mm_config.switchboards['command'].enqueue(message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + + items = get_queue_messages('virgin', expected_count=1) + results_msg = items[0].msg + + self.assertIn('Extraneous argument/s: extra,arguments', + results_msg.get_payload()) + + def test_change_no_key(self): + message = _create_plain('bart@example.com', 'test@example.com', + 'key change', '') + + mm_config.switchboards['command'].enqueue(message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + items = get_queue_messages('virgin', expected_count=1) + results_msg = items[0].msg + + self.assertIn('No keys attached? Send a key.', + results_msg.get_payload()) + + def test_change_multiple_keys(self): + set_message = _create_mixed('bart@example.com', 'test@example.com', + 'key change') + + wrapped_set_message = MIMEWrapper(set_message) + set_message = wrapped_set_message.attach_key(self.bart_key.pubkey) + wrapped_set_message = MIMEWrapper(set_message) + set_message = wrapped_set_message.attach_key(self.bart_new_key.pubkey) + + mm_config.switchboards['command'].enqueue(set_message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + items = get_queue_messages('virgin', expected_count=1) + results_msg = items[0].msg + + self.assertIn('More than one key! Send only one key.', + results_msg.get_payload()) + + def test_change_no_email(self): + message = _create_mixed('', 'test@example.com', 'key change') + wrapped_message = MIMEWrapper(message) + message = wrapped_message.attach_key(self.bart_key.pubkey) + + mm_config.switchboards['command'].enqueue(message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + items = get_queue_messages('virgin', expected_count=1) + results_msg = items[0].msg + + self.assertIn('No email to change key of.', results_msg.get_payload()) + + def test_change_no_pgp_address(self): + message = _create_mixed('bart@example.com', 'test@example.com', + 'key change') + wrapped_message = MIMEWrapper(message) + message = wrapped_message.attach_key(self.bart_key.pubkey) + + mm_config.switchboards['command'].enqueue(message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + items = get_queue_messages('virgin', expected_count=1) + results_msg = items[0].msg + + self.assertIn('A pgp enabled address not found.', + results_msg.get_payload()) diff --git a/src/mailman_pgp/workflows/base.py b/src/mailman_pgp/workflows/base.py index 014f2dd..d35e58c 100644 --- a/src/mailman_pgp/workflows/base.py +++ b/src/mailman_pgp/workflows/base.py @@ -130,8 +130,7 @@ class ConfirmPubkeyMixin: self.token)) pgp_list = PGPMailingList.for_list(self.mlist) wrapped = PGPWrapper(msg) - encrypted = wrapped.sign_encrypt(pgp_list.key, pgp_address.key, - pgp_list.pubkey) + encrypted = wrapped.sign_encrypt(pgp_list.key, pgp_address.key) msg.set_payload(encrypted.get_payload()) copy_headers(encrypted, msg, True) diff --git a/src/mailman_pgp/workflows/key_change.py b/src/mailman_pgp/workflows/key_change.py index c6d3ebc..cc5b9fc 100644 --- a/src/mailman_pgp/workflows/key_change.py +++ b/src/mailman_pgp/workflows/key_change.py @@ -28,6 +28,7 @@ from zope.interface import implementer from mailman_pgp.database import transaction from mailman_pgp.model.address import PGPAddress +from mailman_pgp.model.list import PGPMailingList from mailman_pgp.pgp.utils import copy_headers from mailman_pgp.pgp.wrapper import PGPWrapper @@ -58,6 +59,7 @@ class KeyChangeWorkflow(Workflow): def __init__(self, mlist, pgp_address=None, pubkey=None): super().__init__() self.mlist = mlist + self.pgp_list = PGPMailingList.for_list(mlist) self.pgp_address = pgp_address self.pubkey = pubkey @@ -104,7 +106,7 @@ class KeyChangeWorkflow(Workflow): self.pubkey.fingerprint, self.token)) wrapped = PGPWrapper(msg) - encrypted = wrapped.encrypt(self.pubkey) + encrypted = wrapped.sign_encrypt(self.pgp_list.key, self.pubkey) msg.set_payload(encrypted.get_payload()) copy_headers(encrypted, msg, True) -- cgit v1.2.3-70-g09d2 From 59ddda945ecf7de859f7976c0cea32dc6434b98e Mon Sep 17 00:00:00 2001 From: J08nY Date: Fri, 14 Jul 2017 21:35:01 +0200 Subject: More tests for PGP message wrappers and fix inline decrypt. --- src/mailman_pgp/pgp/inline.py | 5 ++- src/mailman_pgp/pgp/tests/base.py | 58 +++++++++++++++++++++---------- src/mailman_pgp/pgp/tests/test_inline.py | 46 ++++++++++++++++++------ src/mailman_pgp/pgp/tests/test_keygen.py | 2 ++ src/mailman_pgp/pgp/tests/test_mime.py | 38 +++++++++++++------- src/mailman_pgp/pgp/tests/test_wrapper.py | 44 +++++++++++++++++------ 6 files changed, 140 insertions(+), 53 deletions(-) diff --git a/src/mailman_pgp/pgp/inline.py b/src/mailman_pgp/pgp/inline.py index 9c19030..458cc26 100644 --- a/src/mailman_pgp/pgp/inline.py +++ b/src/mailman_pgp/pgp/inline.py @@ -201,7 +201,10 @@ class InlineWrapper: def _decrypt(self, part, key): message = PGPMessage.from_blob(part.get_payload()) decrypted = key.decrypt(message) - part.set_payload(decrypted.message) + if decrypted.is_signed: + part.set_payload(str(decrypted)) + else: + part.set_payload(decrypted.message) def decrypt(self, key): """ diff --git a/src/mailman_pgp/pgp/tests/base.py b/src/mailman_pgp/pgp/tests/base.py index d561fda..52e5ec4 100644 --- a/src/mailman_pgp/pgp/tests/base.py +++ b/src/mailman_pgp/pgp/tests/base.py @@ -42,63 +42,66 @@ def load_key(path): class WrapperTestCase(TestCase): wrapper = None + def wrap(self, message): + return self.wrapper(message) + def is_signed(self, message, signed): - wrapped = self.wrapper(message) + wrapped = self.wrap(message) self.assertEqual(wrapped.is_signed(), signed) def sign(self, message, key): - wrapped = self.wrapper(message) + wrapped = self.wrap(message) signed = wrapped.sign(key) - signed_wrapped = self.wrapper(signed) + signed_wrapped = self.wrap(signed) self.assertTrue(signed_wrapped.is_signed()) def sign_verify(self, message, priv, pub): - wrapped = self.wrapper(message) + wrapped = self.wrap(message) signed = wrapped.sign(priv) - signed_wrapped = self.wrapper(signed) + signed_wrapped = self.wrap(signed) for signature in signed_wrapped.verify(pub): self.assertTrue(bool(signature)) def verify(self, message, key, valid): - wrapped = self.wrapper(message) + wrapped = self.wrap(message) for signature in wrapped.verify(key): self.assertEqual(bool(signature), valid) def is_encrypted(self, message, encrypted): - wrapped = self.wrapper(message) + wrapped = self.wrap(message) self.assertEqual(wrapped.is_encrypted(), encrypted) def encrypt(self, message, *keys, **kwargs): - wrapped = self.wrapper(message) + wrapped = self.wrap(message) encrypted = wrapped.encrypt(*keys, **kwargs) - encrypted_wrapped = self.wrapper(encrypted) + encrypted_wrapped = self.wrap(encrypted) self.assertTrue(encrypted_wrapped.is_encrypted()) def encrypt_decrypt(self, message, pub, priv): - wrapped = self.wrapper(message) + wrapped = self.wrap(message) encrypted = wrapped.encrypt(pub) - encrypted_wrapped = self.wrapper(encrypted) + encrypted_wrapped = self.wrap(encrypted) decrypted = encrypted_wrapped.decrypt(priv) - decrypted_wrapped = self.wrapper(decrypted) + decrypted_wrapped = self.wrap(decrypted) self.assertFalse(decrypted_wrapped.is_encrypted()) self.assertEqual(decrypted.get_payload(), message.get_payload()) def decrypt(self, message, key, clear): - wrapped = self.wrapper(message) + wrapped = self.wrap(message) decrypted = wrapped.decrypt(key) - decrypted_wrapped = self.wrapper(decrypted) + decrypted_wrapped = self.wrap(decrypted) self.assertFalse(decrypted_wrapped.is_encrypted()) self.assertEqual(decrypted.get_payload(), clear) def has_keys(self, message, has_keys): - wrapped = self.wrapper(message) + wrapped = self.wrap(message) self.assertEqual(wrapped.has_keys(), has_keys) def keys(self, message, keys): - wrapped = self.wrapper(message) + wrapped = self.wrap(message) loaded = list(wrapped.keys()) self.assertEqual(len(loaded), len(keys)) @@ -107,13 +110,30 @@ class WrapperTestCase(TestCase): self.assertListEqual(loaded_fingerprints, fingerprints) def sign_encrypt_decrypt_verify(self, message, sign_key, encrypt_key): - wrapped = self.wrapper(message) + wrapped = self.wrap(message) encrypted = wrapped.sign_encrypt(sign_key, encrypt_key.pubkey) - encrypted_wrapped = self.wrapper(encrypted) + encrypted_wrapped = self.wrap(encrypted) + self.assertTrue(encrypted_wrapped.is_encrypted()) + + decrypted = encrypted_wrapped.decrypt(encrypt_key) + decrypted_wrapped = self.wrap(decrypted) + self.assertTrue(decrypted_wrapped.is_signed()) + self.assertFalse(decrypted_wrapped.is_encrypted()) + + verification = decrypted_wrapped.verify(sign_key.pubkey) + for sig in verification: + self.assertTrue(bool(sig)) + self.assertListEqual(list(decrypted_wrapped.get_signed()), + list(wrapped.get_payload())) + + def sign_then_encrypt_decrypt_verify(self, message, sign_key, encrypt_key): + wrapped = self.wrap(message) + encrypted = wrapped.sign_then_encrypt(sign_key, encrypt_key.pubkey) + encrypted_wrapped = self.wrap(encrypted) self.assertTrue(encrypted_wrapped.is_encrypted()) decrypted = encrypted_wrapped.decrypt(encrypt_key) - decrypted_wrapped = self.wrapper(decrypted) + decrypted_wrapped = self.wrap(decrypted) self.assertTrue(decrypted_wrapped.is_signed()) self.assertFalse(decrypted_wrapped.is_encrypted()) diff --git a/src/mailman_pgp/pgp/tests/test_inline.py b/src/mailman_pgp/pgp/tests/test_inline.py index cd18209..a89963a 100644 --- a/src/mailman_pgp/pgp/tests/test_inline.py +++ b/src/mailman_pgp/pgp/tests/test_inline.py @@ -18,6 +18,7 @@ """Tests for the inline wrapper.""" from parameterized import parameterized +from public import public from mailman_pgp.pgp.inline import InlineWrapper from mailman_pgp.pgp.tests.base import load_key, load_message, WrapperTestCase @@ -27,6 +28,7 @@ class InlineWrapperTestCase(WrapperTestCase): wrapper = InlineWrapper +@public class TestSigning(InlineWrapperTestCase): @parameterized.expand([ (load_message('inline_cleartext_signed.eml'), @@ -43,7 +45,7 @@ class TestSigning(InlineWrapperTestCase): False) ]) def test_is_signed(self, message, signed): - super().is_signed(message, signed) + self.is_signed(message, signed) @parameterized.expand([ (load_message('clear.eml'), @@ -52,7 +54,7 @@ class TestSigning(InlineWrapperTestCase): load_key('ecc_p256.priv.asc')) ]) def test_sign(self, message, key): - super().sign(message, key) + self.sign(message, key) @parameterized.expand([ (load_message('clear.eml'), @@ -63,7 +65,7 @@ class TestSigning(InlineWrapperTestCase): load_key('ecc_p256.pub.asc')) ]) def test_sign_verify(self, message, priv, pub): - super().sign_verify(message, priv, pub) + self.sign_verify(message, priv, pub) @parameterized.expand([ (load_message('inline_cleartext_signed.eml'), @@ -80,9 +82,10 @@ class TestSigning(InlineWrapperTestCase): False), ]) def test_verify(self, message, key, valid): - super().verify(message, key, valid) + self.verify(message, key, valid) +@public class TestEncryption(InlineWrapperTestCase): @parameterized.expand([ (load_message('inline_encrypted.eml'), @@ -99,7 +102,7 @@ class TestEncryption(InlineWrapperTestCase): False) ]) def test_is_encrypted(self, message, encrypted): - super().is_encrypted(message, encrypted) + self.is_encrypted(message, encrypted) @parameterized.expand([ (load_message('clear.eml'), @@ -110,9 +113,9 @@ class TestEncryption(InlineWrapperTestCase): ]) def test_encrypt(self, message, keys, **kwargs): if isinstance(keys, tuple): - super().encrypt(message, *keys, **kwargs) + self.encrypt(message, *keys, **kwargs) else: - super().encrypt(message, keys, **kwargs) + self.encrypt(message, keys, **kwargs) @parameterized.expand([ (load_message('clear.eml'), @@ -123,7 +126,7 @@ class TestEncryption(InlineWrapperTestCase): load_key('ecc_p256.priv.asc')) ]) def test_encrypt_decrypt(self, message, pub, priv): - super().encrypt_decrypt(message, pub, priv) + self.encrypt_decrypt(message, pub, priv) @parameterized.expand([ (load_message('inline_encrypted.eml'), @@ -131,9 +134,10 @@ class TestEncryption(InlineWrapperTestCase): 'Some encrypted text.\n\n') ]) def test_decrypt(self, message, key, clear): - super().decrypt(message, key, clear) + self.decrypt(message, key, clear) +@public class TestKeys(InlineWrapperTestCase): @parameterized.expand([ (load_message('inline_privkey.eml'), @@ -146,7 +150,7 @@ class TestKeys(InlineWrapperTestCase): False) ]) def test_has_keys(self, message, has_keys): - super().has_keys(message, has_keys) + self.has_keys(message, has_keys) @parameterized.expand([ (load_message('inline_privkey.eml'), @@ -155,4 +159,24 @@ class TestKeys(InlineWrapperTestCase): [load_key('rsa_1024.pub.asc')]) ]) def test_keys(self, message, keys): - super().keys(message, keys) + self.keys(message, keys) + + +@public +class TestCombined(InlineWrapperTestCase): + @parameterized.expand([ + (load_message('clear.eml'), + load_key('rsa_1024.priv.asc'), + load_key('ecc_p256.priv.asc')) + ]) + def test_sign_encrypt_decrypt_verify(self, message, sign_key, encrypt_key): + self.sign_encrypt_decrypt_verify(message, sign_key, encrypt_key) + + @parameterized.expand([ + (load_message('clear.eml'), + load_key('rsa_1024.priv.asc'), + load_key('ecc_p256.priv.asc')) + ]) + def test_sign_then_encrypt_decrypt_verify(self, message, sign_key, + encrypt_key): + self.sign_then_encrypt_decrypt_verify(message, sign_key, encrypt_key) diff --git a/src/mailman_pgp/pgp/tests/test_keygen.py b/src/mailman_pgp/pgp/tests/test_keygen.py index dab6801..5c59614 100644 --- a/src/mailman_pgp/pgp/tests/test_keygen.py +++ b/src/mailman_pgp/pgp/tests/test_keygen.py @@ -21,10 +21,12 @@ from unittest import TestCase from pgpy import PGPKey from pgpy.constants import PubKeyAlgorithm +from public import public from mailman_pgp.pgp.keygen import ListKeyGenerator +@public class TesKeygen(TestCase): def setUp(self): self.keypair_config = { diff --git a/src/mailman_pgp/pgp/tests/test_mime.py b/src/mailman_pgp/pgp/tests/test_mime.py index fab50bb..f56c781 100644 --- a/src/mailman_pgp/pgp/tests/test_mime.py +++ b/src/mailman_pgp/pgp/tests/test_mime.py @@ -18,6 +18,7 @@ """Tests for the MIME wrapper.""" from parameterized import parameterized +from public import public from mailman_pgp.pgp.mime import MIMEWrapper from mailman_pgp.pgp.tests.base import load_key, load_message, WrapperTestCase @@ -27,6 +28,7 @@ class MIMEWrapperTestCase(WrapperTestCase): wrapper = MIMEWrapper +@public class TestSigning(MIMEWrapperTestCase): @parameterized.expand([ (load_message('mime_signed.eml'), @@ -37,7 +39,7 @@ class TestSigning(MIMEWrapperTestCase): False) ]) def test_is_signed(self, message, signed): - super().is_signed(message, signed) + self.is_signed(message, signed) @parameterized.expand([ (load_message('clear.eml'), @@ -46,7 +48,7 @@ class TestSigning(MIMEWrapperTestCase): load_key('ecc_p256.priv.asc')) ]) def test_sign(self, message, key): - super().sign(message, key) + self.sign(message, key) @parameterized.expand([ (load_message('clear.eml'), @@ -57,7 +59,7 @@ class TestSigning(MIMEWrapperTestCase): load_key('ecc_p256.pub.asc')) ]) def test_sign_verify(self, message, priv, pub): - super().sign_verify(message, priv, pub) + self.sign_verify(message, priv, pub) @parameterized.expand([ (load_message('mime_signed.eml'), @@ -68,9 +70,10 @@ class TestSigning(MIMEWrapperTestCase): False) ]) def test_verify(self, message, key, valid): - super().verify(message, key, valid) + self.verify(message, key, valid) +@public class TestEncryption(MIMEWrapperTestCase): @parameterized.expand([ (load_message('mime_encrypted.eml'), @@ -79,7 +82,7 @@ class TestEncryption(MIMEWrapperTestCase): True) ]) def test_is_encrypted(self, message, encrypted): - super().is_encrypted(message, encrypted) + self.is_encrypted(message, encrypted) @parameterized.expand([ (load_message('clear.eml'), @@ -90,9 +93,9 @@ class TestEncryption(MIMEWrapperTestCase): ]) def test_encrypt(self, message, keys, **kwargs): if isinstance(keys, tuple): - super().encrypt(message, *keys, **kwargs) + self.encrypt(message, *keys, **kwargs) else: - super().encrypt(message, keys, **kwargs) + self.encrypt(message, keys, **kwargs) @parameterized.expand([ (load_message('clear.eml'), @@ -103,7 +106,7 @@ class TestEncryption(MIMEWrapperTestCase): load_key('ecc_p256.priv.asc')) ]) def test_encrypt_decrypt(self, message, pub, priv): - super().encrypt_decrypt(message, pub, priv) + self.encrypt_decrypt(message, pub, priv) @parameterized.expand([ (load_message('mime_encrypted.eml'), @@ -111,9 +114,10 @@ class TestEncryption(MIMEWrapperTestCase): 'Some encrypted text.\n') ]) def test_decrypt(self, message, key, clear): - super().decrypt(message, key, clear) + self.decrypt(message, key, clear) +@public class TestKeys(MIMEWrapperTestCase): @parameterized.expand([ (load_message('mime_privkey.eml'), @@ -126,7 +130,7 @@ class TestKeys(MIMEWrapperTestCase): False) ]) def test_has_keys(self, message, has_keys): - super().has_keys(message, has_keys) + self.has_keys(message, has_keys) @parameterized.expand([ (load_message('mime_privkey.eml'), @@ -135,9 +139,10 @@ class TestKeys(MIMEWrapperTestCase): [load_key('rsa_1024.pub.asc')]) ]) def test_keys(self, message, keys): - super().keys(message, keys) + self.keys(message, keys) +@public class TestCombined(MIMEWrapperTestCase): @parameterized.expand([ (load_message('clear.eml'), @@ -145,4 +150,13 @@ class TestCombined(MIMEWrapperTestCase): load_key('ecc_p256.priv.asc')) ]) def test_sign_encrypt_decrypt_verify(self, message, sign_key, encrypt_key): - super().sign_encrypt_decrypt_verify(message, sign_key, encrypt_key) + self.sign_encrypt_decrypt_verify(message, sign_key, encrypt_key) + + @parameterized.expand([ + (load_message('clear.eml'), + load_key('rsa_1024.priv.asc'), + load_key('ecc_p256.priv.asc')) + ]) + def test_sign_then_encrypt_decrypt_verify(self, message, sign_key, + encrypt_key): + self.sign_then_encrypt_decrypt_verify(message, sign_key, encrypt_key) diff --git a/src/mailman_pgp/pgp/tests/test_wrapper.py b/src/mailman_pgp/pgp/tests/test_wrapper.py index fb7f0bb..14f40fc 100644 --- a/src/mailman_pgp/pgp/tests/test_wrapper.py +++ b/src/mailman_pgp/pgp/tests/test_wrapper.py @@ -17,6 +17,7 @@ """Tests for the combined wrapper.""" from parameterized import parameterized +from public import public from mailman_pgp.pgp.tests.base import load_key, load_message, WrapperTestCase from mailman_pgp.pgp.wrapper import PGPWrapper @@ -26,6 +27,7 @@ class PGPWrapperTestCase(WrapperTestCase): wrapper = PGPWrapper +@public class TestSigning(PGPWrapperTestCase): @parameterized.expand([ (load_message('inline_cleartext_signed.eml'), @@ -46,7 +48,7 @@ class TestSigning(PGPWrapperTestCase): False) ]) def test_is_signed(self, message, signed): - super().is_signed(message, signed) + self.is_signed(message, signed) @parameterized.expand([ (load_message('clear.eml'), @@ -55,7 +57,7 @@ class TestSigning(PGPWrapperTestCase): load_key('ecc_p256.priv.asc')) ]) def test_sign(self, message, key): - super().sign(message, key) + self.sign(message, key) @parameterized.expand([ (load_message('inline_cleartext_signed.eml'), @@ -72,9 +74,10 @@ class TestSigning(PGPWrapperTestCase): False) ]) def test_verify(self, message, key, valid): - super().verify(message, key, valid) + self.verify(message, key, valid) +@public class TestEncryption(PGPWrapperTestCase): @parameterized.expand([ (load_message('inline_encrypted.eml'), @@ -93,7 +96,7 @@ class TestEncryption(PGPWrapperTestCase): False) ]) def test_is_encrypted(self, message, encrypted): - super().is_encrypted(message, encrypted) + self.is_encrypted(message, encrypted) @parameterized.expand([ (load_message('clear.eml'), @@ -104,9 +107,9 @@ class TestEncryption(PGPWrapperTestCase): ]) def test_encrypt(self, message, keys, **kwargs): if isinstance(keys, tuple): - super().encrypt(message, *keys, **kwargs) + self.encrypt(message, *keys, **kwargs) else: - super().encrypt(message, keys, **kwargs) + self.encrypt(message, keys, **kwargs) @parameterized.expand([ (load_message('clear.eml'), @@ -117,7 +120,7 @@ class TestEncryption(PGPWrapperTestCase): load_key('ecc_p256.priv.asc')) ]) def test_encrypt_decrypt(self, message, pub, priv): - super().encrypt_decrypt(message, pub, priv) + self.encrypt_decrypt(message, pub, priv) @parameterized.expand([ (load_message('inline_encrypted.eml'), @@ -125,9 +128,10 @@ class TestEncryption(PGPWrapperTestCase): 'Some encrypted text.\n\n') ]) def test_decrypt(self, message, key, clear): - super().decrypt(message, key, clear) + self.decrypt(message, key, clear) +@public class TestKeys(PGPWrapperTestCase): @parameterized.expand([ (load_message('inline_privkey.eml'), @@ -146,7 +150,7 @@ class TestKeys(PGPWrapperTestCase): False) ]) def test_has_keys(self, message, has_keys): - super().has_keys(message, has_keys) + self.has_keys(message, has_keys) @parameterized.expand([ (load_message('inline_privkey.eml'), @@ -159,4 +163,24 @@ class TestKeys(PGPWrapperTestCase): [load_key('rsa_1024.pub.asc')]) ]) def test_keys(self, message, keys): - super().keys(message, keys) + self.keys(message, keys) + + +@public +class TestCombined(PGPWrapperTestCase): + @parameterized.expand([ + (load_message('clear.eml'), + load_key('rsa_1024.priv.asc'), + load_key('ecc_p256.priv.asc')) + ]) + def test_sign_encrypt_decrypt_verify(self, message, sign_key, encrypt_key): + self.sign_encrypt_decrypt_verify(message, sign_key, encrypt_key) + + @parameterized.expand([ + (load_message('clear.eml'), + load_key('rsa_1024.priv.asc'), + load_key('ecc_p256.priv.asc')) + ]) + def test_sign_then_encrypt_decrypt_verify(self, message, sign_key, + encrypt_key): + self.sign_then_encrypt_decrypt_verify(message, sign_key, encrypt_key) -- cgit v1.2.3-70-g09d2