From 0ba497780cbe1abe9b91321993e31456bfcf1cd0 Mon Sep 17 00:00:00 2001
From: J08nY
Date: Fri, 7 Jul 2017 23:09:39 +0200
Subject: WIP: Sketch out key management on subscription.
---
src/mailman_pgp/commands/eml_key.py | 88 ++++++++++++-
src/mailman_pgp/workflows/__init__.py | 0
src/mailman_pgp/workflows/base.py | 97 +++++++++++++++
src/mailman_pgp/workflows/subscription.py | 137 +++++++++++++++++++++
src/mailman_pgp/workflows/tests/__init__.py | 0
src/mailman_pgp/workflows/tests/test_base.py | 16 +++
.../workflows/tests/test_subscription.py | 16 +++
7 files changed, 351 insertions(+), 3 deletions(-)
create mode 100644 src/mailman_pgp/workflows/__init__.py
create mode 100644 src/mailman_pgp/workflows/base.py
create mode 100644 src/mailman_pgp/workflows/subscription.py
create mode 100644 src/mailman_pgp/workflows/tests/__init__.py
create mode 100644 src/mailman_pgp/workflows/tests/test_base.py
create mode 100644 src/mailman_pgp/workflows/tests/test_subscription.py
diff --git a/src/mailman_pgp/commands/eml_key.py b/src/mailman_pgp/commands/eml_key.py
index 9514f48..b280603 100644
--- a/src/mailman_pgp/commands/eml_key.py
+++ b/src/mailman_pgp/commands/eml_key.py
@@ -16,11 +16,21 @@
# this program. If not, see .
"""The key email command."""
+from email.utils import parseaddr
from mailman.interfaces.command import ContinueProcessing, IEmailCommand
+from mailman.interfaces.subscriptions import ISubscriptionManager
+from mailman.interfaces.usermanager import IUserManager
from public import public
+from zope.component import getUtility
from zope.interface import implementer
+from mailman_pgp.database import transaction
+from mailman_pgp.model.address import PGPAddress
+from mailman_pgp.model.list import PGPMailingList
+from mailman_pgp.pgp.wrapper import PGPWrapper
+from mailman_pgp.workflows.base import CONFIRM_REQUEST
+
@public
@implementer(IEmailCommand)
@@ -28,7 +38,7 @@ class KeyCommand:
"""The `key` command."""
name = 'key'
- argument_description = ''
+ argument_description = ''
short_description = ''
description = ''
@@ -38,7 +48,78 @@ class KeyCommand:
print('No sub-command specified,'
' must be one of .', file=results)
return ContinueProcessing.no
- if arguments[0] == 'change':
+
+ pgp_list = PGPMailingList.for_list(mlist)
+ if pgp_list is None:
+ print('This mailing list doesnt have pgp enabled.', file=results)
+ return ContinueProcessing.yes # XXX: What does this even mean?
+
+ if arguments[0] == 'set':
+ if len(arguments) != 2:
+ print('Missing token', file=results)
+ return ContinueProcessing.no
+
+ wrapped = PGPWrapper(msg)
+ if not wrapped.has_keys():
+ print('No keys here?')
+ return ContinueProcessing.no
+
+ keys = list(wrapped.keys())
+ if len(keys) != 1:
+ print('More than one key! Which one is yours?')
+ return ContinueProcessing.no
+
+ with transaction() as t:
+ pgp_address = PGPAddress(self.address)
+ pgp_address.key = keys.pop()
+ t.add(pgp_address)
+
+ token = arguments[1]
+ ISubscriptionManager(mlist).confirm(token)
+ # XXX finish this
+ elif arguments[0] == 'confirm':
+ if len(arguments) != 2:
+ print('Missing token', file=results)
+ return ContinueProcessing.no
+
+ display_name, email = parseaddr(msg['from'])
+ # Address could be None or the empty string.
+ if not email:
+ email = msg.sender
+ if not email:
+ print('No email to subscribe with.', file=results)
+ return ContinueProcessing.no
+ um = getUtility(IUserManager)
+
+ pgp_address = PGPAddress.for_address(um.get_address(email))
+ if pgp_address is None:
+ # TODO
+ pass
+
+ wrapped = PGPWrapper(msg)
+ if wrapped.is_encrypted():
+ decrypted = wrapped.decrypt(pgp_list.key)
+ wrapped = PGPWrapper(decrypted)
+
+ if not wrapped.is_signed():
+ print('Message not signed, ignoring.', file=results)
+ return ContinueProcessing.no
+
+ if not wrapped.verifies(pgp_address.key):
+ print('Message failed to verify.', file=results)
+ return ContinueProcessing.no
+
+ token = arguments[1]
+
+ expecting = CONFIRM_REQUEST.format(pgp_address.key.pubkey,
+ token)
+ if wrapped.get_signed() == expecting:
+ ISubscriptionManager(mlist).confirm(token)
+ else:
+ # TODO
+ pass
+
+ elif arguments[0] == 'change':
# New public key in attachment, requires to be signed with current
# key
pass
@@ -52,4 +133,5 @@ class KeyCommand:
else:
print('Wrong sub-command specified,'
' must be one of .', file=results)
- return ContinueProcessing.no
+
+ return ContinueProcessing.no
diff --git a/src/mailman_pgp/workflows/__init__.py b/src/mailman_pgp/workflows/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/mailman_pgp/workflows/base.py b/src/mailman_pgp/workflows/base.py
new file mode 100644
index 0000000..40ff4d7
--- /dev/null
+++ b/src/mailman_pgp/workflows/base.py
@@ -0,0 +1,97 @@
+# Copyright (C) 2017 Jan Jancar
+#
+# This file is a part of the Mailman PGP plugin.
+#
+# This program is free software; you can redistribute it and/or modify it under
+# the terms of the GNU General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or (at your option)
+# any later version.
+#
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+# more details.
+#
+# You should have received a copy of the GNU General Public License along with
+# this program. If not, see .
+
+""""""
+from mailman.email.message import UserNotification
+from mailman.interfaces.subscriptions import TokenOwner
+
+from mailman_pgp.model.address import PGPAddress
+from mailman_pgp.model.list import PGPMailingList
+from mailman_pgp.pgp.wrapper import PGPWrapper
+
+KEY_REQUEST = """\
+----------
+TODO: this is a pgp enabled list.
+We need your pubkey.
+Reply to this message with it as a PGP/MIME(preferred) or inline.
+----------"""
+
+CONFIRM_REQUEST = """\
+----------
+TODO: this is a pgp enabled list.
+Reply to this message with this whole text
+signed with your supplied key, either inline or PGP/MIME.
+
+Fingerprint: {}
+Token: {}
+----------
+"""
+
+
+class PubkeyMixin:
+ def __init__(self, pubkey=None):
+ self.pubkey = pubkey
+
+ def _step_pubkey_checks(self):
+ if not self.pubkey:
+ self.push('send_key_request')
+
+ def _step_send_key_request(self):
+ self._set_token(TokenOwner.subscriber)
+ self.push('receive_key')
+ self.save()
+ request_address = self.mlist.request_address
+ email_address = self.address.email
+ msg = UserNotification(email_address, request_address,
+ 'key set {}'.format(self.token),
+ KEY_REQUEST)
+ msg.send(self.mlist, add_precedence=False)
+ # Now we wait for the confirmation.
+ raise StopIteration
+
+ def _step_receive_key(self):
+ pgp_address = PGPAddress.for_address(self.address)
+ if pgp_address is None or pgp_address.key:
+ # The workflow was confirmed but we still dont have an address
+ # or the pubkey. So resend request and wait.
+ self.push('send_key_request')
+ return
+ else:
+ self.pubkey = pgp_address.key
+ self.push('send_confirm_request')
+
+ def _step_send_confirm_request(self):
+ self._set_token(TokenOwner.subscriber)
+ self.push('receive_confirmation')
+ self.save()
+ request_address = self.mlist.request_address
+ email_address = self.address.email
+ msg = UserNotification(email_address, request_address,
+ 'key confirm {}'.format(self.token),
+ CONFIRM_REQUEST.format(self.pubkey.fingerprint,
+ self.token))
+ pgp_list = PGPMailingList.for_list(self.mlist)
+ wrapped = PGPWrapper(msg)
+ encrypted = wrapped.encrypt(self.pubkey, pgp_list.pubkey)
+
+ # XXX: This is not good:
+ msg.set_payload(encrypted)
+ msg.send(self.mlist)
+ raise StopIteration
+
+ def _step_receive_confirmation(self):
+ pass
diff --git a/src/mailman_pgp/workflows/subscription.py b/src/mailman_pgp/workflows/subscription.py
new file mode 100644
index 0000000..a571b44
--- /dev/null
+++ b/src/mailman_pgp/workflows/subscription.py
@@ -0,0 +1,137 @@
+# Copyright (C) 2017 Jan Jancar
+#
+# This file is a part of the Mailman PGP plugin.
+#
+# This program is free software; you can redistribute it and/or modify it under
+# the terms of the GNU General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or (at your option)
+# any later version.
+#
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+# more details.
+#
+# You should have received a copy of the GNU General Public License along with
+# this program. If not, see .
+
+""""""
+
+from mailman.core.i18n import _
+from mailman.interfaces.workflows import ISubscriptionWorkflow
+from mailman.workflows.common import (ConfirmationMixin, ModerationMixin,
+ SubscriptionBase, VerificationMixin)
+from public import public
+from zope.interface import implementer
+
+from mailman_pgp.workflows.base import PubkeyMixin
+
+
+@public
+@implementer(ISubscriptionWorkflow)
+class ConfimSubscriptionPolicy(SubscriptionBase, VerificationMixin,
+ ConfirmationMixin, PubkeyMixin):
+ """"""
+
+ name = 'pgp-policy-confirm'
+ description = _('An subscription policy, for a PGP-enabled mailing list '
+ 'that requires confirmation.')
+ initial_state = 'prepare'
+ save_attributes = (
+ 'verified',
+ 'confirmed',
+ 'pubkey',
+ 'address_key',
+ 'subscriber_key',
+ 'user_key',
+ 'token_owner_key',
+ )
+
+ def __init__(self, mlist, subscriber=None, *,
+ pre_verified=False, pre_confirmed=False, pubkey=None):
+ SubscriptionBase.__init__(self, mlist, subscriber)
+ VerificationMixin.__init__(self, pre_verified=pre_verified)
+ ConfirmationMixin.__init__(self, pre_confirmed=pre_confirmed)
+ PubkeyMixin.__init__(self, pubkey=pubkey)
+
+ def _step_prepare(self):
+ self.push('do_subscription')
+ self.push('pubkey_checks')
+ self.push('confirmation_checks')
+ self.push('verification_checks')
+ self.push('sanity_checks')
+
+
+@public
+@implementer(ISubscriptionWorkflow)
+class ModerationSubscriptionPolicy(SubscriptionBase, VerificationMixin,
+ ModerationMixin, PubkeyMixin):
+ """"""
+
+ name = 'pgp-policy-moderate'
+ description = _('An subscription policy, for a PGP-enabled mailing list '
+ 'that requires moderation.')
+ initial_state = 'prepare'
+ save_attributes = (
+ 'verified',
+ 'approved',
+ 'pubkey',
+ 'address_key',
+ 'subscriber_key',
+ 'user_key',
+ 'token_owner_key',
+ )
+
+ def __init__(self, mlist, subscriber=None, *,
+ pre_verified=False, pre_approved=False, pubkey=None):
+ SubscriptionBase.__init__(self, mlist, subscriber)
+ VerificationMixin.__init__(self, pre_verified=pre_verified)
+ ModerationMixin.__init__(self, pre_approved=pre_approved)
+ PubkeyMixin.__init__(self, pubkey=pubkey)
+
+ def _step_prepare(self):
+ self.push('do_subscription')
+ self.push('moderation_checks')
+ self.push('pubkey_checks')
+ self.push('verification_checks')
+ self.push('sanity_checks')
+
+
+@public
+@implementer(ISubscriptionWorkflow)
+class ConfirmModerationSubscriptionPolicy(SubscriptionBase, VerificationMixin,
+ ConfirmationMixin, ModerationMixin,
+ PubkeyMixin):
+ """"""
+
+ name = 'pgp-policy-confirm-moderate'
+ description = _('An subscription policy, for a PGP-enabled mailing list '
+ 'that requires moderation after confirmation.')
+ initial_state = 'prepare'
+ save_attributes = (
+ 'verified',
+ 'confirmed',
+ 'approved',
+ 'pubkey',
+ 'address_key',
+ 'subscriber_key',
+ 'user_key',
+ 'token_owner_key',
+ )
+
+ def __init__(self, mlist, subscriber=None, *,
+ pre_verified=False, pre_confirmed=False, pre_approved=False,
+ pubkey=None):
+ SubscriptionBase.__init__(self, mlist, subscriber)
+ VerificationMixin.__init__(self, pre_verified=pre_verified)
+ ConfirmationMixin.__init__(self, pre_confirmed=pre_confirmed)
+ ModerationMixin.__init__(self, pre_approved=pre_approved)
+ PubkeyMixin.__init__(self, pubkey=pubkey)
+
+ def _step_prepare(self):
+ self.push('do_subscription')
+ self.push('moderation_checks')
+ self.push('pubkey_checks')
+ self.push('confirmation_checks')
+ self.push('verification_checks')
+ self.push('sanity_checks')
diff --git a/src/mailman_pgp/workflows/tests/__init__.py b/src/mailman_pgp/workflows/tests/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/mailman_pgp/workflows/tests/test_base.py b/src/mailman_pgp/workflows/tests/test_base.py
new file mode 100644
index 0000000..8b6b4d1
--- /dev/null
+++ b/src/mailman_pgp/workflows/tests/test_base.py
@@ -0,0 +1,16 @@
+# Copyright (C) 2017 Jan Jancar
+#
+# This file is a part of the Mailman PGP plugin.
+#
+# This program is free software; you can redistribute it and/or modify it under
+# the terms of the GNU General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or (at your option)
+# any later version.
+#
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+# more details.
+#
+# You should have received a copy of the GNU General Public License along with
+# this program. If not, see .
diff --git a/src/mailman_pgp/workflows/tests/test_subscription.py b/src/mailman_pgp/workflows/tests/test_subscription.py
new file mode 100644
index 0000000..8b6b4d1
--- /dev/null
+++ b/src/mailman_pgp/workflows/tests/test_subscription.py
@@ -0,0 +1,16 @@
+# Copyright (C) 2017 Jan Jancar
+#
+# This file is a part of the Mailman PGP plugin.
+#
+# This program is free software; you can redistribute it and/or modify it under
+# the terms of the GNU General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or (at your option)
+# any later version.
+#
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+# more details.
+#
+# You should have received a copy of the GNU General Public License along with
+# this program. If not, see .
--
cgit v1.2.3-70-g09d2
From 589cfab420d1fa9343956be1a31e28e3524f5f69 Mon Sep 17 00:00:00 2001
From: J08nY
Date: Mon, 10 Jul 2017 23:32:24 +0200
Subject: Add an overwrite boolean argument to copy_headers util function.
---
src/mailman_pgp/pgp/utils.py | 4 +++-
1 file changed, 3 insertions(+), 1 deletion(-)
diff --git a/src/mailman_pgp/pgp/utils.py b/src/mailman_pgp/pgp/utils.py
index 0946e3f..a824138 100644
--- a/src/mailman_pgp/pgp/utils.py
+++ b/src/mailman_pgp/pgp/utils.py
@@ -18,7 +18,7 @@
"""Various pgp and email utilities."""
-def copy_headers(from_msg, to_msg):
+def copy_headers(from_msg, to_msg, overwrite=False):
"""
Copy the headers and unixfrom from a message to another one.
@@ -28,6 +28,8 @@ def copy_headers(from_msg, to_msg):
:type to_msg: email.message.Message
"""
for key, value in from_msg.items():
+ if overwrite:
+ del to_msg[key]
if key not in to_msg:
to_msg[key] = value
if to_msg.get_unixfrom() is None:
--
cgit v1.2.3-70-g09d2
From 7b136683c76afae000c07f5dc54f3785b038695f Mon Sep 17 00:00:00 2001
From: J08nY
Date: Mon, 10 Jul 2017 23:32:50 +0200
Subject: Make MIMEWrapper create messages with proper header casing.
---
src/mailman_pgp/pgp/mime.py | 10 +++++-----
1 file changed, 5 insertions(+), 5 deletions(-)
diff --git a/src/mailman_pgp/pgp/mime.py b/src/mailman_pgp/pgp/mime.py
index afb1c35..1ecbe76 100644
--- a/src/mailman_pgp/pgp/mime.py
+++ b/src/mailman_pgp/pgp/mime.py
@@ -181,9 +181,9 @@ class MIMEWrapper:
_subtype=MIMEWrapper._signature_subtype,
_encoder=encode_7or8bit,
name='signature.asc')
- second_part.add_header('content-description',
+ second_part.add_header('Content-Description',
'OpenPGP digital signature')
- second_part.add_header('content-disposition', 'attachment',
+ second_part.add_header('Content-Disposition', 'attachment',
filename='signature.asc')
out.attach(copy.deepcopy(msg))
@@ -249,16 +249,16 @@ class MIMEWrapper:
first_part = MIMEApplication(_data='Version: 1',
_subtype=MIMEWrapper._encryption_subtype,
_encoder=encode_7or8bit)
- first_part.add_header('content-description',
+ first_part.add_header('Content-Description',
'PGP/MIME version identification')
second_part = MIMEApplication(_data=str(payload),
_subtype='octet-stream',
_encoder=encode_7or8bit,
name='encrypted.asc')
- second_part.add_header('content-description',
+ second_part.add_header('Content-Description',
'OpenPGP encrypted message')
- second_part.add_header('content-disposition', 'inline',
+ second_part.add_header('Content-Disposition', 'inline',
filename='encrypted.asc')
out.attach(first_part)
out.attach(second_part)
--
cgit v1.2.3-70-g09d2
From 4d98c0bcc1ee4eb7d2de38a7ae21f97f1b0c9943 Mon Sep 17 00:00:00 2001
From: J08nY
Date: Mon, 10 Jul 2017 23:43:44 +0200
Subject: Add pubkey_confirmed attribute to subscription workflows.
---
src/mailman_pgp/workflows/base.py | 32 ++++++++++++++++++++++++++-----
src/mailman_pgp/workflows/subscription.py | 30 ++++++++++++++++++-----------
2 files changed, 46 insertions(+), 16 deletions(-)
diff --git a/src/mailman_pgp/workflows/base.py b/src/mailman_pgp/workflows/base.py
index 40ff4d7..3f4ba8d 100644
--- a/src/mailman_pgp/workflows/base.py
+++ b/src/mailman_pgp/workflows/base.py
@@ -18,9 +18,11 @@
""""""
from mailman.email.message import UserNotification
from mailman.interfaces.subscriptions import TokenOwner
+from pgpy import PGPKey
from mailman_pgp.model.address import PGPAddress
from mailman_pgp.model.list import PGPMailingList
+from mailman_pgp.pgp.utils import copy_headers
from mailman_pgp.pgp.wrapper import PGPWrapper
KEY_REQUEST = """\
@@ -43,12 +45,29 @@ Token: {}
class PubkeyMixin:
- def __init__(self, pubkey=None):
+ def __init__(self, pubkey=None, pre_confirmed=False):
self.pubkey = pubkey
+ self.pubkey_confirmed = pre_confirmed
+
+ @property
+ def pubkey_key(self):
+ if self.pubkey is None:
+ return None
+ return str(self.pubkey)
+
+ @pubkey_key.setter
+ def pubkey_key(self, value):
+ if value is not None:
+ self.pubkey, _ = PGPKey.from_blob(value)
+ else:
+ self.pubkey = None
def _step_pubkey_checks(self):
if not self.pubkey:
self.push('send_key_request')
+ else:
+ if not self.pubkey_confirmed:
+ self.push('send_confirm_request')
def _step_send_key_request(self):
self._set_token(TokenOwner.subscriber)
@@ -65,14 +84,15 @@ class PubkeyMixin:
def _step_receive_key(self):
pgp_address = PGPAddress.for_address(self.address)
- if pgp_address is None or pgp_address.key:
+ if pgp_address is None or pgp_address.key is None:
# The workflow was confirmed but we still dont have an address
# or the pubkey. So resend request and wait.
self.push('send_key_request')
return
else:
self.pubkey = pgp_address.key
- self.push('send_confirm_request')
+ if not self.pubkey_confirmed:
+ self.push('send_confirm_request')
def _step_send_confirm_request(self):
self._set_token(TokenOwner.subscriber)
@@ -86,10 +106,12 @@ class PubkeyMixin:
self.token))
pgp_list = PGPMailingList.for_list(self.mlist)
wrapped = PGPWrapper(msg)
- encrypted = wrapped.encrypt(self.pubkey, pgp_list.pubkey)
+ encrypted = wrapped.sign_encrypt(pgp_list.key, self.pubkey,
+ pgp_list.pubkey)
# XXX: This is not good:
- msg.set_payload(encrypted)
+ msg.set_payload(encrypted.get_payload())
+ copy_headers(encrypted, msg, True)
msg.send(self.mlist)
raise StopIteration
diff --git a/src/mailman_pgp/workflows/subscription.py b/src/mailman_pgp/workflows/subscription.py
index a571b44..6b8240c 100644
--- a/src/mailman_pgp/workflows/subscription.py
+++ b/src/mailman_pgp/workflows/subscription.py
@@ -29,8 +29,8 @@ from mailman_pgp.workflows.base import PubkeyMixin
@public
@implementer(ISubscriptionWorkflow)
-class ConfimSubscriptionPolicy(SubscriptionBase, VerificationMixin,
- ConfirmationMixin, PubkeyMixin):
+class ConfirmSubscriptionPolicy(SubscriptionBase, VerificationMixin,
+ ConfirmationMixin, PubkeyMixin):
""""""
name = 'pgp-policy-confirm'
@@ -40,7 +40,8 @@ class ConfimSubscriptionPolicy(SubscriptionBase, VerificationMixin,
save_attributes = (
'verified',
'confirmed',
- 'pubkey',
+ 'pubkey_key',
+ 'pubkey_confirmed',
'address_key',
'subscriber_key',
'user_key',
@@ -48,11 +49,13 @@ class ConfimSubscriptionPolicy(SubscriptionBase, VerificationMixin,
)
def __init__(self, mlist, subscriber=None, *,
- pre_verified=False, pre_confirmed=False, pubkey=None):
+ pre_verified=False, pre_confirmed=False, pubkey=None,
+ pubkey_pre_confirmed=False):
SubscriptionBase.__init__(self, mlist, subscriber)
VerificationMixin.__init__(self, pre_verified=pre_verified)
ConfirmationMixin.__init__(self, pre_confirmed=pre_confirmed)
- PubkeyMixin.__init__(self, pubkey=pubkey)
+ PubkeyMixin.__init__(self, pubkey=pubkey,
+ pre_confirmed=pubkey_pre_confirmed)
def _step_prepare(self):
self.push('do_subscription')
@@ -75,7 +78,8 @@ class ModerationSubscriptionPolicy(SubscriptionBase, VerificationMixin,
save_attributes = (
'verified',
'approved',
- 'pubkey',
+ 'pubkey_key',
+ 'pubkey_confirmed',
'address_key',
'subscriber_key',
'user_key',
@@ -83,11 +87,13 @@ class ModerationSubscriptionPolicy(SubscriptionBase, VerificationMixin,
)
def __init__(self, mlist, subscriber=None, *,
- pre_verified=False, pre_approved=False, pubkey=None):
+ pre_verified=False, pre_approved=False, pubkey=None,
+ pubkey_pre_confirmed=False):
SubscriptionBase.__init__(self, mlist, subscriber)
VerificationMixin.__init__(self, pre_verified=pre_verified)
ModerationMixin.__init__(self, pre_approved=pre_approved)
- PubkeyMixin.__init__(self, pubkey=pubkey)
+ PubkeyMixin.__init__(self, pubkey=pubkey,
+ pre_confirmed=pubkey_pre_confirmed)
def _step_prepare(self):
self.push('do_subscription')
@@ -112,7 +118,8 @@ class ConfirmModerationSubscriptionPolicy(SubscriptionBase, VerificationMixin,
'verified',
'confirmed',
'approved',
- 'pubkey',
+ 'pubkey_key',
+ 'pubkey_confirmed',
'address_key',
'subscriber_key',
'user_key',
@@ -121,12 +128,13 @@ class ConfirmModerationSubscriptionPolicy(SubscriptionBase, VerificationMixin,
def __init__(self, mlist, subscriber=None, *,
pre_verified=False, pre_confirmed=False, pre_approved=False,
- pubkey=None):
+ pubkey=None, pubkey_pre_confirmed=False):
SubscriptionBase.__init__(self, mlist, subscriber)
VerificationMixin.__init__(self, pre_verified=pre_verified)
ConfirmationMixin.__init__(self, pre_confirmed=pre_confirmed)
ModerationMixin.__init__(self, pre_approved=pre_approved)
- PubkeyMixin.__init__(self, pubkey=pubkey)
+ PubkeyMixin.__init__(self, pubkey=pubkey,
+ pre_confirmed=pubkey_pre_confirmed)
def _step_prepare(self):
self.push('do_subscription')
--
cgit v1.2.3-70-g09d2
From 3fdc4dfc1f1d2a2f8c9ca1781d54a11166894890 Mon Sep 17 00:00:00 2001
From: J08nY
Date: Mon, 10 Jul 2017 23:45:25 +0200
Subject: Ignore warnings from PGPy.
---
src/mailman_pgp/testing/start.py | 2 ++
1 file changed, 2 insertions(+)
diff --git a/src/mailman_pgp/testing/start.py b/src/mailman_pgp/testing/start.py
index 37f3de4..128855e 100644
--- a/src/mailman_pgp/testing/start.py
+++ b/src/mailman_pgp/testing/start.py
@@ -25,3 +25,5 @@ from public import public
def start_run(plugin):
warnings.filterwarnings(action='ignore', category=UserWarning,
module='.*mailman_pgp.*')
+ warnings.filterwarnings(action='ignore', category=UserWarning,
+ module='.*PGPy.*')
--
cgit v1.2.3-70-g09d2
From ed5fc4825fd56c4ec9b3e4c07554977363c55bc7 Mon Sep 17 00:00:00 2001
From: J08nY
Date: Tue, 11 Jul 2017 00:10:11 +0200
Subject: Check signature expiration in PGPWrapper.verifies.
---
src/mailman_pgp/pgp/wrapper.py | 5 ++++-
1 file changed, 4 insertions(+), 1 deletion(-)
diff --git a/src/mailman_pgp/pgp/wrapper.py b/src/mailman_pgp/pgp/wrapper.py
index fcbec0e..6e8a8f9 100644
--- a/src/mailman_pgp/pgp/wrapper.py
+++ b/src/mailman_pgp/pgp/wrapper.py
@@ -80,7 +80,10 @@ class PGPWrapper():
yield from self.inline.verify(key)
def verifies(self, key):
- return all(bool(verification) for verification in self.verify(key))
+ return all(bool(verification) and
+ all(not sigsubj.signature.is_expired
+ for sigsubj in verification.good_signatures) for
+ verification in self.verify(key))
def is_encrypted(self):
return self.mime.is_encrypted() or self.inline.is_encrypted()
--
cgit v1.2.3-70-g09d2
From f8793bcc2e00c4ee639e4f0e75cfc2b19ea1849f Mon Sep 17 00:00:00 2001
From: J08nY
Date: Tue, 11 Jul 2017 00:36:57 +0200
Subject: Add tests for subscription workflows and their pubkey mixin.
---
src/mailman_pgp/styles/tests/test_base.py | 3 +-
src/mailman_pgp/workflows/base.py | 3 +-
src/mailman_pgp/workflows/tests/test_base.py | 174 +++++++++++++++++++++
.../workflows/tests/test_subscription.py | 36 +++++
4 files changed, 212 insertions(+), 4 deletions(-)
diff --git a/src/mailman_pgp/styles/tests/test_base.py b/src/mailman_pgp/styles/tests/test_base.py
index 159a242..d343fe4 100644
--- a/src/mailman_pgp/styles/tests/test_base.py
+++ b/src/mailman_pgp/styles/tests/test_base.py
@@ -33,8 +33,7 @@ class TestBaseStyle(TestCase):
base_style = PGPStyle()
base_style.apply(mlist)
- pgp_list = PGPMailingList.query().filter_by(
- list_id=mlist.list_id).first()
+ pgp_list = PGPMailingList.for_list(mlist)
# Test that we have our PGPMailingList
self.assertIsNotNone(pgp_list)
diff --git a/src/mailman_pgp/workflows/base.py b/src/mailman_pgp/workflows/base.py
index 3f4ba8d..52dd879 100644
--- a/src/mailman_pgp/workflows/base.py
+++ b/src/mailman_pgp/workflows/base.py
@@ -109,11 +109,10 @@ class PubkeyMixin:
encrypted = wrapped.sign_encrypt(pgp_list.key, self.pubkey,
pgp_list.pubkey)
- # XXX: This is not good:
msg.set_payload(encrypted.get_payload())
copy_headers(encrypted, msg, True)
msg.send(self.mlist)
raise StopIteration
def _step_receive_confirmation(self):
- pass
+ pass
\ No newline at end of file
diff --git a/src/mailman_pgp/workflows/tests/test_base.py b/src/mailman_pgp/workflows/tests/test_base.py
index 8b6b4d1..19b2e58 100644
--- a/src/mailman_pgp/workflows/tests/test_base.py
+++ b/src/mailman_pgp/workflows/tests/test_base.py
@@ -14,3 +14,177 @@
#
# You should have received a copy of the GNU General Public License along with
# this program. If not, see .
+
+""""""
+
+import unittest
+from contextlib import suppress
+from unittest.mock import patch
+
+from mailman.app.lifecycle import create_list
+from mailman.interfaces.pending import IPendings
+from mailman.interfaces.usermanager import IUserManager
+from mailman.testing.helpers import get_queue_messages
+from zope.component import getUtility
+
+from mailman_pgp.config import config
+from mailman_pgp.database import mm_transaction, transaction
+from mailman_pgp.model.address import PGPAddress
+from mailman_pgp.model.list import PGPMailingList
+from mailman_pgp.pgp.tests.base import load_key
+from mailman_pgp.pgp.wrapper import PGPWrapper
+from mailman_pgp.testing.layers import PGPConfigLayer
+from mailman_pgp.workflows.base import KEY_REQUEST
+from mailman_pgp.workflows.subscription import ConfirmSubscriptionPolicy
+
+
+class TestPubkeyMixin(unittest.TestCase):
+ layer = PGPConfigLayer
+
+ def setUp(self):
+ with mm_transaction():
+ self.mlist = create_list('test@example.com',
+ style_name='pgp-default')
+
+ self.list_key = load_key('ecc_p256.priv.asc')
+
+ self.pgp_list = PGPMailingList.for_list(self.mlist)
+ self.pgp_list.key = self.list_key
+
+ self.um = getUtility(IUserManager)
+
+ self.sender_key = load_key('rsa_1024.priv.asc')
+ self.sender = self.um.create_address('rsa-1024b@example.org')
+
+ def test_pended_data(self):
+ workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender,
+ pre_verified=True,
+ pre_confirmed=True)
+ with suppress(StopIteration):
+ workflow.run_thru('send_key_request')
+ self.assertIsNotNone(workflow.token)
+ pendable = getUtility(IPendings).confirm(workflow.token, expunge=False)
+ self.assertEqual(pendable['list_id'], 'test.example.com')
+ self.assertEqual(pendable['email'], 'rsa-1024b@example.org')
+ self.assertEqual(pendable['display_name'], '')
+ self.assertEqual(pendable['when'], '2005-08-01T07:49:23')
+ self.assertEqual(pendable['token_owner'], 'subscriber')
+
+ def test_key_request_sent(self):
+ workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender,
+ pre_verified=True,
+ pre_confirmed=True)
+ list(workflow)
+ items = get_queue_messages('virgin', expected_count=1)
+ message = items[0].msg
+ token = workflow.token
+
+ self.assertEqual(message['Subject'], 'key set {}'.format(token))
+ self.assertEqual(message.get_payload(), KEY_REQUEST)
+
+ def test_key_request_pubkey_set(self):
+ workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender,
+ pre_verified=True,
+ pre_confirmed=True,
+ pubkey=self.sender_key.pubkey,
+ pubkey_pre_confirmed=True)
+ workflow.run_thru('pubkey_checks')
+ with patch.object(workflow, '_step_do_subscription') as step:
+ next(workflow)
+ step.assert_called_once_with()
+
+ def test_receive_key(self):
+ workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender,
+ pre_verified=True,
+ pre_confirmed=True)
+ list(workflow)
+ with transaction() as t:
+ pgp_address = PGPAddress(self.sender)
+ pgp_address.key = self.sender_key.pubkey
+ t.add(pgp_address)
+
+ receive_workflow = ConfirmSubscriptionPolicy(self.mlist)
+ receive_workflow.token = workflow.token
+ receive_workflow.restore()
+ receive_workflow.run_thru('receive_key')
+
+ self.assertIsNotNone(receive_workflow.pubkey)
+ self.assertEqual(receive_workflow.pubkey.fingerprint,
+ self.sender_key.pubkey.fingerprint)
+
+ with patch.object(receive_workflow,
+ '_step_send_confirm_request') as step:
+ next(receive_workflow)
+ step.assert_called_once_with()
+
+ def test_receive_key_no_address(self):
+ workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender,
+ pre_verified=True,
+ pre_confirmed=True)
+ list(workflow)
+ receive_workflow = ConfirmSubscriptionPolicy(self.mlist)
+ receive_workflow.token = workflow.token
+ receive_workflow.restore()
+ receive_workflow.run_thru('receive_key')
+
+ self.assertIsNone(receive_workflow.pubkey)
+ with patch.object(receive_workflow,
+ '_step_send_key_request') as step:
+ next(receive_workflow)
+ step.assert_called_once_with()
+
+ def test_receive_key_pubkey_confirmed(self):
+ workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender,
+ pre_verified=True,
+ pre_confirmed=True,
+ pubkey_pre_confirmed=True)
+ list(workflow)
+ with transaction() as t:
+ pgp_address = PGPAddress(self.sender)
+ pgp_address.key = self.sender_key.pubkey
+ t.add(pgp_address)
+
+ receive_workflow = ConfirmSubscriptionPolicy(self.mlist)
+ receive_workflow.token = workflow.token
+ receive_workflow.restore()
+ receive_workflow.run_thru('receive_key')
+ with patch.object(receive_workflow, '_step_do_subscription') as step:
+ next(receive_workflow)
+ step.assert_called_once_with()
+
+ def test_send_key_confirm_request(self):
+ workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender,
+ pre_verified=True,
+ pre_confirmed=True,
+ pubkey=self.sender_key.pubkey,
+ pubkey_pre_confirmed=False)
+ list(workflow)
+ items = get_queue_messages('virgin', expected_count=1)
+ message = items[0].msg
+ token = workflow.token
+
+ self.assertEqual(message['Subject'], 'key confirm {}'.format(token))
+ wrapped = PGPWrapper(message)
+ self.assertTrue(wrapped.is_encrypted())
+
+ def test_receive_confirmation(self):
+ workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender,
+ pre_verified=True,
+ pre_confirmed=True,
+ pubkey=self.sender_key.pubkey,
+ pubkey_pre_confirmed=False)
+ list(workflow)
+
+ with transaction() as t:
+ pgp_address = PGPAddress(self.sender)
+ pgp_address.key = self.sender_key.pubkey
+ t.add(pgp_address)
+
+ receive_workflow = ConfirmSubscriptionPolicy(self.mlist)
+ receive_workflow.token = workflow.token
+ receive_workflow.restore()
+ receive_workflow.run_thru('receive_confirmation')
+
+ with patch.object(receive_workflow, '_step_do_subscription') as step:
+ next(receive_workflow)
+ step.assert_called_once_with()
diff --git a/src/mailman_pgp/workflows/tests/test_subscription.py b/src/mailman_pgp/workflows/tests/test_subscription.py
index 8b6b4d1..da59bee 100644
--- a/src/mailman_pgp/workflows/tests/test_subscription.py
+++ b/src/mailman_pgp/workflows/tests/test_subscription.py
@@ -14,3 +14,39 @@
#
# You should have received a copy of the GNU General Public License along with
# this program. If not, see .
+
+""""""
+import unittest
+
+from mailman.app.lifecycle import create_list
+from mailman.interfaces.usermanager import IUserManager
+from zope.component import getUtility
+
+from mailman_pgp.database import mm_transaction
+from mailman_pgp.testing.layers import PGPConfigLayer
+from mailman_pgp.workflows.subscription import (ConfirmSubscriptionPolicy,
+ ModerationSubscriptionPolicy,
+ ConfirmModerationSubscriptionPolicy)
+
+
+class TestSubscriptionWorkflows(unittest.TestCase):
+ layer = PGPConfigLayer
+
+ def setUp(self):
+ with mm_transaction():
+ self.mlist = create_list('test@example.com',
+ style_name='pgp-default')
+ self.sender = getUtility(IUserManager).create_address(
+ 'rsa-1024b@example.org')
+
+ def test_confirm_policy(self):
+ workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender)
+ next(workflow)
+
+ def test_moderation_policy(self):
+ workflow = ModerationSubscriptionPolicy(self.mlist, self.sender)
+ next(workflow)
+
+ def test_confirm_moderation_policy(self):
+ workflow = ConfirmModerationSubscriptionPolicy(self.mlist, self.sender)
+ next(workflow)
--
cgit v1.2.3-70-g09d2
From 383ce5f4cc6cb81f91f90f3408742c4c06bca301 Mon Sep 17 00:00:00 2001
From: J08nY
Date: Tue, 11 Jul 2017 01:16:23 +0200
Subject: qa.
---
src/mailman_pgp/workflows/base.py | 2 +-
src/mailman_pgp/workflows/tests/test_base.py | 1 -
src/mailman_pgp/workflows/tests/test_subscription.py | 6 +++---
3 files changed, 4 insertions(+), 5 deletions(-)
diff --git a/src/mailman_pgp/workflows/base.py b/src/mailman_pgp/workflows/base.py
index 52dd879..9bf4db8 100644
--- a/src/mailman_pgp/workflows/base.py
+++ b/src/mailman_pgp/workflows/base.py
@@ -115,4 +115,4 @@ class PubkeyMixin:
raise StopIteration
def _step_receive_confirmation(self):
- pass
\ No newline at end of file
+ pass
diff --git a/src/mailman_pgp/workflows/tests/test_base.py b/src/mailman_pgp/workflows/tests/test_base.py
index 19b2e58..a52571c 100644
--- a/src/mailman_pgp/workflows/tests/test_base.py
+++ b/src/mailman_pgp/workflows/tests/test_base.py
@@ -27,7 +27,6 @@ from mailman.interfaces.usermanager import IUserManager
from mailman.testing.helpers import get_queue_messages
from zope.component import getUtility
-from mailman_pgp.config import config
from mailman_pgp.database import mm_transaction, transaction
from mailman_pgp.model.address import PGPAddress
from mailman_pgp.model.list import PGPMailingList
diff --git a/src/mailman_pgp/workflows/tests/test_subscription.py b/src/mailman_pgp/workflows/tests/test_subscription.py
index da59bee..f8a67f3 100644
--- a/src/mailman_pgp/workflows/tests/test_subscription.py
+++ b/src/mailman_pgp/workflows/tests/test_subscription.py
@@ -24,9 +24,9 @@ from zope.component import getUtility
from mailman_pgp.database import mm_transaction
from mailman_pgp.testing.layers import PGPConfigLayer
-from mailman_pgp.workflows.subscription import (ConfirmSubscriptionPolicy,
- ModerationSubscriptionPolicy,
- ConfirmModerationSubscriptionPolicy)
+from mailman_pgp.workflows.subscription import (
+ ConfirmModerationSubscriptionPolicy, ConfirmSubscriptionPolicy,
+ ModerationSubscriptionPolicy)
class TestSubscriptionWorkflows(unittest.TestCase):
--
cgit v1.2.3-70-g09d2
From 288d5c07825f67e6fd92e93fc3d8369310e268d1 Mon Sep 17 00:00:00 2001
From: J08nY
Date: Tue, 11 Jul 2017 15:13:15 +0200
Subject: Add OpenSubscriptionPolicy.
---
src/mailman_pgp/workflows/subscription.py | 40 ++++++++++++++++++++--
.../workflows/tests/test_subscription.py | 6 +++-
2 files changed, 42 insertions(+), 4 deletions(-)
diff --git a/src/mailman_pgp/workflows/subscription.py b/src/mailman_pgp/workflows/subscription.py
index 6b8240c..a7565f3 100644
--- a/src/mailman_pgp/workflows/subscription.py
+++ b/src/mailman_pgp/workflows/subscription.py
@@ -27,6 +27,40 @@ from zope.interface import implementer
from mailman_pgp.workflows.base import PubkeyMixin
+@public
+@implementer(ISubscriptionWorkflow)
+class OpenSubscriptionPolicy(SubscriptionBase, VerificationMixin, PubkeyMixin):
+ """"""
+
+ name = 'pgp-policy-open'
+ description = _('An open subscription policy, '
+ 'for a PGP-enabled mailing list.')
+ initial_state = 'prepare'
+ save_attributes = (
+ 'verified',
+ 'pubkey_key',
+ 'pubkey_confirmed',
+ 'address_key',
+ 'subscriber_key',
+ 'user_key',
+ 'token_owner_key',
+ )
+
+ def __init__(self, mlist, subscriber=None, *,
+ pre_verified=False, pubkey=None,
+ pubkey_pre_confirmed=False):
+ SubscriptionBase.__init__(self, mlist, subscriber)
+ VerificationMixin.__init__(self, pre_verified=pre_verified)
+ PubkeyMixin.__init__(self, pubkey=pubkey,
+ pre_confirmed=pubkey_pre_confirmed)
+
+ def _step_prepare(self):
+ self.push('do_subscription')
+ self.push('pubkey_checks')
+ self.push('verification_checks')
+ self.push('sanity_checks')
+
+
@public
@implementer(ISubscriptionWorkflow)
class ConfirmSubscriptionPolicy(SubscriptionBase, VerificationMixin,
@@ -34,7 +68,7 @@ class ConfirmSubscriptionPolicy(SubscriptionBase, VerificationMixin,
""""""
name = 'pgp-policy-confirm'
- description = _('An subscription policy, for a PGP-enabled mailing list '
+ description = _('A subscription policy, for a PGP-enabled mailing list '
'that requires confirmation.')
initial_state = 'prepare'
save_attributes = (
@@ -72,7 +106,7 @@ class ModerationSubscriptionPolicy(SubscriptionBase, VerificationMixin,
""""""
name = 'pgp-policy-moderate'
- description = _('An subscription policy, for a PGP-enabled mailing list '
+ description = _('A subscription policy, for a PGP-enabled mailing list '
'that requires moderation.')
initial_state = 'prepare'
save_attributes = (
@@ -111,7 +145,7 @@ class ConfirmModerationSubscriptionPolicy(SubscriptionBase, VerificationMixin,
""""""
name = 'pgp-policy-confirm-moderate'
- description = _('An subscription policy, for a PGP-enabled mailing list '
+ description = _('A subscription policy, for a PGP-enabled mailing list '
'that requires moderation after confirmation.')
initial_state = 'prepare'
save_attributes = (
diff --git a/src/mailman_pgp/workflows/tests/test_subscription.py b/src/mailman_pgp/workflows/tests/test_subscription.py
index f8a67f3..9464a83 100644
--- a/src/mailman_pgp/workflows/tests/test_subscription.py
+++ b/src/mailman_pgp/workflows/tests/test_subscription.py
@@ -26,7 +26,7 @@ from mailman_pgp.database import mm_transaction
from mailman_pgp.testing.layers import PGPConfigLayer
from mailman_pgp.workflows.subscription import (
ConfirmModerationSubscriptionPolicy, ConfirmSubscriptionPolicy,
- ModerationSubscriptionPolicy)
+ ModerationSubscriptionPolicy, OpenSubscriptionPolicy)
class TestSubscriptionWorkflows(unittest.TestCase):
@@ -39,6 +39,10 @@ class TestSubscriptionWorkflows(unittest.TestCase):
self.sender = getUtility(IUserManager).create_address(
'rsa-1024b@example.org')
+ def test_open_policy(self):
+ workflow = OpenSubscriptionPolicy(self.mlist, self.sender)
+ next(workflow)
+
def test_confirm_policy(self):
workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender)
next(workflow)
--
cgit v1.2.3-70-g09d2
From 90d8ab005fede36adf66cf1a042ef33a3ece4c8d Mon Sep 17 00:00:00 2001
From: J08nY
Date: Tue, 11 Jul 2017 19:08:27 +0200
Subject: Reset token owner on key confirmation.
---
src/mailman_pgp/workflows/base.py | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/src/mailman_pgp/workflows/base.py b/src/mailman_pgp/workflows/base.py
index 9bf4db8..8a8126a 100644
--- a/src/mailman_pgp/workflows/base.py
+++ b/src/mailman_pgp/workflows/base.py
@@ -115,4 +115,4 @@ class PubkeyMixin:
raise StopIteration
def _step_receive_confirmation(self):
- pass
+ self._set_token(TokenOwner.no_one)
--
cgit v1.2.3-70-g09d2
From 97abc15b2e92fcf9109997043e4297f16d0bf5c7 Mon Sep 17 00:00:00 2001
From: J08nY
Date: Tue, 11 Jul 2017 20:24:33 +0200
Subject: Restore subscriber in PubkeyMixins last step.
---
src/mailman_pgp/workflows/base.py | 19 ++++++++++++++-----
src/mailman_pgp/workflows/tests/test_base.py | 8 +++++---
2 files changed, 19 insertions(+), 8 deletions(-)
diff --git a/src/mailman_pgp/workflows/base.py b/src/mailman_pgp/workflows/base.py
index 8a8126a..4ca1525 100644
--- a/src/mailman_pgp/workflows/base.py
+++ b/src/mailman_pgp/workflows/base.py
@@ -18,6 +18,7 @@
""""""
from mailman.email.message import UserNotification
from mailman.interfaces.subscriptions import TokenOwner
+from mailman.workflows.common import WhichSubscriber
from pgpy import PGPKey
from mailman_pgp.model.address import PGPAddress
@@ -63,11 +64,12 @@ class PubkeyMixin:
self.pubkey = None
def _step_pubkey_checks(self):
+ self.push('restore_subscriber')
if not self.pubkey:
self.push('send_key_request')
else:
if not self.pubkey_confirmed:
- self.push('send_confirm_request')
+ self.push('send_key_confirm_request')
def _step_send_key_request(self):
self._set_token(TokenOwner.subscriber)
@@ -92,11 +94,11 @@ class PubkeyMixin:
else:
self.pubkey = pgp_address.key
if not self.pubkey_confirmed:
- self.push('send_confirm_request')
+ self.push('send_key_confirm_request')
- def _step_send_confirm_request(self):
+ def _step_send_key_confirm_request(self):
self._set_token(TokenOwner.subscriber)
- self.push('receive_confirmation')
+ self.push('receive_key_confirmation')
self.save()
request_address = self.mlist.request_address
email_address = self.address.email
@@ -114,5 +116,12 @@ class PubkeyMixin:
msg.send(self.mlist)
raise StopIteration
- def _step_receive_confirmation(self):
+ def _step_receive_key_confirmation(self):
self._set_token(TokenOwner.no_one)
+
+ def _step_restore_subscriber(self):
+ if self.which is WhichSubscriber.address:
+ self.subscriber = self.address
+ else:
+ assert self.which is WhichSubscriber.user
+ self.subscriber = self.user
diff --git a/src/mailman_pgp/workflows/tests/test_base.py b/src/mailman_pgp/workflows/tests/test_base.py
index a52571c..c99c6d6 100644
--- a/src/mailman_pgp/workflows/tests/test_base.py
+++ b/src/mailman_pgp/workflows/tests/test_base.py
@@ -88,6 +88,7 @@ class TestPubkeyMixin(unittest.TestCase):
pubkey=self.sender_key.pubkey,
pubkey_pre_confirmed=True)
workflow.run_thru('pubkey_checks')
+ next(workflow)
with patch.object(workflow, '_step_do_subscription') as step:
next(workflow)
step.assert_called_once_with()
@@ -112,7 +113,7 @@ class TestPubkeyMixin(unittest.TestCase):
self.sender_key.pubkey.fingerprint)
with patch.object(receive_workflow,
- '_step_send_confirm_request') as step:
+ '_step_send_key_confirm_request') as step:
next(receive_workflow)
step.assert_called_once_with()
@@ -147,6 +148,7 @@ class TestPubkeyMixin(unittest.TestCase):
receive_workflow.token = workflow.token
receive_workflow.restore()
receive_workflow.run_thru('receive_key')
+ next(receive_workflow)
with patch.object(receive_workflow, '_step_do_subscription') as step:
next(receive_workflow)
step.assert_called_once_with()
@@ -182,8 +184,8 @@ class TestPubkeyMixin(unittest.TestCase):
receive_workflow = ConfirmSubscriptionPolicy(self.mlist)
receive_workflow.token = workflow.token
receive_workflow.restore()
- receive_workflow.run_thru('receive_confirmation')
-
+ receive_workflow.run_thru('receive_key_confirmation')
+ next(receive_workflow)
with patch.object(receive_workflow, '_step_do_subscription') as step:
next(receive_workflow)
step.assert_called_once_with()
--
cgit v1.2.3-70-g09d2
From 77b63eaae46697b24fe4cc604a36b869619b4638 Mon Sep 17 00:00:00 2001
From: J08nY
Date: Tue, 11 Jul 2017 20:57:29 +0200
Subject: More work on key command. Added key_confirmed attribute to
PGPAddress.
---
src/mailman_pgp/commands/eml_key.py | 220 ++++++++++++++++++-----------
src/mailman_pgp/commands/tests/__init__.py | 0
src/mailman_pgp/commands/tests/test_key.py | 123 ++++++++++++++++
src/mailman_pgp/model/address.py | 10 +-
src/mailman_pgp/pgp/mime.py | 22 ++-
5 files changed, 287 insertions(+), 88 deletions(-)
create mode 100644 src/mailman_pgp/commands/tests/__init__.py
create mode 100644 src/mailman_pgp/commands/tests/test_key.py
diff --git a/src/mailman_pgp/commands/eml_key.py b/src/mailman_pgp/commands/eml_key.py
index b280603..e9b5e19 100644
--- a/src/mailman_pgp/commands/eml_key.py
+++ b/src/mailman_pgp/commands/eml_key.py
@@ -32,106 +32,154 @@ from mailman_pgp.pgp.wrapper import PGPWrapper
from mailman_pgp.workflows.base import CONFIRM_REQUEST
+def _get_email(msg):
+ display_name, email = parseaddr(msg['from'])
+ # Address could be None or the empty string.
+ if not email:
+ email = msg.sender
+ return email
+
+
+def _cmd_set(pgp_list, mlist, msg, msgdata, arguments, results):
+ if len(arguments) != 2:
+ print('Missing token.', file=results)
+ return ContinueProcessing.no
+
+ wrapped = PGPWrapper(msg)
+ if not wrapped.has_keys():
+ print('No keys attached? Send a key.', file=results)
+ return ContinueProcessing.no
+
+ keys = list(wrapped.keys())
+ if len(keys) != 1:
+ print('More than one key! Send only one key.', file=results)
+ return ContinueProcessing.no
+
+ email = _get_email(msg)
+ if not email:
+ print('No email to subscribe with.', file=results)
+ return ContinueProcessing.no
+
+ address = getUtility(IUserManager).get_address(email)
+ if not address:
+ print('No adddress to subscribe with.', file=results)
+ return ContinueProcessing.no
+
+ with transaction() as t:
+ pgp_address = PGPAddress(address)
+ pgp_address.key = keys.pop()
+ t.add(pgp_address)
+
+ token = arguments[1]
+ try:
+ ISubscriptionManager(mlist).confirm(token)
+ print('Key succesfully set.', file=results)
+ print('Key fingerprint: {}'.format(pgp_address.key.fingerprint), file=results)
+ except LookupError:
+ print('Wrong token.', file=results)
+
+ return ContinueProcessing.no
+
+
+def _cmd_confirm(pgp_list, mlist, msg, msgdata, arguments, results):
+ if len(arguments) != 2:
+ print('Missing token', file=results)
+ return ContinueProcessing.no
+
+ email = _get_email(msg)
+ if not email:
+ print('No email to subscribe with.', file=results)
+ return ContinueProcessing.no
+
+ pgp_address = PGPAddress.for_email(email)
+ if pgp_address is None:
+ print('A pgp enabled address not found.', file=results)
+ return ContinueProcessing.no
+
+ wrapped = PGPWrapper(msg)
+ if wrapped.is_encrypted():
+ decrypted = wrapped.decrypt(pgp_list.key)
+ wrapped = PGPWrapper(decrypted)
+
+ if not wrapped.is_signed():
+ print('Message not signed, ignoring.', file=results)
+ return ContinueProcessing.no
+
+ if not wrapped.verifies(pgp_address.key):
+ print('Message failed to verify.', file=results)
+ return ContinueProcessing.no
+
+ token = arguments[1]
+
+ expecting = CONFIRM_REQUEST.format(pgp_address.key_fingerprint,
+ token)
+ for sig_subject in wrapped.get_signed():
+ if expecting in sig_subject:
+ with transaction():
+ pgp_address.key_confirmed = True
+ ISubscriptionManager(mlist).confirm(token)
+ break
+ else:
+ print("Message doesn't contain the expected statement.", file=results)
+ return ContinueProcessing.no
+
+
+def _cmd_change(pgp_list, mlist, msg, msgdata, arguments, results):
+ # New public key in attachment, requires to be signed with current
+ # key
+ pass
+
+
+def _cmd_revoke(pgp_list, mlist, msg, msgdata, arguments, results):
+ # Current key revocation certificate in attachment, restarts the
+ # subscription process, or rather only it's key setup part.
+ pass
+
+
+def _cmd_sign(pgp_list, mlist, msg, msgdata, arguments, results):
+ # List public key attached, signed by the users current key.
+ pass
+
+
+SUBCOMMANDS = {
+ 'set': _cmd_set,
+ 'confirm': _cmd_confirm,
+ 'change': _cmd_change,
+ 'revoke': _cmd_revoke,
+ 'sign': _cmd_sign
+}
+
+ARGUMENTS = '<' + '|'.join(SUBCOMMANDS.keys()) + '>'
+
+
@public
@implementer(IEmailCommand)
class KeyCommand:
"""The `key` command."""
name = 'key'
- argument_description = ''
+ argument_description = ARGUMENTS
short_description = ''
- description = ''
+ description = """\
+
+ """
def process(self, mlist, msg, msgdata, arguments, results):
"""See `IEmailCommand`."""
if len(arguments) == 0:
print('No sub-command specified,'
- ' must be one of .', file=results)
+ ' must be one of {}.'.format(ARGUMENTS), file=results)
+ return ContinueProcessing.no
+
+ if arguments[0] not in SUBCOMMANDS:
+ print('Wrong sub-command specified,'
+ ' must be one of {}.'.format(ARGUMENTS), file=results)
return ContinueProcessing.no
pgp_list = PGPMailingList.for_list(mlist)
if pgp_list is None:
- print('This mailing list doesnt have pgp enabled.', file=results)
- return ContinueProcessing.yes # XXX: What does this even mean?
-
- if arguments[0] == 'set':
- if len(arguments) != 2:
- print('Missing token', file=results)
- return ContinueProcessing.no
-
- wrapped = PGPWrapper(msg)
- if not wrapped.has_keys():
- print('No keys here?')
- return ContinueProcessing.no
-
- keys = list(wrapped.keys())
- if len(keys) != 1:
- print('More than one key! Which one is yours?')
- return ContinueProcessing.no
-
- with transaction() as t:
- pgp_address = PGPAddress(self.address)
- pgp_address.key = keys.pop()
- t.add(pgp_address)
-
- token = arguments[1]
- ISubscriptionManager(mlist).confirm(token)
- # XXX finish this
- elif arguments[0] == 'confirm':
- if len(arguments) != 2:
- print('Missing token', file=results)
- return ContinueProcessing.no
-
- display_name, email = parseaddr(msg['from'])
- # Address could be None or the empty string.
- if not email:
- email = msg.sender
- if not email:
- print('No email to subscribe with.', file=results)
- return ContinueProcessing.no
- um = getUtility(IUserManager)
-
- pgp_address = PGPAddress.for_address(um.get_address(email))
- if pgp_address is None:
- # TODO
- pass
-
- wrapped = PGPWrapper(msg)
- if wrapped.is_encrypted():
- decrypted = wrapped.decrypt(pgp_list.key)
- wrapped = PGPWrapper(decrypted)
-
- if not wrapped.is_signed():
- print('Message not signed, ignoring.', file=results)
- return ContinueProcessing.no
-
- if not wrapped.verifies(pgp_address.key):
- print('Message failed to verify.', file=results)
- return ContinueProcessing.no
-
- token = arguments[1]
-
- expecting = CONFIRM_REQUEST.format(pgp_address.key.pubkey,
- token)
- if wrapped.get_signed() == expecting:
- ISubscriptionManager(mlist).confirm(token)
- else:
- # TODO
- pass
-
- elif arguments[0] == 'change':
- # New public key in attachment, requires to be signed with current
- # key
- pass
- elif arguments[0] == 'revoke':
- # Current key revocation certificate in attachment, restarts the
- # subscription process, or rather only it's key setup part.
- pass
- elif arguments[0] == 'sign':
- # List public key attached, signed by the users current key.
- pass
- else:
- print('Wrong sub-command specified,'
- ' must be one of .', file=results)
+ print("This mailing list doesn't have pgp enabled.", file=results)
+ return ContinueProcessing.no
- return ContinueProcessing.no
+ command = SUBCOMMANDS[arguments[0]]
+ return command(pgp_list, mlist, msg, msgdata, arguments, results)
diff --git a/src/mailman_pgp/commands/tests/__init__.py b/src/mailman_pgp/commands/tests/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/mailman_pgp/commands/tests/test_key.py b/src/mailman_pgp/commands/tests/test_key.py
new file mode 100644
index 0000000..8b44011
--- /dev/null
+++ b/src/mailman_pgp/commands/tests/test_key.py
@@ -0,0 +1,123 @@
+# Copyright (C) 2017 Jan Jancar
+#
+# This file is a part of the Mailman PGP plugin.
+#
+# This program is free software; you can redistribute it and/or modify it under
+# the terms of the GNU General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or (at your option)
+# any later version.
+#
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+# more details.
+#
+# You should have received a copy of the GNU General Public License along with
+# this program. If not, see .
+
+""""""
+
+import unittest
+
+from mailman.app.lifecycle import create_list
+from mailman.email.message import Message
+from mailman.interfaces.subscriptions import ISubscriptionManager
+from mailman.interfaces.usermanager import IUserManager
+from mailman.runners.command import CommandRunner
+from mailman.testing.helpers import make_testable_runner, get_queue_messages
+from mailman.utilities.datetime import now
+from zope.component import getUtility
+
+from mailman_pgp.config import mm_config
+from mailman_pgp.database import transaction
+from mailman_pgp.model.address import PGPAddress
+from mailman_pgp.model.list import PGPMailingList
+from mailman_pgp.pgp.mime import MIMEWrapper
+from mailman_pgp.pgp.tests.base import load_key
+from mailman_pgp.pgp.wrapper import PGPWrapper
+from mailman_pgp.testing.layers import PGPConfigLayer
+from mailman_pgp.workflows.base import CONFIRM_REQUEST
+from mailman_pgp.workflows.subscription import OpenSubscriptionPolicy
+
+
+class TestPreSubscription(unittest.TestCase):
+ layer = PGPConfigLayer
+
+ def setUp(self):
+ self.mlist = create_list('test@example.com', style_name='pgp-default')
+ self.pgp_list = PGPMailingList.for_list(self.mlist)
+ self.pgp_list.generate_key(True)
+
+ def test_set(self):
+ self.mlist.subscription_policy = OpenSubscriptionPolicy
+ bart = getUtility(IUserManager).create_address('bart@example.com',
+ 'Bart Person')
+ bart.verified_on = now()
+ token, token_owner, member = ISubscriptionManager(self.mlist).register(
+ bart)
+
+ get_queue_messages('virgin')
+
+ bart_key = load_key('rsa_1024.priv.asc')
+
+ set_message = Message()
+ set_message['From'] = 'bart@example.com'
+ set_message['To'] = 'test@example.com'
+ set_message['Subject'] = 'Re: key set {}'.format(token)
+ set_message.set_type('multipart/mixed')
+ wrapped_set_message = MIMEWrapper(set_message)
+ set_message = wrapped_set_message.attach_key(bart_key.pubkey)
+
+ mm_config.switchboards['command'].enqueue(set_message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+
+ pgp_address = PGPAddress.for_address(bart)
+ self.assertIsNotNone(pgp_address)
+ self.assertEqual(pgp_address.key.fingerprint, bart_key.fingerprint)
+ self.assertEqual(pgp_address.key_fingerprint, bart_key.fingerprint)
+ self.assertFalse(pgp_address.key_confirmed)
+
+ items = get_queue_messages('virgin', expected_count=2)
+ if items[0].msg['Subject'] == 'The results of your email commands':
+ confirm_request = items[1].msg
+ else:
+ confirm_request = items[0].msg
+
+ confirm_wrapped = PGPWrapper(confirm_request)
+ self.assertTrue(confirm_wrapped.is_encrypted())
+
+ def test_confirm(self):
+ self.mlist.subscription_policy = OpenSubscriptionPolicy
+ bart = getUtility(IUserManager).create_address('bart@example.com',
+ 'Bart Person')
+ bart.verified_on = now()
+
+ bart_key = load_key('rsa_1024.priv.asc')
+
+ token, token_owner, member = ISubscriptionManager(self.mlist).register(
+ bart, pubkey=bart_key.pubkey)
+
+ get_queue_messages('virgin')
+
+ with transaction() as t:
+ pgp_address = PGPAddress(bart)
+ pgp_address.key =bart_key.pubkey
+ t.add(pgp_address)
+
+ confirm_message = Message()
+ confirm_message['From'] = 'bart@example.com'
+ confirm_message['To'] = 'test@example.com'
+ confirm_message['Subject'] = 'Re: key confirm {}'.format(token)
+ confirm_message.set_payload(
+ CONFIRM_REQUEST.format(bart_key.fingerprint, token))
+ wrapped_confirm_message = MIMEWrapper(confirm_message)
+ confirm_message = wrapped_confirm_message.sign(bart_key)
+
+ mm_config.switchboards['command'].enqueue(confirm_message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+
+ pgp_address = PGPAddress.for_address(bart)
+ self.assertTrue(pgp_address.key_confirmed)
+ self.assertTrue(self.mlist.is_subscribed(bart))
diff --git a/src/mailman_pgp/model/address.py b/src/mailman_pgp/model/address.py
index 1d07486..6fc3050 100644
--- a/src/mailman_pgp/model/address.py
+++ b/src/mailman_pgp/model/address.py
@@ -22,7 +22,7 @@ from os.path import exists, isfile, join
from mailman.database.types import SAUnicode
from mailman.interfaces.usermanager import IUserManager
from pgpy import PGPKey
-from sqlalchemy import Column, Integer
+from sqlalchemy import Column, Integer, Boolean
from sqlalchemy.orm import reconstructor
from zope.component import getUtility
@@ -38,6 +38,7 @@ class PGPAddress(Base):
id = Column(Integer, primary_key=True)
email = Column(SAUnicode, index=True)
key_fingerprint = Column(SAUnicode)
+ key_confirmed = Column(Boolean)
def __init__(self, address):
super().__init__()
@@ -49,6 +50,7 @@ class PGPAddress(Base):
def _init(self):
self._address = None
self._key = None
+ self.key_confirmed = False
@property
def key(self):
@@ -119,3 +121,9 @@ class PGPAddress(Base):
if address is None:
return None
return PGPAddress.query().filter_by(email=address.email).first()
+
+ @staticmethod
+ def for_email(email):
+ if email is None:
+ return None
+ return PGPAddress.query().filter_by(email=email).first()
diff --git a/src/mailman_pgp/pgp/mime.py b/src/mailman_pgp/pgp/mime.py
index 1ecbe76..975b8bd 100644
--- a/src/mailman_pgp/pgp/mime.py
+++ b/src/mailman_pgp/pgp/mime.py
@@ -87,7 +87,7 @@ class MIMEWrapper:
return self.is_signed()
def get_signed(self):
- yield self.msg.get_payload(0).as_string(0)
+ yield self.msg.get_payload(0).as_string()
def is_encrypted(self):
"""
@@ -145,6 +145,26 @@ class MIMEWrapper:
key, _ = PGPKey.from_blob(part.get_payload())
yield key
+ def attach_key(self, key):
+ """
+
+ :param key:
+ :type key: pgpy.PGPKey
+ :return:
+ """
+ filename = '0x' + key.fingerprint.keyid + '.asc'
+ key_part = MIMEApplication(_data=str(key),
+ _subtype=MIMEWrapper._keys_subtype,
+ _encoder=encode_7or8bit,
+ name=filename)
+ key_part.add_header('Content-Description',
+ 'OpenPGP key')
+ key_part.add_header('Content-Disposition', 'attachment',
+ filename=filename)
+ out = copy.deepcopy(self.msg)
+ out.attach(key_part)
+ return out
+
def verify(self, key):
"""
Verify the signature of this message with key.
--
cgit v1.2.3-70-g09d2
From 32cbc0b7da7e62fb4acdd4ce9e484a7109a32317 Mon Sep 17 00:00:00 2001
From: J08nY
Date: Tue, 11 Jul 2017 21:03:01 +0200
Subject: qa.
---
src/mailman_pgp/commands/eml_key.py | 4 ++--
src/mailman_pgp/commands/tests/test_key.py | 4 ++--
src/mailman_pgp/model/address.py | 2 +-
3 files changed, 5 insertions(+), 5 deletions(-)
diff --git a/src/mailman_pgp/commands/eml_key.py b/src/mailman_pgp/commands/eml_key.py
index e9b5e19..15db0b1 100644
--- a/src/mailman_pgp/commands/eml_key.py
+++ b/src/mailman_pgp/commands/eml_key.py
@@ -74,7 +74,8 @@ def _cmd_set(pgp_list, mlist, msg, msgdata, arguments, results):
try:
ISubscriptionManager(mlist).confirm(token)
print('Key succesfully set.', file=results)
- print('Key fingerprint: {}'.format(pgp_address.key.fingerprint), file=results)
+ print('Key fingerprint: {}'.format(pgp_address.key.fingerprint),
+ file=results)
except LookupError:
print('Wrong token.', file=results)
@@ -161,7 +162,6 @@ class KeyCommand:
argument_description = ARGUMENTS
short_description = ''
description = """\
-
"""
def process(self, mlist, msg, msgdata, arguments, results):
diff --git a/src/mailman_pgp/commands/tests/test_key.py b/src/mailman_pgp/commands/tests/test_key.py
index 8b44011..95f9766 100644
--- a/src/mailman_pgp/commands/tests/test_key.py
+++ b/src/mailman_pgp/commands/tests/test_key.py
@@ -24,7 +24,7 @@ from mailman.email.message import Message
from mailman.interfaces.subscriptions import ISubscriptionManager
from mailman.interfaces.usermanager import IUserManager
from mailman.runners.command import CommandRunner
-from mailman.testing.helpers import make_testable_runner, get_queue_messages
+from mailman.testing.helpers import get_queue_messages, make_testable_runner
from mailman.utilities.datetime import now
from zope.component import getUtility
@@ -102,7 +102,7 @@ class TestPreSubscription(unittest.TestCase):
with transaction() as t:
pgp_address = PGPAddress(bart)
- pgp_address.key =bart_key.pubkey
+ pgp_address.key = bart_key.pubkey
t.add(pgp_address)
confirm_message = Message()
diff --git a/src/mailman_pgp/model/address.py b/src/mailman_pgp/model/address.py
index 6fc3050..f102bf7 100644
--- a/src/mailman_pgp/model/address.py
+++ b/src/mailman_pgp/model/address.py
@@ -22,7 +22,7 @@ from os.path import exists, isfile, join
from mailman.database.types import SAUnicode
from mailman.interfaces.usermanager import IUserManager
from pgpy import PGPKey
-from sqlalchemy import Column, Integer, Boolean
+from sqlalchemy import Boolean, Column, Integer
from sqlalchemy.orm import reconstructor
from zope.component import getUtility
--
cgit v1.2.3-70-g09d2
From beff82c480fa4f9acaede60a5f789e880721cc81 Mon Sep 17 00:00:00 2001
From: J08nY
Date: Tue, 11 Jul 2017 21:31:40 +0200
Subject: Don't recheck addresses with keys and key confirmations.
---
src/mailman_pgp/commands/eml_key.py | 5 ++++-
src/mailman_pgp/workflows/base.py | 18 ++++++++++++++----
2 files changed, 18 insertions(+), 5 deletions(-)
diff --git a/src/mailman_pgp/commands/eml_key.py b/src/mailman_pgp/commands/eml_key.py
index 15db0b1..fbae25e 100644
--- a/src/mailman_pgp/commands/eml_key.py
+++ b/src/mailman_pgp/commands/eml_key.py
@@ -65,8 +65,11 @@ def _cmd_set(pgp_list, mlist, msg, msgdata, arguments, results):
print('No adddress to subscribe with.', file=results)
return ContinueProcessing.no
+
with transaction() as t:
- pgp_address = PGPAddress(address)
+ pgp_address = PGPAddress.for_address(address)
+ if pgp_address is None:
+ pgp_address = PGPAddress(address)
pgp_address.key = keys.pop()
t.add(pgp_address)
diff --git a/src/mailman_pgp/workflows/base.py b/src/mailman_pgp/workflows/base.py
index 4ca1525..ace8f91 100644
--- a/src/mailman_pgp/workflows/base.py
+++ b/src/mailman_pgp/workflows/base.py
@@ -65,11 +65,21 @@ class PubkeyMixin:
def _step_pubkey_checks(self):
self.push('restore_subscriber')
- if not self.pubkey:
- self.push('send_key_request')
+
+ pgp_address = PGPAddress.for_address(self.address)
+
+ if pgp_address is not None:
+ if not pgp_address.key:
+ self.push('send_key_request')
+ else:
+ if not pgp_address.key_confirmed:
+ self.push('send_key_confirm_request')
else:
- if not self.pubkey_confirmed:
- self.push('send_key_confirm_request')
+ if not self.pubkey:
+ self.push('send_key_request')
+ else:
+ if not self.pubkey_confirmed:
+ self.push('send_key_confirm_request')
def _step_send_key_request(self):
self._set_token(TokenOwner.subscriber)
--
cgit v1.2.3-70-g09d2
From 4bab3532e38f6fd4826e0af2f96b6060286e3ed7 Mon Sep 17 00:00:00 2001
From: J08nY
Date: Tue, 11 Jul 2017 22:07:06 +0200
Subject: Some more tests for the key command.
---
src/mailman_pgp/commands/eml_key.py | 2 +-
src/mailman_pgp/commands/tests/test_key.py | 153 +++++++++++++++++++++++++++++
2 files changed, 154 insertions(+), 1 deletion(-)
diff --git a/src/mailman_pgp/commands/eml_key.py b/src/mailman_pgp/commands/eml_key.py
index fbae25e..e2a2443 100644
--- a/src/mailman_pgp/commands/eml_key.py
+++ b/src/mailman_pgp/commands/eml_key.py
@@ -87,7 +87,7 @@ def _cmd_set(pgp_list, mlist, msg, msgdata, arguments, results):
def _cmd_confirm(pgp_list, mlist, msg, msgdata, arguments, results):
if len(arguments) != 2:
- print('Missing token', file=results)
+ print('Missing token.', file=results)
return ContinueProcessing.no
email = _get_email(msg)
diff --git a/src/mailman_pgp/commands/tests/test_key.py b/src/mailman_pgp/commands/tests/test_key.py
index 95f9766..b3d8c95 100644
--- a/src/mailman_pgp/commands/tests/test_key.py
+++ b/src/mailman_pgp/commands/tests/test_key.py
@@ -40,6 +40,59 @@ from mailman_pgp.workflows.base import CONFIRM_REQUEST
from mailman_pgp.workflows.subscription import OpenSubscriptionPolicy
+class TestPreDispatch(unittest.TestCase):
+ layer = PGPConfigLayer
+
+ def setUp(self):
+ self.mlist = create_list('test@example.com')
+
+ def test_no_arguments(self):
+ message = Message()
+ message['From'] = 'bart@example.com'
+ message['To'] = 'test@example.com'
+ message['Subject'] = 'key'
+ message.set_payload('')
+
+ mm_config.switchboards['command'].enqueue(message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+ items = get_queue_messages('virgin', expected_count=1)
+ results_msg = items[0].msg
+
+ self.assertIn('No sub-command specified', results_msg.get_payload())
+
+ def test_wrong_subcommand(self):
+ message = Message()
+ message['From'] = 'bart@example.com'
+ message['To'] = 'test@example.com'
+ message['Subject'] = 'key wroooooong'
+ message.set_payload('')
+
+ mm_config.switchboards['command'].enqueue(message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+ items = get_queue_messages('virgin', expected_count=1)
+ results_msg = items[0].msg
+
+ self.assertIn('Wrong sub-command specified', results_msg.get_payload())
+
+ def test_no_pgp_list(self):
+ message = Message()
+ message['From'] = 'bart@example.com'
+ message['To'] = 'test@example.com'
+ message['Subject'] = 'key set'
+ message.set_payload('')
+
+ mm_config.switchboards['command'].enqueue(message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+ items = get_queue_messages('virgin', expected_count=1)
+ results_msg = items[0].msg
+
+ self.assertIn("This mailing list doesn't have pgp enabled.",
+ results_msg.get_payload())
+
+
class TestPreSubscription(unittest.TestCase):
layer = PGPConfigLayer
@@ -87,6 +140,60 @@ class TestPreSubscription(unittest.TestCase):
confirm_wrapped = PGPWrapper(confirm_request)
self.assertTrue(confirm_wrapped.is_encrypted())
+ def test_set_no_token(self):
+ message = Message()
+ message['From'] = 'bart@example.com'
+ message['To'] = 'test@example.com'
+ message['Subject'] = 'key set'
+ message.set_payload('')
+
+ mm_config.switchboards['command'].enqueue(message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+ items = get_queue_messages('virgin', expected_count=1)
+ results_msg = items[0].msg
+
+ self.assertIn('Missing token.', results_msg.get_payload())
+
+ def test_set_no_key(self):
+ message = Message()
+ message['From'] = 'bart@example.com'
+ message['To'] = 'test@example.com'
+ message['Subject'] = 'key set token'
+ message.set_payload('')
+
+ mm_config.switchboards['command'].enqueue(message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+ items = get_queue_messages('virgin', expected_count=1)
+ results_msg = items[0].msg
+
+ self.assertIn('No keys attached? Send a key.',
+ results_msg.get_payload())
+
+ def test_set_multiple_keys(self):
+ bart_key = load_key('rsa_1024.priv.asc')
+ anne_key = load_key('ecc_p256.priv.asc')
+
+ set_message = Message()
+ set_message['From'] = 'bart@example.com'
+ set_message['To'] = 'test@example.com'
+ set_message['Subject'] = 'Re: key set token'
+ set_message.set_type('multipart/mixed')
+ wrapped_set_message = MIMEWrapper(set_message)
+ set_message = wrapped_set_message.attach_key(bart_key.pubkey)
+ wrapped_set_message = MIMEWrapper(set_message)
+ set_message = wrapped_set_message.attach_key(anne_key.pubkey)
+
+ mm_config.switchboards['command'].enqueue(set_message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+ items = get_queue_messages('virgin', expected_count=1)
+ results_msg = items[0].msg
+
+ self.assertIn('More than one key! Send only one key.',
+ results_msg.get_payload())
+
def test_confirm(self):
self.mlist.subscription_policy = OpenSubscriptionPolicy
bart = getUtility(IUserManager).create_address('bart@example.com',
@@ -121,3 +228,49 @@ class TestPreSubscription(unittest.TestCase):
pgp_address = PGPAddress.for_address(bart)
self.assertTrue(pgp_address.key_confirmed)
self.assertTrue(self.mlist.is_subscribed(bart))
+
+ def test_confirm_no_token(self):
+ message = Message()
+ message['From'] = 'bart@example.com'
+ message['To'] = 'test@example.com'
+ message['Subject'] = 'key confirm'
+ message.set_payload('')
+
+ mm_config.switchboards['command'].enqueue(message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+ items = get_queue_messages('virgin', expected_count=1)
+ results_msg = items[0].msg
+
+ self.assertIn('Missing token.', results_msg.get_payload())
+
+ def test_confirm_no_email(self):
+ message = Message()
+ message['From'] = ''
+ message['To'] = 'test@example.com'
+ message['Subject'] = 'key confirm token'
+ message.set_payload('')
+
+ mm_config.switchboards['command'].enqueue(message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+ items = get_queue_messages('virgin', expected_count=1)
+ results_msg = items[0].msg
+
+ self.assertIn('No email to subscribe with.', results_msg.get_payload())
+
+ def test_confirm_no_pgp_address(self):
+ message = Message()
+ message['From'] = 'bart@example.com'
+ message['To'] = 'test@example.com'
+ message['Subject'] = 'key confirm token'
+ message.set_payload('')
+
+ mm_config.switchboards['command'].enqueue(message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+ items = get_queue_messages('virgin', expected_count=1)
+ results_msg = items[0].msg
+
+ self.assertIn('A pgp enabled address not found.',
+ results_msg.get_payload())
--
cgit v1.2.3-70-g09d2
From 44bde427fdcc78344892f3413597c7be66eca8e9 Mon Sep 17 00:00:00 2001
From: J08nY
Date: Tue, 11 Jul 2017 22:38:20 +0200
Subject: Add some more REST to addresses.
---
src/mailman_pgp/rest/addresses.py | 37 ++++++++++++++---
src/mailman_pgp/rest/lists.py | 2 +-
src/mailman_pgp/rest/tests/test_addresses.py | 60 ++++++++++++++++++++++++++++
src/mailman_pgp/rest/tests/test_lists.py | 2 +-
4 files changed, 93 insertions(+), 8 deletions(-)
create mode 100644 src/mailman_pgp/rest/tests/test_addresses.py
diff --git a/src/mailman_pgp/rest/addresses.py b/src/mailman_pgp/rest/addresses.py
index 4f7a04d..93be6d3 100644
--- a/src/mailman_pgp/rest/addresses.py
+++ b/src/mailman_pgp/rest/addresses.py
@@ -17,19 +17,44 @@
""""""
-from mailman.rest.helpers import CollectionMixin
+from mailman.rest.helpers import CollectionMixin, okay, etag, not_found
from public.public import public
+from mailman_pgp.config import config
+from mailman_pgp.model.address import PGPAddress
+
class _EncryptedBase(CollectionMixin):
- pass
+ def _resource_as_dict(self, address):
+ """See `CollectionMixin`."""
+ return dict(email=address.email,
+ key_fingerprint=address.key_fingerprint,
+ key_confirmed=address.key_confirmed,
+ self_link=self.api.path_to(
+ '/plugins/{}/addresses/{}'.format(config.name,
+ address.email)
+ ))
+
+ def _get_collection(self, request):
+ """See `CollectionMixin`."""
+ return PGPAddress.query().all()
@public
-class AllAddresses:
- pass
+class AllAddresses(_EncryptedBase):
+ def on_get(self, request, response):
+ """/addresses"""
+ resource = self._make_collection(request)
+ return okay(response, etag(resource))
@public
-class AnAddress:
- pass
+class AnAddress(_EncryptedBase):
+ def __init__(self, email):
+ self._address = PGPAddress.for_email(email)
+
+ def on_get(self, request, response):
+ if self._address is None:
+ return not_found(response)
+ else:
+ okay(response, self._resource_as_json(self._address))
diff --git a/src/mailman_pgp/rest/lists.py b/src/mailman_pgp/rest/lists.py
index 9141bce..37b8d28 100644
--- a/src/mailman_pgp/rest/lists.py
+++ b/src/mailman_pgp/rest/lists.py
@@ -73,7 +73,7 @@ class AnEncryptedList(_EncryptedBase):
okay(response, self._resource_as_json(self._mlist))
@child()
- def key(self, context, segments):
+ def pubkey(self, context, segments):
return AListPubkey(self._mlist), []
diff --git a/src/mailman_pgp/rest/tests/test_addresses.py b/src/mailman_pgp/rest/tests/test_addresses.py
new file mode 100644
index 0000000..1730618
--- /dev/null
+++ b/src/mailman_pgp/rest/tests/test_addresses.py
@@ -0,0 +1,60 @@
+# Copyright (C) 2017 Jan Jancar
+#
+# This file is a part of the Mailman PGP plugin.
+#
+# This program is free software; you can redistribute it and/or modify it under
+# the terms of the GNU General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or (at your option)
+# any later version.
+#
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+# more details.
+#
+# You should have received a copy of the GNU General Public License along with
+# this program. If not, see .
+
+""""""
+import unittest
+from urllib.error import HTTPError
+
+from mailman.interfaces.usermanager import IUserManager
+from mailman.testing.helpers import call_api
+from zope.component import getUtility
+
+from mailman_pgp.database import transaction
+from mailman_pgp.model.address import PGPAddress
+from mailman_pgp.testing.layers import PGPRESTLayer
+
+
+class TestAddresses(unittest.TestCase):
+ layer = PGPRESTLayer
+
+ def setUp(self):
+ self.mm_address = getUtility(IUserManager).create_address(
+ 'anne@example.com')
+ with transaction() as t:
+ self.address = PGPAddress(self.mm_address)
+ t.add(self.address)
+
+ def test_missing_address(self):
+ with self.assertRaises(HTTPError) as cm:
+ call_api('http://localhost:9001/3.1/plugins/pgp/addresses/'
+ 'bart@example.com')
+ self.assertEqual(cm.exception.code, 404)
+
+ def test_all_addresses(self):
+ json, response = call_api(
+ 'http://localhost:9001/3.1/plugins/pgp/addresses/')
+ self.assertEqual(json['total_size'], 1)
+ self.assertEqual(len(json['entries']), 1)
+ addresses = json['entries']
+ address = addresses[0]
+ self.assertEqual(address['email'], self.address.email)
+
+ def test_get_address(self):
+ json, response = call_api(
+ 'http://localhost:9001/3.1/plugins/pgp/addresses/'
+ 'anne@example.com')
+ self.assertEqual(json['email'], self.address.email)
diff --git a/src/mailman_pgp/rest/tests/test_lists.py b/src/mailman_pgp/rest/tests/test_lists.py
index a674cc9..c5c9854 100644
--- a/src/mailman_pgp/rest/tests/test_lists.py
+++ b/src/mailman_pgp/rest/tests/test_lists.py
@@ -68,7 +68,7 @@ class TestLists(TestCase):
json, response = call_api(
'http://localhost:9001/3.1/plugins/pgp/lists/'
- 'another.example.com/key')
+ 'another.example.com/pubkey')
json.pop('http_etag')
self.assertEqual(len(json.keys()), 2)
--
cgit v1.2.3-70-g09d2
From 329532bc05f83edb4cd23e2ae82da777511c9857 Mon Sep 17 00:00:00 2001
From: J08nY
Date: Tue, 11 Jul 2017 23:00:53 +0200
Subject: Add more tests for workflows.
---
src/mailman_pgp/workflows/base.py | 1 -
src/mailman_pgp/workflows/tests/test_base.py | 17 +++++++++++++++++
2 files changed, 17 insertions(+), 1 deletion(-)
diff --git a/src/mailman_pgp/workflows/base.py b/src/mailman_pgp/workflows/base.py
index ace8f91..1e6b867 100644
--- a/src/mailman_pgp/workflows/base.py
+++ b/src/mailman_pgp/workflows/base.py
@@ -100,7 +100,6 @@ class PubkeyMixin:
# The workflow was confirmed but we still dont have an address
# or the pubkey. So resend request and wait.
self.push('send_key_request')
- return
else:
self.pubkey = pgp_address.key
if not self.pubkey_confirmed:
diff --git a/src/mailman_pgp/workflows/tests/test_base.py b/src/mailman_pgp/workflows/tests/test_base.py
index c99c6d6..4f39296 100644
--- a/src/mailman_pgp/workflows/tests/test_base.py
+++ b/src/mailman_pgp/workflows/tests/test_base.py
@@ -189,3 +189,20 @@ class TestPubkeyMixin(unittest.TestCase):
with patch.object(receive_workflow, '_step_do_subscription') as step:
next(receive_workflow)
step.assert_called_once_with()
+
+ def test_exisitng_pgp_address(self):
+ workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender,
+ pre_verified=True,
+ pre_confirmed=True)
+
+ with transaction() as t:
+ pgp_address = PGPAddress(self.sender)
+ pgp_address.key = self.sender_key.pubkey
+ pgp_address.key_confirmed = True
+ t.add(pgp_address)
+
+ workflow.run_thru('pubkey_checks')
+ next(workflow)
+ with patch.object(workflow, '_step_do_subscription') as step:
+ next(workflow)
+ step.assert_called_once_with()
--
cgit v1.2.3-70-g09d2
From 65b25b13ba5a3831c43204da9358eb42b1b373af Mon Sep 17 00:00:00 2001
From: J08nY
Date: Tue, 11 Jul 2017 23:16:23 +0200
Subject: Change default subscription policies to pgp enabled ones for pgp
lists.
---
src/mailman_pgp/styles/base.py | 7 ++++++-
1 file changed, 6 insertions(+), 1 deletion(-)
diff --git a/src/mailman_pgp/styles/base.py b/src/mailman_pgp/styles/base.py
index 6b5271b..a7c3366 100644
--- a/src/mailman_pgp/styles/base.py
+++ b/src/mailman_pgp/styles/base.py
@@ -19,7 +19,7 @@
from lazr.config import as_boolean
from public import public
-from mailman_pgp.config import config
+from mailman_pgp.config import config, mm_config
from mailman_pgp.database import transaction
from mailman_pgp.model.list import PGPMailingList
@@ -32,6 +32,11 @@ class PGPStyle:
"""
mailing_list.posting_chain = 'pgp-posting-chain'
+ old_policy = mailing_list.subscription_policy.name
+ new_policy_name = 'pgp-' + old_policy[4:]
+ if new_policy_name in mm_config.workflows:
+ mailing_list.subscription_policy = new_policy_name
+
pgp_list = PGPMailingList.for_list(mailing_list)
if pgp_list:
return
--
cgit v1.2.3-70-g09d2
From 2fd2f28f4389e3d0cad4fdc9c6067b5baff291e6 Mon Sep 17 00:00:00 2001
From: J08nY
Date: Wed, 12 Jul 2017 02:41:48 +0200
Subject: Some more tests for key command. PEP8.
---
src/mailman_pgp/commands/eml_key.py | 1 -
src/mailman_pgp/commands/tests/test_key.py | 211 +++++++++++++++++++++--------
src/mailman_pgp/rest/addresses.py | 2 +-
3 files changed, 152 insertions(+), 62 deletions(-)
diff --git a/src/mailman_pgp/commands/eml_key.py b/src/mailman_pgp/commands/eml_key.py
index e2a2443..a2fb4d5 100644
--- a/src/mailman_pgp/commands/eml_key.py
+++ b/src/mailman_pgp/commands/eml_key.py
@@ -65,7 +65,6 @@ def _cmd_set(pgp_list, mlist, msg, msgdata, arguments, results):
print('No adddress to subscribe with.', file=results)
return ContinueProcessing.no
-
with transaction() as t:
pgp_address = PGPAddress.for_address(address)
if pgp_address is None:
diff --git a/src/mailman_pgp/commands/tests/test_key.py b/src/mailman_pgp/commands/tests/test_key.py
index b3d8c95..48ca3c9 100644
--- a/src/mailman_pgp/commands/tests/test_key.py
+++ b/src/mailman_pgp/commands/tests/test_key.py
@@ -40,6 +40,24 @@ from mailman_pgp.workflows.base import CONFIRM_REQUEST
from mailman_pgp.workflows.subscription import OpenSubscriptionPolicy
+def _create_plain(from_hdr, to_hdr, subject_hdr, payload):
+ message = Message()
+ message['From'] = from_hdr
+ message['To'] = to_hdr
+ message['Subject'] = subject_hdr
+ message.set_payload(payload)
+ return message
+
+
+def _create_mixed(from_hdr, to_hdr, subject_hdr):
+ message = Message()
+ message['From'] = from_hdr
+ message['To'] = to_hdr
+ message['Subject'] = subject_hdr
+ message.set_type('multipart/mixed')
+ return message
+
+
class TestPreDispatch(unittest.TestCase):
layer = PGPConfigLayer
@@ -47,11 +65,8 @@ class TestPreDispatch(unittest.TestCase):
self.mlist = create_list('test@example.com')
def test_no_arguments(self):
- message = Message()
- message['From'] = 'bart@example.com'
- message['To'] = 'test@example.com'
- message['Subject'] = 'key'
- message.set_payload('')
+ message = _create_plain('bart@example.com', 'test@example.com',
+ 'key', '')
mm_config.switchboards['command'].enqueue(message,
listid='test.example.com')
@@ -62,11 +77,8 @@ class TestPreDispatch(unittest.TestCase):
self.assertIn('No sub-command specified', results_msg.get_payload())
def test_wrong_subcommand(self):
- message = Message()
- message['From'] = 'bart@example.com'
- message['To'] = 'test@example.com'
- message['Subject'] = 'key wroooooong'
- message.set_payload('')
+ message = _create_plain('bart@example.com', 'test@example.com',
+ 'key wrooooooong', '')
mm_config.switchboards['command'].enqueue(message,
listid='test.example.com')
@@ -77,11 +89,8 @@ class TestPreDispatch(unittest.TestCase):
self.assertIn('Wrong sub-command specified', results_msg.get_payload())
def test_no_pgp_list(self):
- message = Message()
- message['From'] = 'bart@example.com'
- message['To'] = 'test@example.com'
- message['Subject'] = 'key set'
- message.set_payload('')
+ message = _create_plain('bart@example.com', 'test@example.com',
+ 'key set', '')
mm_config.switchboards['command'].enqueue(message,
listid='test.example.com')
@@ -99,7 +108,7 @@ class TestPreSubscription(unittest.TestCase):
def setUp(self):
self.mlist = create_list('test@example.com', style_name='pgp-default')
self.pgp_list = PGPMailingList.for_list(self.mlist)
- self.pgp_list.generate_key(True)
+ self.pgp_list.key = load_key('ecc_p256.priv.asc')
def test_set(self):
self.mlist.subscription_policy = OpenSubscriptionPolicy
@@ -113,11 +122,8 @@ class TestPreSubscription(unittest.TestCase):
bart_key = load_key('rsa_1024.priv.asc')
- set_message = Message()
- set_message['From'] = 'bart@example.com'
- set_message['To'] = 'test@example.com'
- set_message['Subject'] = 'Re: key set {}'.format(token)
- set_message.set_type('multipart/mixed')
+ set_message = _create_mixed('bart@example.com', 'test@example.com',
+ 'Re: key set {}'.format(token))
wrapped_set_message = MIMEWrapper(set_message)
set_message = wrapped_set_message.attach_key(bart_key.pubkey)
@@ -133,19 +139,22 @@ class TestPreSubscription(unittest.TestCase):
items = get_queue_messages('virgin', expected_count=2)
if items[0].msg['Subject'] == 'The results of your email commands':
+ results = items[0].msg
confirm_request = items[1].msg
else:
+ results = items[1].msg
confirm_request = items[0].msg
+ self.assertIn('Key succesfully set.', results.get_payload())
+ self.assertIn('Key fingerprint: {}'.format(bart_key.fingerprint),
+ results.get_payload())
+
confirm_wrapped = PGPWrapper(confirm_request)
self.assertTrue(confirm_wrapped.is_encrypted())
def test_set_no_token(self):
- message = Message()
- message['From'] = 'bart@example.com'
- message['To'] = 'test@example.com'
- message['Subject'] = 'key set'
- message.set_payload('')
+ message = _create_plain('bart@example.com', 'test@example.com',
+ 'key set', '')
mm_config.switchboards['command'].enqueue(message,
listid='test.example.com')
@@ -156,11 +165,8 @@ class TestPreSubscription(unittest.TestCase):
self.assertIn('Missing token.', results_msg.get_payload())
def test_set_no_key(self):
- message = Message()
- message['From'] = 'bart@example.com'
- message['To'] = 'test@example.com'
- message['Subject'] = 'key set token'
- message.set_payload('')
+ message = _create_plain('bart@example.com', 'test@example.com',
+ 'key set token', '')
mm_config.switchboards['command'].enqueue(message,
listid='test.example.com')
@@ -175,11 +181,8 @@ class TestPreSubscription(unittest.TestCase):
bart_key = load_key('rsa_1024.priv.asc')
anne_key = load_key('ecc_p256.priv.asc')
- set_message = Message()
- set_message['From'] = 'bart@example.com'
- set_message['To'] = 'test@example.com'
- set_message['Subject'] = 'Re: key set token'
- set_message.set_type('multipart/mixed')
+ set_message = _create_mixed('bart@example.com', 'test@example.com',
+ 'Re: key set token')
wrapped_set_message = MIMEWrapper(set_message)
set_message = wrapped_set_message.attach_key(bart_key.pubkey)
wrapped_set_message = MIMEWrapper(set_message)
@@ -194,6 +197,38 @@ class TestPreSubscription(unittest.TestCase):
self.assertIn('More than one key! Send only one key.',
results_msg.get_payload())
+ def test_set_no_email(self):
+ bart_key = load_key('rsa_1024.priv.asc')
+
+ message = _create_mixed('', 'test@example.com', 'key set token')
+ wrapped_message = MIMEWrapper(message)
+ message = wrapped_message.attach_key(bart_key.pubkey)
+
+ mm_config.switchboards['command'].enqueue(message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+ items = get_queue_messages('virgin', expected_count=1)
+ results_msg = items[0].msg
+
+ self.assertIn('No email to subscribe with.', results_msg.get_payload())
+
+ def test_set_no_address(self):
+ bart_key = load_key('rsa_1024.priv.asc')
+
+ set_message = _create_mixed('bart@example.com', 'test@example.com',
+ 'key set token')
+ wrapped_set_message = MIMEWrapper(set_message)
+ set_message = wrapped_set_message.attach_key(bart_key.pubkey)
+
+ mm_config.switchboards['command'].enqueue(set_message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+ items = get_queue_messages('virgin', expected_count=1)
+ results_msg = items[0].msg
+
+ self.assertIn('No adddress to subscribe with.',
+ results_msg.get_payload())
+
def test_confirm(self):
self.mlist.subscription_policy = OpenSubscriptionPolicy
bart = getUtility(IUserManager).create_address('bart@example.com',
@@ -212,16 +247,48 @@ class TestPreSubscription(unittest.TestCase):
pgp_address.key = bart_key.pubkey
t.add(pgp_address)
- confirm_message = Message()
- confirm_message['From'] = 'bart@example.com'
- confirm_message['To'] = 'test@example.com'
- confirm_message['Subject'] = 'Re: key confirm {}'.format(token)
- confirm_message.set_payload(
- CONFIRM_REQUEST.format(bart_key.fingerprint, token))
- wrapped_confirm_message = MIMEWrapper(confirm_message)
- confirm_message = wrapped_confirm_message.sign(bart_key)
+ message = _create_plain('bart@example.com', 'test@example.com',
+ 'Re: key confirm {}'.format(token),
+ CONFIRM_REQUEST.format(bart_key.fingerprint,
+ token))
+ wrapped_message = MIMEWrapper(message)
+ message = wrapped_message.sign(bart_key)
- mm_config.switchboards['command'].enqueue(confirm_message,
+ mm_config.switchboards['command'].enqueue(message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+
+ pgp_address = PGPAddress.for_address(bart)
+ self.assertTrue(pgp_address.key_confirmed)
+ self.assertTrue(self.mlist.is_subscribed(bart))
+
+ def test_confirm_encrypted(self):
+ self.mlist.subscription_policy = OpenSubscriptionPolicy
+ bart = getUtility(IUserManager).create_address('bart@example.com',
+ 'Bart Person')
+ bart.verified_on = now()
+
+ bart_key = load_key('rsa_1024.priv.asc')
+
+ token, token_owner, member = ISubscriptionManager(self.mlist).register(
+ bart, pubkey=bart_key.pubkey)
+
+ get_queue_messages('virgin')
+
+ with transaction() as t:
+ pgp_address = PGPAddress(bart)
+ pgp_address.key = bart_key.pubkey
+ t.add(pgp_address)
+
+ message = _create_plain('bart@example.com', 'test@example.com',
+ 'Re: key confirm {}'.format(token),
+ CONFIRM_REQUEST.format(bart_key.fingerprint,
+ token))
+ wrapped_message = MIMEWrapper(message)
+ message = wrapped_message.sign_encrypt(bart_key, self.pgp_list.pubkey,
+ bart_key.pubkey)
+
+ mm_config.switchboards['command'].enqueue(message,
listid='test.example.com')
make_testable_runner(CommandRunner, 'command').run()
@@ -230,11 +297,8 @@ class TestPreSubscription(unittest.TestCase):
self.assertTrue(self.mlist.is_subscribed(bart))
def test_confirm_no_token(self):
- message = Message()
- message['From'] = 'bart@example.com'
- message['To'] = 'test@example.com'
- message['Subject'] = 'key confirm'
- message.set_payload('')
+ message = _create_plain('bart@example.com', 'test@example.com',
+ 'key confirm', '')
mm_config.switchboards['command'].enqueue(message,
listid='test.example.com')
@@ -245,11 +309,8 @@ class TestPreSubscription(unittest.TestCase):
self.assertIn('Missing token.', results_msg.get_payload())
def test_confirm_no_email(self):
- message = Message()
- message['From'] = ''
- message['To'] = 'test@example.com'
- message['Subject'] = 'key confirm token'
- message.set_payload('')
+ message = _create_plain('', 'test@example.com',
+ 'key confirm token', '')
mm_config.switchboards['command'].enqueue(message,
listid='test.example.com')
@@ -260,11 +321,8 @@ class TestPreSubscription(unittest.TestCase):
self.assertIn('No email to subscribe with.', results_msg.get_payload())
def test_confirm_no_pgp_address(self):
- message = Message()
- message['From'] = 'bart@example.com'
- message['To'] = 'test@example.com'
- message['Subject'] = 'key confirm token'
- message.set_payload('')
+ message = _create_plain('bart@example.com', 'test@example.com',
+ 'key confirm token', '')
mm_config.switchboards['command'].enqueue(message,
listid='test.example.com')
@@ -274,3 +332,36 @@ class TestPreSubscription(unittest.TestCase):
self.assertIn('A pgp enabled address not found.',
results_msg.get_payload())
+
+ def test_confirm_not_signed(self):
+ self.mlist.subscription_policy = OpenSubscriptionPolicy
+ bart = getUtility(IUserManager).create_address('bart@example.com',
+ 'Bart Person')
+ bart.verified_on = now()
+
+ bart_key = load_key('rsa_1024.priv.asc')
+
+ token, token_owner, member = ISubscriptionManager(self.mlist).register(
+ bart, pubkey=bart_key.pubkey)
+
+ get_queue_messages('virgin')
+
+ with transaction() as t:
+ pgp_address = PGPAddress(bart)
+ pgp_address.key = bart_key.pubkey
+ t.add(pgp_address)
+
+ message = _create_plain('bart@example.com', 'test@example.com',
+ 'Re: key confirm {}'.format(token),
+ CONFIRM_REQUEST.format(bart_key.fingerprint,
+ token))
+
+ mm_config.switchboards['command'].enqueue(message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+
+ items = get_queue_messages('virgin', expected_count=1)
+ results_msg = items[0].msg
+
+ self.assertIn('Message not signed, ignoring.',
+ results_msg.get_payload())
diff --git a/src/mailman_pgp/rest/addresses.py b/src/mailman_pgp/rest/addresses.py
index 93be6d3..6d99a04 100644
--- a/src/mailman_pgp/rest/addresses.py
+++ b/src/mailman_pgp/rest/addresses.py
@@ -17,7 +17,7 @@
""""""
-from mailman.rest.helpers import CollectionMixin, okay, etag, not_found
+from mailman.rest.helpers import CollectionMixin, etag, not_found, okay
from public.public import public
from mailman_pgp.config import config
--
cgit v1.2.3-70-g09d2
From d01183610f3df226da76b3f853666e520cf22461 Mon Sep 17 00:00:00 2001
From: J08nY
Date: Wed, 12 Jul 2017 19:48:22 +0200
Subject: Some more workflow tests.
---
src/mailman_pgp/workflows/tests/test_base.py | 29 ++++++++++++++++++++++++++++
1 file changed, 29 insertions(+)
diff --git a/src/mailman_pgp/workflows/tests/test_base.py b/src/mailman_pgp/workflows/tests/test_base.py
index 4f39296..5b07e5b 100644
--- a/src/mailman_pgp/workflows/tests/test_base.py
+++ b/src/mailman_pgp/workflows/tests/test_base.py
@@ -206,3 +206,32 @@ class TestPubkeyMixin(unittest.TestCase):
with patch.object(workflow, '_step_do_subscription') as step:
next(workflow)
step.assert_called_once_with()
+
+ def test_exisitng_pgp_address_not_confirmed(self):
+ workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender,
+ pre_verified=True,
+ pre_confirmed=True)
+
+ with transaction() as t:
+ pgp_address = PGPAddress(self.sender)
+ pgp_address.key = self.sender_key.pubkey
+ t.add(pgp_address)
+
+ workflow.run_thru('pubkey_checks')
+ with patch.object(workflow, '_step_send_key_confirm_request') as step:
+ next(workflow)
+ step.assert_called_once_with()
+
+ def test_exisitng_pgp_address_no_key(self):
+ workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender,
+ pre_verified=True,
+ pre_confirmed=True)
+
+ with transaction() as t:
+ pgp_address = PGPAddress(self.sender)
+ t.add(pgp_address)
+
+ workflow.run_thru('pubkey_checks')
+ with patch.object(workflow, '_step_send_key_request') as step:
+ next(workflow)
+ step.assert_called_once_with()
--
cgit v1.2.3-70-g09d2
From ce02d5e98feb6dab1a4a8189d109495a41718260 Mon Sep 17 00:00:00 2001
From: J08nY
Date: Wed, 12 Jul 2017 22:52:57 +0200
Subject: Use restore_subscriber.
---
src/mailman_pgp/workflows/base.py | 13 ++++---------
src/mailman_pgp/workflows/tests/test_base.py | 4 ----
2 files changed, 4 insertions(+), 13 deletions(-)
diff --git a/src/mailman_pgp/workflows/base.py b/src/mailman_pgp/workflows/base.py
index 1e6b867..3c276cf 100644
--- a/src/mailman_pgp/workflows/base.py
+++ b/src/mailman_pgp/workflows/base.py
@@ -64,8 +64,6 @@ class PubkeyMixin:
self.pubkey = None
def _step_pubkey_checks(self):
- self.push('restore_subscriber')
-
pgp_address = PGPAddress.for_address(self.address)
if pgp_address is not None:
@@ -95,6 +93,9 @@ class PubkeyMixin:
raise StopIteration
def _step_receive_key(self):
+ self._restore_subscriber()
+ self._set_token(TokenOwner.no_one)
+
pgp_address = PGPAddress.for_address(self.address)
if pgp_address is None or pgp_address.key is None:
# The workflow was confirmed but we still dont have an address
@@ -126,11 +127,5 @@ class PubkeyMixin:
raise StopIteration
def _step_receive_key_confirmation(self):
+ self._restore_subscriber()
self._set_token(TokenOwner.no_one)
-
- def _step_restore_subscriber(self):
- if self.which is WhichSubscriber.address:
- self.subscriber = self.address
- else:
- assert self.which is WhichSubscriber.user
- self.subscriber = self.user
diff --git a/src/mailman_pgp/workflows/tests/test_base.py b/src/mailman_pgp/workflows/tests/test_base.py
index 5b07e5b..ef3a009 100644
--- a/src/mailman_pgp/workflows/tests/test_base.py
+++ b/src/mailman_pgp/workflows/tests/test_base.py
@@ -88,7 +88,6 @@ class TestPubkeyMixin(unittest.TestCase):
pubkey=self.sender_key.pubkey,
pubkey_pre_confirmed=True)
workflow.run_thru('pubkey_checks')
- next(workflow)
with patch.object(workflow, '_step_do_subscription') as step:
next(workflow)
step.assert_called_once_with()
@@ -148,7 +147,6 @@ class TestPubkeyMixin(unittest.TestCase):
receive_workflow.token = workflow.token
receive_workflow.restore()
receive_workflow.run_thru('receive_key')
- next(receive_workflow)
with patch.object(receive_workflow, '_step_do_subscription') as step:
next(receive_workflow)
step.assert_called_once_with()
@@ -185,7 +183,6 @@ class TestPubkeyMixin(unittest.TestCase):
receive_workflow.token = workflow.token
receive_workflow.restore()
receive_workflow.run_thru('receive_key_confirmation')
- next(receive_workflow)
with patch.object(receive_workflow, '_step_do_subscription') as step:
next(receive_workflow)
step.assert_called_once_with()
@@ -202,7 +199,6 @@ class TestPubkeyMixin(unittest.TestCase):
t.add(pgp_address)
workflow.run_thru('pubkey_checks')
- next(workflow)
with patch.object(workflow, '_step_do_subscription') as step:
next(workflow)
step.assert_called_once_with()
--
cgit v1.2.3-70-g09d2
From a4e412d40162e35c54704793938e1a5cbf196086 Mon Sep 17 00:00:00 2001
From: J08nY
Date: Thu, 13 Jul 2017 00:04:53 +0200
Subject: Split PubkeyMixin into two.
---
src/mailman_pgp/workflows/base.py | 21 ++++---
src/mailman_pgp/workflows/subscription.py | 33 ++++++-----
src/mailman_pgp/workflows/tests/test_base.py | 85 ++++++++++++++++++----------
3 files changed, 90 insertions(+), 49 deletions(-)
diff --git a/src/mailman_pgp/workflows/base.py b/src/mailman_pgp/workflows/base.py
index 3c276cf..20db5f9 100644
--- a/src/mailman_pgp/workflows/base.py
+++ b/src/mailman_pgp/workflows/base.py
@@ -45,7 +45,7 @@ Token: {}
"""
-class PubkeyMixin:
+class SetPubkeyMixin:
def __init__(self, pubkey=None, pre_confirmed=False):
self.pubkey = pubkey
self.pubkey_confirmed = pre_confirmed
@@ -69,15 +69,9 @@ class PubkeyMixin:
if pgp_address is not None:
if not pgp_address.key:
self.push('send_key_request')
- else:
- if not pgp_address.key_confirmed:
- self.push('send_key_confirm_request')
else:
if not self.pubkey:
self.push('send_key_request')
- else:
- if not self.pubkey_confirmed:
- self.push('send_key_confirm_request')
def _step_send_key_request(self):
self._set_token(TokenOwner.subscriber)
@@ -103,6 +97,19 @@ class PubkeyMixin:
self.push('send_key_request')
else:
self.pubkey = pgp_address.key
+
+
+class ConfirmPubkeyMixin:
+ def __init__(self, pre_confirmed=False):
+ self.pubkey_confirmed = pre_confirmed
+
+ def _step_pubkey_confirmation(self):
+ pgp_address = PGPAddress.for_address(self.address)
+
+ if pgp_address is not None:
+ if not pgp_address.key_confirmed and not self.pubkey_confirmed:
+ self.push('send_key_confirm_request')
+ else:
if not self.pubkey_confirmed:
self.push('send_key_confirm_request')
diff --git a/src/mailman_pgp/workflows/subscription.py b/src/mailman_pgp/workflows/subscription.py
index a7565f3..c4138e6 100644
--- a/src/mailman_pgp/workflows/subscription.py
+++ b/src/mailman_pgp/workflows/subscription.py
@@ -24,12 +24,13 @@ from mailman.workflows.common import (ConfirmationMixin, ModerationMixin,
from public import public
from zope.interface import implementer
-from mailman_pgp.workflows.base import PubkeyMixin
+from mailman_pgp.workflows.base import ConfirmPubkeyMixin, SetPubkeyMixin
@public
@implementer(ISubscriptionWorkflow)
-class OpenSubscriptionPolicy(SubscriptionBase, VerificationMixin, PubkeyMixin):
+class OpenSubscriptionPolicy(SubscriptionBase, VerificationMixin,
+ SetPubkeyMixin, ConfirmPubkeyMixin):
""""""
name = 'pgp-policy-open'
@@ -51,11 +52,12 @@ class OpenSubscriptionPolicy(SubscriptionBase, VerificationMixin, PubkeyMixin):
pubkey_pre_confirmed=False):
SubscriptionBase.__init__(self, mlist, subscriber)
VerificationMixin.__init__(self, pre_verified=pre_verified)
- PubkeyMixin.__init__(self, pubkey=pubkey,
- pre_confirmed=pubkey_pre_confirmed)
+ SetPubkeyMixin.__init__(self, pubkey=pubkey)
+ ConfirmPubkeyMixin.__init__(self, pre_confirmed=pubkey_pre_confirmed)
def _step_prepare(self):
self.push('do_subscription')
+ self.push('pubkey_confirmation')
self.push('pubkey_checks')
self.push('verification_checks')
self.push('sanity_checks')
@@ -64,7 +66,8 @@ class OpenSubscriptionPolicy(SubscriptionBase, VerificationMixin, PubkeyMixin):
@public
@implementer(ISubscriptionWorkflow)
class ConfirmSubscriptionPolicy(SubscriptionBase, VerificationMixin,
- ConfirmationMixin, PubkeyMixin):
+ ConfirmationMixin, SetPubkeyMixin,
+ ConfirmPubkeyMixin):
""""""
name = 'pgp-policy-confirm'
@@ -88,11 +91,12 @@ class ConfirmSubscriptionPolicy(SubscriptionBase, VerificationMixin,
SubscriptionBase.__init__(self, mlist, subscriber)
VerificationMixin.__init__(self, pre_verified=pre_verified)
ConfirmationMixin.__init__(self, pre_confirmed=pre_confirmed)
- PubkeyMixin.__init__(self, pubkey=pubkey,
- pre_confirmed=pubkey_pre_confirmed)
+ SetPubkeyMixin.__init__(self, pubkey=pubkey)
+ ConfirmPubkeyMixin.__init__(self, pre_confirmed=pubkey_pre_confirmed)
def _step_prepare(self):
self.push('do_subscription')
+ self.push('pubkey_confirmation')
self.push('pubkey_checks')
self.push('confirmation_checks')
self.push('verification_checks')
@@ -102,7 +106,8 @@ class ConfirmSubscriptionPolicy(SubscriptionBase, VerificationMixin,
@public
@implementer(ISubscriptionWorkflow)
class ModerationSubscriptionPolicy(SubscriptionBase, VerificationMixin,
- ModerationMixin, PubkeyMixin):
+ ModerationMixin, SetPubkeyMixin,
+ ConfirmPubkeyMixin):
""""""
name = 'pgp-policy-moderate'
@@ -126,12 +131,13 @@ class ModerationSubscriptionPolicy(SubscriptionBase, VerificationMixin,
SubscriptionBase.__init__(self, mlist, subscriber)
VerificationMixin.__init__(self, pre_verified=pre_verified)
ModerationMixin.__init__(self, pre_approved=pre_approved)
- PubkeyMixin.__init__(self, pubkey=pubkey,
- pre_confirmed=pubkey_pre_confirmed)
+ SetPubkeyMixin.__init__(self, pubkey=pubkey)
+ ConfirmPubkeyMixin.__init__(self, pre_confirmed=pubkey_pre_confirmed)
def _step_prepare(self):
self.push('do_subscription')
self.push('moderation_checks')
+ self.push('pubkey_confirmation')
self.push('pubkey_checks')
self.push('verification_checks')
self.push('sanity_checks')
@@ -141,7 +147,7 @@ class ModerationSubscriptionPolicy(SubscriptionBase, VerificationMixin,
@implementer(ISubscriptionWorkflow)
class ConfirmModerationSubscriptionPolicy(SubscriptionBase, VerificationMixin,
ConfirmationMixin, ModerationMixin,
- PubkeyMixin):
+ SetPubkeyMixin, ConfirmPubkeyMixin):
""""""
name = 'pgp-policy-confirm-moderate'
@@ -167,12 +173,13 @@ class ConfirmModerationSubscriptionPolicy(SubscriptionBase, VerificationMixin,
VerificationMixin.__init__(self, pre_verified=pre_verified)
ConfirmationMixin.__init__(self, pre_confirmed=pre_confirmed)
ModerationMixin.__init__(self, pre_approved=pre_approved)
- PubkeyMixin.__init__(self, pubkey=pubkey,
- pre_confirmed=pubkey_pre_confirmed)
+ SetPubkeyMixin.__init__(self, pubkey=pubkey)
+ ConfirmPubkeyMixin.__init__(self, pre_confirmed=pubkey_pre_confirmed)
def _step_prepare(self):
self.push('do_subscription')
self.push('moderation_checks')
+ self.push('pubkey_confirmation')
self.push('pubkey_checks')
self.push('confirmation_checks')
self.push('verification_checks')
diff --git a/src/mailman_pgp/workflows/tests/test_base.py b/src/mailman_pgp/workflows/tests/test_base.py
index ef3a009..913ab67 100644
--- a/src/mailman_pgp/workflows/tests/test_base.py
+++ b/src/mailman_pgp/workflows/tests/test_base.py
@@ -37,8 +37,7 @@ from mailman_pgp.workflows.base import KEY_REQUEST
from mailman_pgp.workflows.subscription import ConfirmSubscriptionPolicy
-class TestPubkeyMixin(unittest.TestCase):
- layer = PGPConfigLayer
+class PubkeyMixinSetup():
def setUp(self):
with mm_transaction():
@@ -55,19 +54,9 @@ class TestPubkeyMixin(unittest.TestCase):
self.sender_key = load_key('rsa_1024.priv.asc')
self.sender = self.um.create_address('rsa-1024b@example.org')
- def test_pended_data(self):
- workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender,
- pre_verified=True,
- pre_confirmed=True)
- with suppress(StopIteration):
- workflow.run_thru('send_key_request')
- self.assertIsNotNone(workflow.token)
- pendable = getUtility(IPendings).confirm(workflow.token, expunge=False)
- self.assertEqual(pendable['list_id'], 'test.example.com')
- self.assertEqual(pendable['email'], 'rsa-1024b@example.org')
- self.assertEqual(pendable['display_name'], '')
- self.assertEqual(pendable['when'], '2005-08-01T07:49:23')
- self.assertEqual(pendable['token_owner'], 'subscriber')
+
+class TestSetPubkeyMixin(PubkeyMixinSetup, unittest.TestCase):
+ layer = PGPConfigLayer
def test_key_request_sent(self):
workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender,
@@ -81,17 +70,6 @@ class TestPubkeyMixin(unittest.TestCase):
self.assertEqual(message['Subject'], 'key set {}'.format(token))
self.assertEqual(message.get_payload(), KEY_REQUEST)
- def test_key_request_pubkey_set(self):
- workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender,
- pre_verified=True,
- pre_confirmed=True,
- pubkey=self.sender_key.pubkey,
- pubkey_pre_confirmed=True)
- workflow.run_thru('pubkey_checks')
- with patch.object(workflow, '_step_do_subscription') as step:
- next(workflow)
- step.assert_called_once_with()
-
def test_receive_key(self):
workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender,
pre_verified=True,
@@ -111,6 +89,7 @@ class TestPubkeyMixin(unittest.TestCase):
self.assertEqual(receive_workflow.pubkey.fingerprint,
self.sender_key.pubkey.fingerprint)
+ receive_workflow.run_thru('pubkey_confirmation')
with patch.object(receive_workflow,
'_step_send_key_confirm_request') as step:
next(receive_workflow)
@@ -146,11 +125,26 @@ class TestPubkeyMixin(unittest.TestCase):
receive_workflow = ConfirmSubscriptionPolicy(self.mlist)
receive_workflow.token = workflow.token
receive_workflow.restore()
- receive_workflow.run_thru('receive_key')
+ receive_workflow.run_thru('pubkey_confirmation')
with patch.object(receive_workflow, '_step_do_subscription') as step:
next(receive_workflow)
step.assert_called_once_with()
+
+class TestConfirmPubkeyMixin(PubkeyMixinSetup, unittest.TestCase):
+ layer = PGPConfigLayer
+
+ def test_key_request_pubkey_set(self):
+ workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender,
+ pre_verified=True,
+ pre_confirmed=True,
+ pubkey=self.sender_key.pubkey,
+ pubkey_pre_confirmed=True)
+ workflow.run_thru('pubkey_confirmation')
+ with patch.object(workflow, '_step_do_subscription') as step:
+ next(workflow)
+ step.assert_called_once_with()
+
def test_send_key_confirm_request(self):
workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender,
pre_verified=True,
@@ -187,6 +181,39 @@ class TestPubkeyMixin(unittest.TestCase):
next(receive_workflow)
step.assert_called_once_with()
+
+class TestBothPubkeyMixins(PubkeyMixinSetup, unittest.TestCase):
+ layer = PGPConfigLayer
+
+ def test_pended_data_key_request(self):
+ workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender,
+ pre_verified=True,
+ pre_confirmed=True)
+ with suppress(StopIteration):
+ workflow.run_thru('send_key_request')
+ self.assertIsNotNone(workflow.token)
+ pendable = getUtility(IPendings).confirm(workflow.token, expunge=False)
+ self.assertEqual(pendable['list_id'], 'test.example.com')
+ self.assertEqual(pendable['email'], 'rsa-1024b@example.org')
+ self.assertEqual(pendable['display_name'], '')
+ self.assertEqual(pendable['when'], '2005-08-01T07:49:23')
+ self.assertEqual(pendable['token_owner'], 'subscriber')
+
+ def test_pended_data_key_confirmation(self):
+ workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender,
+ pre_verified=True,
+ pre_confirmed=True,
+ pubkey=self.sender_key.pubkey)
+ with suppress(StopIteration):
+ workflow.run_thru('send_key_confirm_request')
+ self.assertIsNotNone(workflow.token)
+ pendable = getUtility(IPendings).confirm(workflow.token, expunge=False)
+ self.assertEqual(pendable['list_id'], 'test.example.com')
+ self.assertEqual(pendable['email'], 'rsa-1024b@example.org')
+ self.assertEqual(pendable['display_name'], '')
+ self.assertEqual(pendable['when'], '2005-08-01T07:49:23')
+ self.assertEqual(pendable['token_owner'], 'subscriber')
+
def test_exisitng_pgp_address(self):
workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender,
pre_verified=True,
@@ -198,7 +225,7 @@ class TestPubkeyMixin(unittest.TestCase):
pgp_address.key_confirmed = True
t.add(pgp_address)
- workflow.run_thru('pubkey_checks')
+ workflow.run_thru('pubkey_confirmation')
with patch.object(workflow, '_step_do_subscription') as step:
next(workflow)
step.assert_called_once_with()
@@ -213,7 +240,7 @@ class TestPubkeyMixin(unittest.TestCase):
pgp_address.key = self.sender_key.pubkey
t.add(pgp_address)
- workflow.run_thru('pubkey_checks')
+ workflow.run_thru('pubkey_confirmation')
with patch.object(workflow, '_step_send_key_confirm_request') as step:
next(workflow)
step.assert_called_once_with()
--
cgit v1.2.3-70-g09d2
From af5b7950923022c0476cbc576cd8536d18c39ef6 Mon Sep 17 00:00:00 2001
From: J08nY
Date: Thu, 13 Jul 2017 01:50:57 +0200
Subject: Add key change command and some tests.
---
src/mailman_pgp/commands/eml_key.py | 26 ++++++-
src/mailman_pgp/commands/tests/test_key.py | 43 +++++++++++
src/mailman_pgp/workflows/base.py | 1 -
src/mailman_pgp/workflows/key_change.py | 117 +++++++++++++++++++++++++++++
4 files changed, 185 insertions(+), 2 deletions(-)
create mode 100644 src/mailman_pgp/workflows/key_change.py
diff --git a/src/mailman_pgp/commands/eml_key.py b/src/mailman_pgp/commands/eml_key.py
index a2fb4d5..9df6065 100644
--- a/src/mailman_pgp/commands/eml_key.py
+++ b/src/mailman_pgp/commands/eml_key.py
@@ -30,6 +30,7 @@ from mailman_pgp.model.address import PGPAddress
from mailman_pgp.model.list import PGPMailingList
from mailman_pgp.pgp.wrapper import PGPWrapper
from mailman_pgp.workflows.base import CONFIRM_REQUEST
+from mailman_pgp.workflows.key_change import KeyChangeWorkflow
def _get_email(msg):
@@ -130,7 +131,30 @@ def _cmd_confirm(pgp_list, mlist, msg, msgdata, arguments, results):
def _cmd_change(pgp_list, mlist, msg, msgdata, arguments, results):
# New public key in attachment, requires to be signed with current
# key
- pass
+ wrapped = PGPWrapper(msg)
+ if not wrapped.has_keys():
+ print('No keys attached? Send a key.', file=results)
+ return ContinueProcessing.no
+
+ keys = list(wrapped.keys())
+ if len(keys) != 1:
+ print('More than one key! Send only one key.', file=results)
+ return ContinueProcessing.no
+
+ email = _get_email(msg)
+ if not email:
+ print('No email to change key of.', file=results)
+ return ContinueProcessing.no
+
+ pgp_address = PGPAddress.for_email(email)
+ if pgp_address is None:
+ print('A pgp enabled address not found.', file=results)
+ return ContinueProcessing.no
+
+ workflow = KeyChangeWorkflow(mlist, pgp_address, keys.pop())
+ list(workflow)
+ print('Key change request received.', file=results)
+ return ContinueProcessing.no
def _cmd_revoke(pgp_list, mlist, msg, msgdata, arguments, results):
diff --git a/src/mailman_pgp/commands/tests/test_key.py b/src/mailman_pgp/commands/tests/test_key.py
index 48ca3c9..fe75a6a 100644
--- a/src/mailman_pgp/commands/tests/test_key.py
+++ b/src/mailman_pgp/commands/tests/test_key.py
@@ -365,3 +365,46 @@ class TestPreSubscription(unittest.TestCase):
self.assertIn('Message not signed, ignoring.',
results_msg.get_payload())
+
+
+class TestAfterSubscription(unittest.TestCase):
+ layer = PGPConfigLayer
+
+ def setUp(self):
+ self.mlist = create_list('test@example.com', style_name='pgp-default')
+ self.pgp_list = PGPMailingList.for_list(self.mlist)
+ self.pgp_list.key = load_key('ecc_p256.priv.asc')
+
+ def test_key_change(self):
+ bart = getUtility(IUserManager).create_address('bart@example.com',
+ 'Bart Person')
+ bart_key = load_key('rsa_1024.priv.asc')
+ bart_new_key = load_key('ecc_p256.priv.asc')
+
+ with transaction() as t:
+ pgp_address = PGPAddress(bart)
+ pgp_address.key = bart_key.pubkey
+ pgp_address.key_confirmed = True
+ t.add(pgp_address)
+
+ message = _create_mixed('bart@example.com', 'test@example.com',
+ 'key change')
+ wrapped_message = MIMEWrapper(message)
+ message = wrapped_message.attach_key(bart_new_key.pubkey)
+
+ mm_config.switchboards['command'].enqueue(message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+
+ items = get_queue_messages('virgin', expected_count=2)
+ if items[0].msg['Subject'] == 'The results of your email commands':
+ results = items[0].msg
+ confirm_request = items[1].msg
+ else:
+ results = items[1].msg
+ confirm_request = items[0].msg
+
+ self.assertIn('Key change request received.', results.get_payload())
+
+ confirm_wrapped = PGPWrapper(confirm_request)
+ self.assertTrue(confirm_wrapped.is_encrypted())
diff --git a/src/mailman_pgp/workflows/base.py b/src/mailman_pgp/workflows/base.py
index 20db5f9..a8679bd 100644
--- a/src/mailman_pgp/workflows/base.py
+++ b/src/mailman_pgp/workflows/base.py
@@ -18,7 +18,6 @@
""""""
from mailman.email.message import UserNotification
from mailman.interfaces.subscriptions import TokenOwner
-from mailman.workflows.common import WhichSubscriber
from pgpy import PGPKey
from mailman_pgp.model.address import PGPAddress
diff --git a/src/mailman_pgp/workflows/key_change.py b/src/mailman_pgp/workflows/key_change.py
new file mode 100644
index 0000000..0098b19
--- /dev/null
+++ b/src/mailman_pgp/workflows/key_change.py
@@ -0,0 +1,117 @@
+# Copyright (C) 2017 Jan Jancar
+#
+# This file is a part of the Mailman PGP plugin.
+#
+# This program is free software; you can redistribute it and/or modify it under
+# the terms of the GNU General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or (at your option)
+# any later version.
+#
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+# more details.
+#
+# You should have received a copy of the GNU General Public License along with
+# this program. If not, see .
+
+""""""
+from mailman.email.message import UserNotification
+from mailman.interfaces.pending import IPendable, IPendings
+from mailman.interfaces.workflows import IWorkflow
+from mailman.workflows.base import Workflow
+from pgpy import PGPKey
+from public import public
+from zope.component import getUtility
+from zope.interface import implementer
+
+from mailman_pgp.model.address import PGPAddress
+from mailman_pgp.pgp.utils import copy_headers
+from mailman_pgp.pgp.wrapper import PGPWrapper
+
+CHANGE_CONFIRM_REQUEST = """\
+----------
+TODO: this is a pgp enabled list.
+You requested to change your key.
+Reply to this message with this whole text
+signed with your supplied key, either inline or PGP/MIME.
+
+Fingerprint: {}
+Token: {}
+----------
+"""
+
+
+@public
+@implementer(IWorkflow)
+class KeyChangeWorkflow(Workflow):
+ name = 'pgp-key-change-workflow'
+ description = ''
+ initial_state = 'send_key_confirm_request'
+ save_attributes = (
+ 'address_key',
+ 'pubkey_key'
+ )
+
+ def __init__(self, mlist, pgp_address=None, pubkey=None):
+ super().__init__()
+ self.mlist = mlist
+ self.pgp_address = pgp_address
+ self.pubkey = pubkey
+
+ @property
+ def address_key(self):
+ return self.pgp_address.email
+
+ @address_key.setter
+ def address_key(self, value):
+ self.pgp_address = PGPAddress.for_email(value)
+
+ @property
+ def pubkey_key(self):
+ if self.pubkey is None:
+ return None
+ return str(self.pubkey)
+
+ @pubkey_key.setter
+ def pubkey_key(self, value):
+ if value is not None:
+ self.pubkey, _ = PGPKey.from_blob(value)
+ else:
+ self.pubkey = None
+
+ def _step_send_key_confirm_request(self):
+ pendings = getUtility(IPendings)
+ pendable = KeyChangeWorkflow.pendable_class()(
+ email=self.pgp_address.email,
+ pubkey=str(self.pubkey)
+ )
+ self.token = pendings.add(pendable)
+
+ self.push('receive_key_confirmation')
+ self.save()
+ request_address = self.mlist.request_address
+ email_address = self.pgp_address.email
+ msg = UserNotification(email_address, request_address,
+ 'key confirm {}'.format(self.token),
+ CHANGE_CONFIRM_REQUEST.format(
+ self.pubkey.fingerprint,
+ self.token))
+ wrapped = PGPWrapper(msg)
+ encrypted = wrapped.encrypt(self.pubkey)
+
+ msg.set_payload(encrypted.get_payload())
+ copy_headers(encrypted, msg, True)
+ msg.send(self.mlist)
+ raise StopIteration
+
+ def _step_receive_confirmation(self):
+ pass
+
+ @classmethod
+ def pendable_class(cls):
+ @implementer(IPendable)
+ class Pendable(dict):
+ PEND_TYPE = KeyChangeWorkflow.name
+
+ return Pendable
--
cgit v1.2.3-70-g09d2
From 03314a50895615623f5f0c80d77076657348ad05 Mon Sep 17 00:00:00 2001
From: J08nY
Date: Thu, 13 Jul 2017 23:51:42 +0200
Subject: Fix TypeError in MIMEWrapper on empty protocol param.
---
src/mailman_pgp/pgp/mime.py | 6 ++++--
1 file changed, 4 insertions(+), 2 deletions(-)
diff --git a/src/mailman_pgp/pgp/mime.py b/src/mailman_pgp/pgp/mime.py
index 975b8bd..a7e31b8 100644
--- a/src/mailman_pgp/pgp/mime.py
+++ b/src/mailman_pgp/pgp/mime.py
@@ -76,7 +76,8 @@ class MIMEWrapper:
if not self._is_mime():
return False
second_type = self.msg.get_payload(1).get_content_type()
- protocol_param = collapse_rfc2231_value(self.msg.get_param('protocol'))
+ protocol_param = collapse_rfc2231_value(self.msg.get_param('protocol',
+ ''))
content_subtype = self.msg.get_content_subtype()
return (second_type == MIMEWrapper._signed_type and
@@ -102,7 +103,8 @@ class MIMEWrapper:
first_type = self.msg.get_payload(0).get_content_type()
second_type = self.msg.get_payload(1).get_content_type()
content_subtype = self.msg.get_content_subtype()
- protocol_param = collapse_rfc2231_value(self.msg.get_param('protocol'))
+ protocol_param = collapse_rfc2231_value(self.msg.get_param('protocol',
+ ''))
return ('Version: 1' in first_part and
first_type == MIMEWrapper._encrypted_type and
--
cgit v1.2.3-70-g09d2
From f921aeeb2156f81ac2dd1a3db9dd4380b0dcb6da Mon Sep 17 00:00:00 2001
From: J08nY
Date: Thu, 13 Jul 2017 23:52:07 +0200
Subject: Fix setting of key_confirmed in PGPAddress reconstructor.
---
src/mailman_pgp/model/address.py | 7 +++----
1 file changed, 3 insertions(+), 4 deletions(-)
diff --git a/src/mailman_pgp/model/address.py b/src/mailman_pgp/model/address.py
index f102bf7..25a87dc 100644
--- a/src/mailman_pgp/model/address.py
+++ b/src/mailman_pgp/model/address.py
@@ -36,9 +36,9 @@ class PGPAddress(Base):
__tablename__ = 'pgp_addresses'
id = Column(Integer, primary_key=True)
- email = Column(SAUnicode, index=True)
+ email = Column(SAUnicode, index=True, unique=True)
key_fingerprint = Column(SAUnicode)
- key_confirmed = Column(Boolean)
+ key_confirmed = Column(Boolean, default=False)
def __init__(self, address):
super().__init__()
@@ -50,7 +50,6 @@ class PGPAddress(Base):
def _init(self):
self._address = None
self._key = None
- self.key_confirmed = False
@property
def key(self):
@@ -120,7 +119,7 @@ class PGPAddress(Base):
"""
if address is None:
return None
- return PGPAddress.query().filter_by(email=address.email).first()
+ return PGPAddress.for_email(address.email)
@staticmethod
def for_email(email):
--
cgit v1.2.3-70-g09d2
From 08389caf276e1b866cae2f6afc1d47b9c1876af5 Mon Sep 17 00:00:00 2001
From: J08nY
Date: Thu, 13 Jul 2017 23:52:32 +0200
Subject: Rollback db when resetting for tests.
---
src/mailman_pgp/testing/layers.py | 1 +
1 file changed, 1 insertion(+)
diff --git a/src/mailman_pgp/testing/layers.py b/src/mailman_pgp/testing/layers.py
index 5e0d8b3..aabf216 100644
--- a/src/mailman_pgp/testing/layers.py
+++ b/src/mailman_pgp/testing/layers.py
@@ -30,6 +30,7 @@ def reset_pgp_world():
full_path = os.path.join(keydir, path)
if isfile(full_path):
os.remove(full_path)
+ config.db.session.rollback()
with transaction():
Base.metadata.drop_all(config.db.engine)
Base.metadata.create_all(config.db.engine)
--
cgit v1.2.3-70-g09d2
From 57f8d97c696913beeba8467aa550804422336d9c Mon Sep 17 00:00:00 2001
From: J08nY
Date: Thu, 13 Jul 2017 23:57:18 +0200
Subject: Split PGPMixin from subscription mixins. And bunch of other things.
---
src/mailman_pgp/commands/eml_key.py | 46 ++++++++++------
src/mailman_pgp/commands/tests/test_key.py | 49 +++++++++++------
src/mailman_pgp/workflows/base.py | 48 ++++++++++-------
src/mailman_pgp/workflows/key_change.py | 12 +++--
src/mailman_pgp/workflows/subscription.py | 21 ++++++--
src/mailman_pgp/workflows/tests/test_base.py | 81 +++++++++++++---------------
6 files changed, 153 insertions(+), 104 deletions(-)
diff --git a/src/mailman_pgp/commands/eml_key.py b/src/mailman_pgp/commands/eml_key.py
index 9df6065..7b7782d 100644
--- a/src/mailman_pgp/commands/eml_key.py
+++ b/src/mailman_pgp/commands/eml_key.py
@@ -19,6 +19,7 @@
from email.utils import parseaddr
from mailman.interfaces.command import ContinueProcessing, IEmailCommand
+from mailman.interfaces.pending import IPendings
from mailman.interfaces.subscriptions import ISubscriptionManager
from mailman.interfaces.usermanager import IUserManager
from public import public
@@ -47,6 +48,10 @@ def _cmd_set(pgp_list, mlist, msg, msgdata, arguments, results):
return ContinueProcessing.no
wrapped = PGPWrapper(msg)
+ if wrapped.is_encrypted():
+ decrypted = wrapped.decrypt(pgp_list.key)
+ wrapped = PGPWrapper(decrypted)
+
if not wrapped.has_keys():
print('No keys attached? Send a key.', file=results)
return ContinueProcessing.no
@@ -66,21 +71,24 @@ def _cmd_set(pgp_list, mlist, msg, msgdata, arguments, results):
print('No adddress to subscribe with.', file=results)
return ContinueProcessing.no
- with transaction() as t:
- pgp_address = PGPAddress.for_address(address)
- if pgp_address is None:
- pgp_address = PGPAddress(address)
- pgp_address.key = keys.pop()
- t.add(pgp_address)
+ pgp_address = PGPAddress.for_address(address)
+ if pgp_address is None:
+ print('A pgp enabled address not found.', file=results)
+ return ContinueProcessing.no
token = arguments[1]
- try:
- ISubscriptionManager(mlist).confirm(token)
- print('Key succesfully set.', file=results)
- print('Key fingerprint: {}'.format(pgp_address.key.fingerprint),
- file=results)
- except LookupError:
+ pendable = getUtility(IPendings).confirm(token, expunge=False)
+ if pendable is None:
print('Wrong token.', file=results)
+ return ContinueProcessing.no
+
+ with transaction():
+ pgp_address.key = keys.pop()
+ ISubscriptionManager(mlist).confirm(token)
+
+ print('Key succesfully set.', file=results)
+ print('Key fingerprint: {}'.format(pgp_address.key.fingerprint),
+ file=results)
return ContinueProcessing.no
@@ -115,13 +123,17 @@ def _cmd_confirm(pgp_list, mlist, msg, msgdata, arguments, results):
token = arguments[1]
- expecting = CONFIRM_REQUEST.format(pgp_address.key_fingerprint,
- token)
+ pendable = getUtility(IPendings).confirm(token, expunge=False)
+ if pendable is None:
+ print('Wrong token.', file=results)
+ return ContinueProcessing.no
+
+ # TODO differentiate between key change and subscription here.
+
+ expecting = CONFIRM_REQUEST.format(pgp_address.key_fingerprint, token)
for sig_subject in wrapped.get_signed():
if expecting in sig_subject:
- with transaction():
- pgp_address.key_confirmed = True
- ISubscriptionManager(mlist).confirm(token)
+ ISubscriptionManager(mlist).confirm(token)
break
else:
print("Message doesn't contain the expected statement.", file=results)
diff --git a/src/mailman_pgp/commands/tests/test_key.py b/src/mailman_pgp/commands/tests/test_key.py
index fe75a6a..64f8ae6 100644
--- a/src/mailman_pgp/commands/tests/test_key.py
+++ b/src/mailman_pgp/commands/tests/test_key.py
@@ -134,7 +134,6 @@ class TestPreSubscription(unittest.TestCase):
pgp_address = PGPAddress.for_address(bart)
self.assertIsNotNone(pgp_address)
self.assertEqual(pgp_address.key.fingerprint, bart_key.fingerprint)
- self.assertEqual(pgp_address.key_fingerprint, bart_key.fingerprint)
self.assertFalse(pgp_address.key_confirmed)
items = get_queue_messages('virgin', expected_count=2)
@@ -242,11 +241,6 @@ class TestPreSubscription(unittest.TestCase):
get_queue_messages('virgin')
- with transaction() as t:
- pgp_address = PGPAddress(bart)
- pgp_address.key = bart_key.pubkey
- t.add(pgp_address)
-
message = _create_plain('bart@example.com', 'test@example.com',
'Re: key confirm {}'.format(token),
CONFIRM_REQUEST.format(bart_key.fingerprint,
@@ -275,11 +269,6 @@ class TestPreSubscription(unittest.TestCase):
get_queue_messages('virgin')
- with transaction() as t:
- pgp_address = PGPAddress(bart)
- pgp_address.key = bart_key.pubkey
- t.add(pgp_address)
-
message = _create_plain('bart@example.com', 'test@example.com',
'Re: key confirm {}'.format(token),
CONFIRM_REQUEST.format(bart_key.fingerprint,
@@ -290,6 +279,7 @@ class TestPreSubscription(unittest.TestCase):
mm_config.switchboards['command'].enqueue(message,
listid='test.example.com')
+
make_testable_runner(CommandRunner, 'command').run()
pgp_address = PGPAddress.for_address(bart)
@@ -346,11 +336,6 @@ class TestPreSubscription(unittest.TestCase):
get_queue_messages('virgin')
- with transaction() as t:
- pgp_address = PGPAddress(bart)
- pgp_address.key = bart_key.pubkey
- t.add(pgp_address)
-
message = _create_plain('bart@example.com', 'test@example.com',
'Re: key confirm {}'.format(token),
CONFIRM_REQUEST.format(bart_key.fingerprint,
@@ -408,3 +393,35 @@ class TestAfterSubscription(unittest.TestCase):
confirm_wrapped = PGPWrapper(confirm_request)
self.assertTrue(confirm_wrapped.is_encrypted())
+ decrypted = confirm_wrapped.decrypt(bart_new_key)
+ self.assertIn('key confirm', decrypted['subject'])
+
+ def test_key_change_confirm(self):
+ bart = getUtility(IUserManager).create_address('bart@example.com',
+ 'Bart Person')
+ bart_key = load_key('rsa_1024.priv.asc')
+ bart_new_key = load_key('ecc_p256.priv.asc')
+
+ with transaction() as t:
+ pgp_address = PGPAddress(bart)
+ pgp_address.key = bart_key.pubkey
+ pgp_address.key_confirmed = True
+ t.add(pgp_address)
+
+ message = _create_mixed('bart@example.com', 'test@example.com',
+ 'key change')
+ wrapped_message = MIMEWrapper(message)
+ message = wrapped_message.attach_key(bart_new_key.pubkey)
+
+ mm_config.switchboards['command'].enqueue(message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+
+ items = get_queue_messages('virgin', expected_count=2)
+ if items[0].msg['Subject'] == 'The results of your email commands':
+ confirm_request = items[1].msg
+ else:
+ confirm_request = items[0].msg
+ request_wrapped = PGPWrapper(confirm_request)
+ request_wrapped.decrypt(bart_new_key)
+ # TODO finish this
diff --git a/src/mailman_pgp/workflows/base.py b/src/mailman_pgp/workflows/base.py
index a8679bd..d05781d 100644
--- a/src/mailman_pgp/workflows/base.py
+++ b/src/mailman_pgp/workflows/base.py
@@ -20,6 +20,7 @@ from mailman.email.message import UserNotification
from mailman.interfaces.subscriptions import TokenOwner
from pgpy import PGPKey
+from mailman_pgp.database import transaction
from mailman_pgp.model.address import PGPAddress
from mailman_pgp.model.list import PGPMailingList
from mailman_pgp.pgp.utils import copy_headers
@@ -44,6 +45,15 @@ Token: {}
"""
+class PGPMixin:
+ def _step_pgp_prepare(self):
+ pgp_address = PGPAddress.for_address(self.address)
+ if pgp_address is None:
+ with transaction() as t:
+ pgp_address = PGPAddress(self.address)
+ t.add(pgp_address)
+
+
class SetPubkeyMixin:
def __init__(self, pubkey=None, pre_confirmed=False):
self.pubkey = pubkey
@@ -64,13 +74,14 @@ class SetPubkeyMixin:
def _step_pubkey_checks(self):
pgp_address = PGPAddress.for_address(self.address)
+ assert pgp_address is not None
- if pgp_address is not None:
- if not pgp_address.key:
+ if self.pubkey is None:
+ if pgp_address.key is None:
self.push('send_key_request')
else:
- if not self.pubkey:
- self.push('send_key_request')
+ with transaction():
+ pgp_address.key = self.pubkey
def _step_send_key_request(self):
self._set_token(TokenOwner.subscriber)
@@ -89,14 +100,6 @@ class SetPubkeyMixin:
self._restore_subscriber()
self._set_token(TokenOwner.no_one)
- pgp_address = PGPAddress.for_address(self.address)
- if pgp_address is None or pgp_address.key is None:
- # The workflow was confirmed but we still dont have an address
- # or the pubkey. So resend request and wait.
- self.push('send_key_request')
- else:
- self.pubkey = pgp_address.key
-
class ConfirmPubkeyMixin:
def __init__(self, pre_confirmed=False):
@@ -104,27 +107,31 @@ class ConfirmPubkeyMixin:
def _step_pubkey_confirmation(self):
pgp_address = PGPAddress.for_address(self.address)
+ assert pgp_address is not None
- if pgp_address is not None:
- if not pgp_address.key_confirmed and not self.pubkey_confirmed:
- self.push('send_key_confirm_request')
+ if self.pubkey_confirmed:
+ with transaction():
+ pgp_address.key_confirmed = True
else:
- if not self.pubkey_confirmed:
+ if not pgp_address.key_confirmed:
self.push('send_key_confirm_request')
def _step_send_key_confirm_request(self):
self._set_token(TokenOwner.subscriber)
self.push('receive_key_confirmation')
self.save()
+
+ pgp_address = PGPAddress.for_address(self.address)
request_address = self.mlist.request_address
email_address = self.address.email
msg = UserNotification(email_address, request_address,
'key confirm {}'.format(self.token),
- CONFIRM_REQUEST.format(self.pubkey.fingerprint,
- self.token))
+ CONFIRM_REQUEST.format(
+ pgp_address.key_fingerprint,
+ self.token))
pgp_list = PGPMailingList.for_list(self.mlist)
wrapped = PGPWrapper(msg)
- encrypted = wrapped.sign_encrypt(pgp_list.key, self.pubkey,
+ encrypted = wrapped.sign_encrypt(pgp_list.key, pgp_address.key,
pgp_list.pubkey)
msg.set_payload(encrypted.get_payload())
@@ -135,3 +142,6 @@ class ConfirmPubkeyMixin:
def _step_receive_key_confirmation(self):
self._restore_subscriber()
self._set_token(TokenOwner.no_one)
+ with transaction():
+ pgp_address = PGPAddress.for_address(self.address)
+ pgp_address.key_confirmed = True
diff --git a/src/mailman_pgp/workflows/key_change.py b/src/mailman_pgp/workflows/key_change.py
index 0098b19..8831d28 100644
--- a/src/mailman_pgp/workflows/key_change.py
+++ b/src/mailman_pgp/workflows/key_change.py
@@ -95,8 +95,8 @@ class KeyChangeWorkflow(Workflow):
msg = UserNotification(email_address, request_address,
'key confirm {}'.format(self.token),
CHANGE_CONFIRM_REQUEST.format(
- self.pubkey.fingerprint,
- self.token))
+ self.pubkey.fingerprint,
+ self.token))
wrapped = PGPWrapper(msg)
encrypted = wrapped.encrypt(self.pubkey)
@@ -106,7 +106,13 @@ class KeyChangeWorkflow(Workflow):
raise StopIteration
def _step_receive_confirmation(self):
- pass
+ self.pgp_address.key = self.pubkey
+ self.pgp_address.key_confirmed = True
+
+ pendings = getUtility(IPendings)
+ if self.token is not None:
+ pendings.confirm(self.token)
+ self.token = None
@classmethod
def pendable_class(cls):
diff --git a/src/mailman_pgp/workflows/subscription.py b/src/mailman_pgp/workflows/subscription.py
index c4138e6..ce02013 100644
--- a/src/mailman_pgp/workflows/subscription.py
+++ b/src/mailman_pgp/workflows/subscription.py
@@ -24,13 +24,15 @@ from mailman.workflows.common import (ConfirmationMixin, ModerationMixin,
from public import public
from zope.interface import implementer
-from mailman_pgp.workflows.base import ConfirmPubkeyMixin, SetPubkeyMixin
+from mailman_pgp.workflows.base import (ConfirmPubkeyMixin, PGPMixin,
+ SetPubkeyMixin)
@public
@implementer(ISubscriptionWorkflow)
class OpenSubscriptionPolicy(SubscriptionBase, VerificationMixin,
- SetPubkeyMixin, ConfirmPubkeyMixin):
+ SetPubkeyMixin, ConfirmPubkeyMixin,
+ PGPMixin):
""""""
name = 'pgp-policy-open'
@@ -54,11 +56,13 @@ class OpenSubscriptionPolicy(SubscriptionBase, VerificationMixin,
VerificationMixin.__init__(self, pre_verified=pre_verified)
SetPubkeyMixin.__init__(self, pubkey=pubkey)
ConfirmPubkeyMixin.__init__(self, pre_confirmed=pubkey_pre_confirmed)
+ PGPMixin.__init__(self)
def _step_prepare(self):
self.push('do_subscription')
self.push('pubkey_confirmation')
self.push('pubkey_checks')
+ self.push('pgp_prepare')
self.push('verification_checks')
self.push('sanity_checks')
@@ -67,7 +71,7 @@ class OpenSubscriptionPolicy(SubscriptionBase, VerificationMixin,
@implementer(ISubscriptionWorkflow)
class ConfirmSubscriptionPolicy(SubscriptionBase, VerificationMixin,
ConfirmationMixin, SetPubkeyMixin,
- ConfirmPubkeyMixin):
+ ConfirmPubkeyMixin, PGPMixin):
""""""
name = 'pgp-policy-confirm'
@@ -93,11 +97,13 @@ class ConfirmSubscriptionPolicy(SubscriptionBase, VerificationMixin,
ConfirmationMixin.__init__(self, pre_confirmed=pre_confirmed)
SetPubkeyMixin.__init__(self, pubkey=pubkey)
ConfirmPubkeyMixin.__init__(self, pre_confirmed=pubkey_pre_confirmed)
+ PGPMixin.__init__(self)
def _step_prepare(self):
self.push('do_subscription')
self.push('pubkey_confirmation')
self.push('pubkey_checks')
+ self.push('pgp_prepare')
self.push('confirmation_checks')
self.push('verification_checks')
self.push('sanity_checks')
@@ -107,7 +113,7 @@ class ConfirmSubscriptionPolicy(SubscriptionBase, VerificationMixin,
@implementer(ISubscriptionWorkflow)
class ModerationSubscriptionPolicy(SubscriptionBase, VerificationMixin,
ModerationMixin, SetPubkeyMixin,
- ConfirmPubkeyMixin):
+ ConfirmPubkeyMixin, PGPMixin):
""""""
name = 'pgp-policy-moderate'
@@ -133,12 +139,14 @@ class ModerationSubscriptionPolicy(SubscriptionBase, VerificationMixin,
ModerationMixin.__init__(self, pre_approved=pre_approved)
SetPubkeyMixin.__init__(self, pubkey=pubkey)
ConfirmPubkeyMixin.__init__(self, pre_confirmed=pubkey_pre_confirmed)
+ PGPMixin.__init__(self)
def _step_prepare(self):
self.push('do_subscription')
self.push('moderation_checks')
self.push('pubkey_confirmation')
self.push('pubkey_checks')
+ self.push('pgp_prepare')
self.push('verification_checks')
self.push('sanity_checks')
@@ -147,7 +155,8 @@ class ModerationSubscriptionPolicy(SubscriptionBase, VerificationMixin,
@implementer(ISubscriptionWorkflow)
class ConfirmModerationSubscriptionPolicy(SubscriptionBase, VerificationMixin,
ConfirmationMixin, ModerationMixin,
- SetPubkeyMixin, ConfirmPubkeyMixin):
+ SetPubkeyMixin, ConfirmPubkeyMixin,
+ PGPMixin):
""""""
name = 'pgp-policy-confirm-moderate'
@@ -175,12 +184,14 @@ class ConfirmModerationSubscriptionPolicy(SubscriptionBase, VerificationMixin,
ModerationMixin.__init__(self, pre_approved=pre_approved)
SetPubkeyMixin.__init__(self, pubkey=pubkey)
ConfirmPubkeyMixin.__init__(self, pre_confirmed=pubkey_pre_confirmed)
+ PGPMixin.__init__(self)
def _step_prepare(self):
self.push('do_subscription')
self.push('moderation_checks')
self.push('pubkey_confirmation')
self.push('pubkey_checks')
+ self.push('pgp_prepare')
self.push('confirmation_checks')
self.push('verification_checks')
self.push('sanity_checks')
diff --git a/src/mailman_pgp/workflows/tests/test_base.py b/src/mailman_pgp/workflows/tests/test_base.py
index 913ab67..af00d01 100644
--- a/src/mailman_pgp/workflows/tests/test_base.py
+++ b/src/mailman_pgp/workflows/tests/test_base.py
@@ -38,7 +38,6 @@ from mailman_pgp.workflows.subscription import ConfirmSubscriptionPolicy
class PubkeyMixinSetup():
-
def setUp(self):
with mm_transaction():
self.mlist = create_list('test@example.com',
@@ -55,6 +54,27 @@ class PubkeyMixinSetup():
self.sender = self.um.create_address('rsa-1024b@example.org')
+class TestPGPMixin(PubkeyMixinSetup, unittest.TestCase):
+ layer = PGPConfigLayer
+
+ def test_create_address(self):
+ workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender,
+ pre_verified=True,
+ pre_confirmed=True)
+ workflow.run_thru('pgp_prepare')
+ pgp_address = PGPAddress.for_address(self.sender)
+ self.assertIsNotNone(pgp_address)
+
+ def test_address_existing(self):
+ workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender,
+ pre_verified=True,
+ pre_confirmed=True)
+ with transaction() as t:
+ pgp_address = PGPAddress(self.sender)
+ t.add(pgp_address)
+ workflow.run_thru('pgp_prepare')
+
+
class TestSetPubkeyMixin(PubkeyMixinSetup, unittest.TestCase):
layer = PGPConfigLayer
@@ -75,60 +95,38 @@ class TestSetPubkeyMixin(PubkeyMixinSetup, unittest.TestCase):
pre_verified=True,
pre_confirmed=True)
list(workflow)
- with transaction() as t:
- pgp_address = PGPAddress(self.sender)
+ with transaction():
+ pgp_address = PGPAddress.for_address(self.sender)
pgp_address.key = self.sender_key.pubkey
- t.add(pgp_address)
receive_workflow = ConfirmSubscriptionPolicy(self.mlist)
receive_workflow.token = workflow.token
receive_workflow.restore()
receive_workflow.run_thru('receive_key')
- self.assertIsNotNone(receive_workflow.pubkey)
- self.assertEqual(receive_workflow.pubkey.fingerprint,
- self.sender_key.pubkey.fingerprint)
-
- receive_workflow.run_thru('pubkey_confirmation')
- with patch.object(receive_workflow,
- '_step_send_key_confirm_request') as step:
- next(receive_workflow)
- step.assert_called_once_with()
-
- def test_receive_key_no_address(self):
+ def test_set_pubkey(self):
workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender,
pre_verified=True,
- pre_confirmed=True)
- list(workflow)
- receive_workflow = ConfirmSubscriptionPolicy(self.mlist)
- receive_workflow.token = workflow.token
- receive_workflow.restore()
- receive_workflow.run_thru('receive_key')
-
- self.assertIsNone(receive_workflow.pubkey)
- with patch.object(receive_workflow,
- '_step_send_key_request') as step:
- next(receive_workflow)
- step.assert_called_once_with()
+ pre_confirmed=True,
+ pubkey=self.sender_key.pubkey)
+ workflow.run_thru('pubkey_checks')
+ pgp_address = PGPAddress.for_address(self.sender)
+ self.assertIsNotNone(pgp_address)
+ self.assertIsNotNone(pgp_address.key)
+ self.assertEqual(pgp_address.key_fingerprint,
+ self.sender_key.fingerprint)
- def test_receive_key_pubkey_confirmed(self):
+ def test_pubkey_set(self):
workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender,
pre_verified=True,
- pre_confirmed=True,
- pubkey_pre_confirmed=True)
- list(workflow)
+ pre_confirmed=True)
with transaction() as t:
pgp_address = PGPAddress(self.sender)
pgp_address.key = self.sender_key.pubkey
t.add(pgp_address)
-
- receive_workflow = ConfirmSubscriptionPolicy(self.mlist)
- receive_workflow.token = workflow.token
- receive_workflow.restore()
- receive_workflow.run_thru('pubkey_confirmation')
- with patch.object(receive_workflow, '_step_do_subscription') as step:
- next(receive_workflow)
- step.assert_called_once_with()
+ workflow.run_thru('pubkey_checks')
+ self.assertEqual(pgp_address.key_fingerprint,
+ self.sender_key.fingerprint)
class TestConfirmPubkeyMixin(PubkeyMixinSetup, unittest.TestCase):
@@ -168,11 +166,6 @@ class TestConfirmPubkeyMixin(PubkeyMixinSetup, unittest.TestCase):
pubkey_pre_confirmed=False)
list(workflow)
- with transaction() as t:
- pgp_address = PGPAddress(self.sender)
- pgp_address.key = self.sender_key.pubkey
- t.add(pgp_address)
-
receive_workflow = ConfirmSubscriptionPolicy(self.mlist)
receive_workflow.token = workflow.token
receive_workflow.restore()
--
cgit v1.2.3-70-g09d2
From 8368cd832d21b404c01ab475ade6209b906ab422 Mon Sep 17 00:00:00 2001
From: J08nY
Date: Fri, 14 Jul 2017 00:37:40 +0200
Subject: Make key change work.
---
src/mailman_pgp/commands/eml_key.py | 15 ++++++++++++---
src/mailman_pgp/commands/tests/test_key.py | 23 +++++++++++++++++++++--
src/mailman_pgp/workflows/key_change.py | 11 +++++++----
3 files changed, 40 insertions(+), 9 deletions(-)
diff --git a/src/mailman_pgp/commands/eml_key.py b/src/mailman_pgp/commands/eml_key.py
index 7b7782d..2f7a7e7 100644
--- a/src/mailman_pgp/commands/eml_key.py
+++ b/src/mailman_pgp/commands/eml_key.py
@@ -31,7 +31,8 @@ from mailman_pgp.model.address import PGPAddress
from mailman_pgp.model.list import PGPMailingList
from mailman_pgp.pgp.wrapper import PGPWrapper
from mailman_pgp.workflows.base import CONFIRM_REQUEST
-from mailman_pgp.workflows.key_change import KeyChangeWorkflow
+from mailman_pgp.workflows.key_change import (CHANGE_CONFIRM_REQUEST,
+ KeyChangeWorkflow)
def _get_email(msg):
@@ -128,9 +129,12 @@ def _cmd_confirm(pgp_list, mlist, msg, msgdata, arguments, results):
print('Wrong token.', file=results)
return ContinueProcessing.no
- # TODO differentiate between key change and subscription here.
+ if pendable.get('type') == KeyChangeWorkflow.pendable_class().PEND_TYPE:
+ expecting = CHANGE_CONFIRM_REQUEST.format(pendable.get('fingerprint'),
+ token)
+ else:
+ expecting = CONFIRM_REQUEST.format(pgp_address.key_fingerprint, token)
- expecting = CONFIRM_REQUEST.format(pgp_address.key_fingerprint, token)
for sig_subject in wrapped.get_signed():
if expecting in sig_subject:
ISubscriptionManager(mlist).confirm(token)
@@ -143,6 +147,11 @@ def _cmd_confirm(pgp_list, mlist, msg, msgdata, arguments, results):
def _cmd_change(pgp_list, mlist, msg, msgdata, arguments, results):
# New public key in attachment, requires to be signed with current
# key
+ if len(arguments) != 1:
+ print('Extraneous argument/s: ' + ','.join(arguments[1:]),
+ file=results)
+ return ContinueProcessing.no
+
wrapped = PGPWrapper(msg)
if not wrapped.has_keys():
print('No keys attached? Send a key.', file=results)
diff --git a/src/mailman_pgp/commands/tests/test_key.py b/src/mailman_pgp/commands/tests/test_key.py
index 64f8ae6..4f62b11 100644
--- a/src/mailman_pgp/commands/tests/test_key.py
+++ b/src/mailman_pgp/commands/tests/test_key.py
@@ -37,6 +37,7 @@ from mailman_pgp.pgp.tests.base import load_key
from mailman_pgp.pgp.wrapper import PGPWrapper
from mailman_pgp.testing.layers import PGPConfigLayer
from mailman_pgp.workflows.base import CONFIRM_REQUEST
+from mailman_pgp.workflows.key_change import CHANGE_CONFIRM_REQUEST
from mailman_pgp.workflows.subscription import OpenSubscriptionPolicy
@@ -423,5 +424,23 @@ class TestAfterSubscription(unittest.TestCase):
else:
confirm_request = items[0].msg
request_wrapped = PGPWrapper(confirm_request)
- request_wrapped.decrypt(bart_new_key)
- # TODO finish this
+ decrypted = request_wrapped.decrypt(bart_new_key)
+
+ subj = decrypted['subject']
+ token = subj.split(' ')[-1]
+
+ confirm_message = _create_plain('bart@example.com', 'test@example.com',
+ decrypted['subject'],
+ CHANGE_CONFIRM_REQUEST.format(
+ bart_new_key.fingerprint,
+ token))
+ wrapped_confirm = MIMEWrapper(confirm_message)
+ confirm = wrapped_confirm.sign(bart_key)
+
+ mm_config.switchboards['command'].enqueue(confirm,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+
+ pgp_address = PGPAddress.for_address(bart)
+ self.assertEqual(pgp_address.key_fingerprint, bart_new_key.fingerprint)
+ self.assertTrue(pgp_address.key_confirmed)
diff --git a/src/mailman_pgp/workflows/key_change.py b/src/mailman_pgp/workflows/key_change.py
index 8831d28..a67edbb 100644
--- a/src/mailman_pgp/workflows/key_change.py
+++ b/src/mailman_pgp/workflows/key_change.py
@@ -25,6 +25,7 @@ from public import public
from zope.component import getUtility
from zope.interface import implementer
+from mailman_pgp.database import transaction
from mailman_pgp.model.address import PGPAddress
from mailman_pgp.pgp.utils import copy_headers
from mailman_pgp.pgp.wrapper import PGPWrapper
@@ -84,11 +85,12 @@ class KeyChangeWorkflow(Workflow):
pendings = getUtility(IPendings)
pendable = KeyChangeWorkflow.pendable_class()(
email=self.pgp_address.email,
- pubkey=str(self.pubkey)
+ pubkey=str(self.pubkey),
+ fingerprint=self.pubkey.fingerprint
)
self.token = pendings.add(pendable)
- self.push('receive_key_confirmation')
+ self.push('receive_confirmation')
self.save()
request_address = self.mlist.request_address
email_address = self.pgp_address.email
@@ -106,8 +108,9 @@ class KeyChangeWorkflow(Workflow):
raise StopIteration
def _step_receive_confirmation(self):
- self.pgp_address.key = self.pubkey
- self.pgp_address.key_confirmed = True
+ with transaction():
+ self.pgp_address.key = self.pubkey
+ self.pgp_address.key_confirmed = True
pendings = getUtility(IPendings)
if self.token is not None:
--
cgit v1.2.3-70-g09d2
From d8afe4bec9282254483ea1c7571298dcd9731508 Mon Sep 17 00:00:00 2001
From: J08nY
Date: Fri, 14 Jul 2017 01:44:54 +0200
Subject: Workflow tests refactor.
---
src/mailman_pgp/commands/tests/test_key.py | 65 +++++------
src/mailman_pgp/workflows/base.py | 3 +-
src/mailman_pgp/workflows/tests/test_base.py | 130 +++++++++++----------
.../workflows/tests/test_subscription.py | 2 +
4 files changed, 105 insertions(+), 95 deletions(-)
diff --git a/src/mailman_pgp/commands/tests/test_key.py b/src/mailman_pgp/commands/tests/test_key.py
index 4f62b11..44d5b25 100644
--- a/src/mailman_pgp/commands/tests/test_key.py
+++ b/src/mailman_pgp/commands/tests/test_key.py
@@ -26,6 +26,7 @@ from mailman.interfaces.usermanager import IUserManager
from mailman.runners.command import CommandRunner
from mailman.testing.helpers import get_queue_messages, make_testable_runner
from mailman.utilities.datetime import now
+from public import public
from zope.component import getUtility
from mailman_pgp.config import mm_config
@@ -59,6 +60,7 @@ def _create_mixed(from_hdr, to_hdr, subject_hdr):
return message
+@public
class TestPreDispatch(unittest.TestCase):
layer = PGPConfigLayer
@@ -103,6 +105,7 @@ class TestPreDispatch(unittest.TestCase):
results_msg.get_payload())
+@public
class TestPreSubscription(unittest.TestCase):
layer = PGPConfigLayer
@@ -111,6 +114,9 @@ class TestPreSubscription(unittest.TestCase):
self.pgp_list = PGPMailingList.for_list(self.mlist)
self.pgp_list.key = load_key('ecc_p256.priv.asc')
+ self.bart_key = load_key('rsa_1024.priv.asc')
+ self.anne_key = load_key('ecc_p256.priv.asc')
+
def test_set(self):
self.mlist.subscription_policy = OpenSubscriptionPolicy
bart = getUtility(IUserManager).create_address('bart@example.com',
@@ -121,12 +127,10 @@ class TestPreSubscription(unittest.TestCase):
get_queue_messages('virgin')
- bart_key = load_key('rsa_1024.priv.asc')
-
set_message = _create_mixed('bart@example.com', 'test@example.com',
'Re: key set {}'.format(token))
wrapped_set_message = MIMEWrapper(set_message)
- set_message = wrapped_set_message.attach_key(bart_key.pubkey)
+ set_message = wrapped_set_message.attach_key(self.bart_key.pubkey)
mm_config.switchboards['command'].enqueue(set_message,
listid='test.example.com')
@@ -134,7 +138,8 @@ class TestPreSubscription(unittest.TestCase):
pgp_address = PGPAddress.for_address(bart)
self.assertIsNotNone(pgp_address)
- self.assertEqual(pgp_address.key.fingerprint, bart_key.fingerprint)
+ self.assertEqual(pgp_address.key.fingerprint,
+ self.bart_key.fingerprint)
self.assertFalse(pgp_address.key_confirmed)
items = get_queue_messages('virgin', expected_count=2)
@@ -146,7 +151,7 @@ class TestPreSubscription(unittest.TestCase):
confirm_request = items[0].msg
self.assertIn('Key succesfully set.', results.get_payload())
- self.assertIn('Key fingerprint: {}'.format(bart_key.fingerprint),
+ self.assertIn('Key fingerprint: {}'.format(self.bart_key.fingerprint),
results.get_payload())
confirm_wrapped = PGPWrapper(confirm_request)
@@ -178,15 +183,12 @@ class TestPreSubscription(unittest.TestCase):
results_msg.get_payload())
def test_set_multiple_keys(self):
- bart_key = load_key('rsa_1024.priv.asc')
- anne_key = load_key('ecc_p256.priv.asc')
-
set_message = _create_mixed('bart@example.com', 'test@example.com',
'Re: key set token')
wrapped_set_message = MIMEWrapper(set_message)
- set_message = wrapped_set_message.attach_key(bart_key.pubkey)
+ set_message = wrapped_set_message.attach_key(self.bart_key.pubkey)
wrapped_set_message = MIMEWrapper(set_message)
- set_message = wrapped_set_message.attach_key(anne_key.pubkey)
+ set_message = wrapped_set_message.attach_key(self.anne_key.pubkey)
mm_config.switchboards['command'].enqueue(set_message,
listid='test.example.com')
@@ -198,11 +200,9 @@ class TestPreSubscription(unittest.TestCase):
results_msg.get_payload())
def test_set_no_email(self):
- bart_key = load_key('rsa_1024.priv.asc')
-
message = _create_mixed('', 'test@example.com', 'key set token')
wrapped_message = MIMEWrapper(message)
- message = wrapped_message.attach_key(bart_key.pubkey)
+ message = wrapped_message.attach_key(self.bart_key.pubkey)
mm_config.switchboards['command'].enqueue(message,
listid='test.example.com')
@@ -213,12 +213,10 @@ class TestPreSubscription(unittest.TestCase):
self.assertIn('No email to subscribe with.', results_msg.get_payload())
def test_set_no_address(self):
- bart_key = load_key('rsa_1024.priv.asc')
-
set_message = _create_mixed('bart@example.com', 'test@example.com',
'key set token')
wrapped_set_message = MIMEWrapper(set_message)
- set_message = wrapped_set_message.attach_key(bart_key.pubkey)
+ set_message = wrapped_set_message.attach_key(self.bart_key.pubkey)
mm_config.switchboards['command'].enqueue(set_message,
listid='test.example.com')
@@ -235,19 +233,18 @@ class TestPreSubscription(unittest.TestCase):
'Bart Person')
bart.verified_on = now()
- bart_key = load_key('rsa_1024.priv.asc')
-
token, token_owner, member = ISubscriptionManager(self.mlist).register(
- bart, pubkey=bart_key.pubkey)
+ bart, pubkey=self.bart_key.pubkey)
get_queue_messages('virgin')
message = _create_plain('bart@example.com', 'test@example.com',
'Re: key confirm {}'.format(token),
- CONFIRM_REQUEST.format(bart_key.fingerprint,
- token))
+ CONFIRM_REQUEST.format(
+ self.bart_key.fingerprint,
+ token))
wrapped_message = MIMEWrapper(message)
- message = wrapped_message.sign(bart_key)
+ message = wrapped_message.sign(self.bart_key)
mm_config.switchboards['command'].enqueue(message,
listid='test.example.com')
@@ -263,20 +260,20 @@ class TestPreSubscription(unittest.TestCase):
'Bart Person')
bart.verified_on = now()
- bart_key = load_key('rsa_1024.priv.asc')
-
token, token_owner, member = ISubscriptionManager(self.mlist).register(
- bart, pubkey=bart_key.pubkey)
+ bart, pubkey=self.bart_key.pubkey)
get_queue_messages('virgin')
message = _create_plain('bart@example.com', 'test@example.com',
'Re: key confirm {}'.format(token),
- CONFIRM_REQUEST.format(bart_key.fingerprint,
- token))
+ CONFIRM_REQUEST.format(
+ self.bart_key.fingerprint,
+ token))
wrapped_message = MIMEWrapper(message)
- message = wrapped_message.sign_encrypt(bart_key, self.pgp_list.pubkey,
- bart_key.pubkey)
+ message = wrapped_message.sign_encrypt(self.bart_key,
+ self.pgp_list.pubkey,
+ self.bart_key.pubkey)
mm_config.switchboards['command'].enqueue(message,
listid='test.example.com')
@@ -330,17 +327,16 @@ class TestPreSubscription(unittest.TestCase):
'Bart Person')
bart.verified_on = now()
- bart_key = load_key('rsa_1024.priv.asc')
-
token, token_owner, member = ISubscriptionManager(self.mlist).register(
- bart, pubkey=bart_key.pubkey)
+ bart, pubkey=self.bart_key.pubkey)
get_queue_messages('virgin')
message = _create_plain('bart@example.com', 'test@example.com',
'Re: key confirm {}'.format(token),
- CONFIRM_REQUEST.format(bart_key.fingerprint,
- token))
+ CONFIRM_REQUEST.format(
+ self.bart_key.fingerprint,
+ token))
mm_config.switchboards['command'].enqueue(message,
listid='test.example.com')
@@ -353,6 +349,7 @@ class TestPreSubscription(unittest.TestCase):
results_msg.get_payload())
+@public
class TestAfterSubscription(unittest.TestCase):
layer = PGPConfigLayer
diff --git a/src/mailman_pgp/workflows/base.py b/src/mailman_pgp/workflows/base.py
index d05781d..014f2dd 100644
--- a/src/mailman_pgp/workflows/base.py
+++ b/src/mailman_pgp/workflows/base.py
@@ -55,9 +55,8 @@ class PGPMixin:
class SetPubkeyMixin:
- def __init__(self, pubkey=None, pre_confirmed=False):
+ def __init__(self, pubkey=None):
self.pubkey = pubkey
- self.pubkey_confirmed = pre_confirmed
@property
def pubkey_key(self):
diff --git a/src/mailman_pgp/workflows/tests/test_base.py b/src/mailman_pgp/workflows/tests/test_base.py
index af00d01..c59cfe0 100644
--- a/src/mailman_pgp/workflows/tests/test_base.py
+++ b/src/mailman_pgp/workflows/tests/test_base.py
@@ -24,8 +24,12 @@ from unittest.mock import patch
from mailman.app.lifecycle import create_list
from mailman.interfaces.pending import IPendings
from mailman.interfaces.usermanager import IUserManager
+from mailman.interfaces.workflows import IWorkflow
from mailman.testing.helpers import get_queue_messages
+from mailman.workflows.common import SubscriptionBase
+from public import public
from zope.component import getUtility
+from zope.interface import implementer
from mailman_pgp.database import mm_transaction, transaction
from mailman_pgp.model.address import PGPAddress
@@ -33,11 +37,11 @@ from mailman_pgp.model.list import PGPMailingList
from mailman_pgp.pgp.tests.base import load_key
from mailman_pgp.pgp.wrapper import PGPWrapper
from mailman_pgp.testing.layers import PGPConfigLayer
-from mailman_pgp.workflows.base import KEY_REQUEST
-from mailman_pgp.workflows.subscription import ConfirmSubscriptionPolicy
+from mailman_pgp.workflows.base import (KEY_REQUEST, PGPMixin, SetPubkeyMixin,
+ ConfirmPubkeyMixin)
-class PubkeyMixinSetup():
+class PubkeyMixinTestSetup():
def setUp(self):
with mm_transaction():
self.mlist = create_list('test@example.com',
@@ -54,34 +58,62 @@ class PubkeyMixinSetup():
self.sender = self.um.create_address('rsa-1024b@example.org')
-class TestPGPMixin(PubkeyMixinSetup, unittest.TestCase):
+@implementer(IWorkflow)
+class PGPTestWorkflow(SubscriptionBase, PGPMixin, SetPubkeyMixin,
+ ConfirmPubkeyMixin):
+ name = 'test-workflow'
+ description = ''
+ initial_state = 'prepare'
+ save_attributes = (
+ 'pubkey_key',
+ 'pubkey_confirmed',
+ 'address_key',
+ 'subscriber_key',
+ 'user_key',
+ 'token_owner_key'
+ )
+
+ def __init__(self, mlist, subscriber=None, *, pubkey=None,
+ pubkey_pre_confirmed=False):
+ SubscriptionBase.__init__(self, mlist, subscriber)
+ SetPubkeyMixin.__init__(self, pubkey=pubkey)
+ ConfirmPubkeyMixin.__init__(self, pre_confirmed=pubkey_pre_confirmed)
+ PGPMixin.__init__(self)
+
+ def _step_prepare(self):
+ self.push('do_subscription')
+ self.push('pubkey_confirmation')
+ self.push('pubkey_checks')
+ self.push('pgp_prepare')
+ self.push('sanity_checks')
+
+
+@public
+class TestPGPMixin(PubkeyMixinTestSetup, unittest.TestCase):
layer = PGPConfigLayer
def test_create_address(self):
- workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender,
- pre_verified=True,
- pre_confirmed=True)
+ workflow = PGPTestWorkflow(self.mlist, self.sender)
workflow.run_thru('pgp_prepare')
pgp_address = PGPAddress.for_address(self.sender)
self.assertIsNotNone(pgp_address)
def test_address_existing(self):
- workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender,
- pre_verified=True,
- pre_confirmed=True)
+ workflow = PGPTestWorkflow(self.mlist, self.sender)
with transaction() as t:
pgp_address = PGPAddress(self.sender)
t.add(pgp_address)
workflow.run_thru('pgp_prepare')
+ still = PGPAddress.for_address(self.sender)
+ self.assertIsNotNone(still)
-class TestSetPubkeyMixin(PubkeyMixinSetup, unittest.TestCase):
+@public
+class TestSetPubkeyMixin(PubkeyMixinTestSetup, unittest.TestCase):
layer = PGPConfigLayer
def test_key_request_sent(self):
- workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender,
- pre_verified=True,
- pre_confirmed=True)
+ workflow = PGPTestWorkflow(self.mlist, self.sender)
list(workflow)
items = get_queue_messages('virgin', expected_count=1)
message = items[0].msg
@@ -91,24 +123,20 @@ class TestSetPubkeyMixin(PubkeyMixinSetup, unittest.TestCase):
self.assertEqual(message.get_payload(), KEY_REQUEST)
def test_receive_key(self):
- workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender,
- pre_verified=True,
- pre_confirmed=True)
+ workflow = PGPTestWorkflow(self.mlist, self.sender)
list(workflow)
with transaction():
pgp_address = PGPAddress.for_address(self.sender)
pgp_address.key = self.sender_key.pubkey
- receive_workflow = ConfirmSubscriptionPolicy(self.mlist)
+ receive_workflow = PGPTestWorkflow(self.mlist)
receive_workflow.token = workflow.token
receive_workflow.restore()
receive_workflow.run_thru('receive_key')
def test_set_pubkey(self):
- workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender,
- pre_verified=True,
- pre_confirmed=True,
- pubkey=self.sender_key.pubkey)
+ workflow = PGPTestWorkflow(self.mlist, self.sender,
+ pubkey=self.sender_key.pubkey)
workflow.run_thru('pubkey_checks')
pgp_address = PGPAddress.for_address(self.sender)
self.assertIsNotNone(pgp_address)
@@ -117,9 +145,7 @@ class TestSetPubkeyMixin(PubkeyMixinSetup, unittest.TestCase):
self.sender_key.fingerprint)
def test_pubkey_set(self):
- workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender,
- pre_verified=True,
- pre_confirmed=True)
+ workflow = PGPTestWorkflow(self.mlist, self.sender)
with transaction() as t:
pgp_address = PGPAddress(self.sender)
pgp_address.key = self.sender_key.pubkey
@@ -129,26 +155,23 @@ class TestSetPubkeyMixin(PubkeyMixinSetup, unittest.TestCase):
self.sender_key.fingerprint)
-class TestConfirmPubkeyMixin(PubkeyMixinSetup, unittest.TestCase):
+@public
+class TestConfirmPubkeyMixin(PubkeyMixinTestSetup, unittest.TestCase):
layer = PGPConfigLayer
def test_key_request_pubkey_set(self):
- workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender,
- pre_verified=True,
- pre_confirmed=True,
- pubkey=self.sender_key.pubkey,
- pubkey_pre_confirmed=True)
+ workflow = PGPTestWorkflow(self.mlist, self.sender,
+ pubkey=self.sender_key.pubkey,
+ pubkey_pre_confirmed=True)
workflow.run_thru('pubkey_confirmation')
with patch.object(workflow, '_step_do_subscription') as step:
next(workflow)
step.assert_called_once_with()
def test_send_key_confirm_request(self):
- workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender,
- pre_verified=True,
- pre_confirmed=True,
- pubkey=self.sender_key.pubkey,
- pubkey_pre_confirmed=False)
+ workflow = PGPTestWorkflow(self.mlist, self.sender,
+ pubkey=self.sender_key.pubkey,
+ pubkey_pre_confirmed=False)
list(workflow)
items = get_queue_messages('virgin', expected_count=1)
message = items[0].msg
@@ -159,14 +182,12 @@ class TestConfirmPubkeyMixin(PubkeyMixinSetup, unittest.TestCase):
self.assertTrue(wrapped.is_encrypted())
def test_receive_confirmation(self):
- workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender,
- pre_verified=True,
- pre_confirmed=True,
- pubkey=self.sender_key.pubkey,
- pubkey_pre_confirmed=False)
+ workflow = PGPTestWorkflow(self.mlist, self.sender,
+ pubkey=self.sender_key.pubkey,
+ pubkey_pre_confirmed=False)
list(workflow)
- receive_workflow = ConfirmSubscriptionPolicy(self.mlist)
+ receive_workflow = PGPTestWorkflow(self.mlist)
receive_workflow.token = workflow.token
receive_workflow.restore()
receive_workflow.run_thru('receive_key_confirmation')
@@ -175,13 +196,12 @@ class TestConfirmPubkeyMixin(PubkeyMixinSetup, unittest.TestCase):
step.assert_called_once_with()
-class TestBothPubkeyMixins(PubkeyMixinSetup, unittest.TestCase):
+@public
+class TestBothPubkeyMixins(PubkeyMixinTestSetup, unittest.TestCase):
layer = PGPConfigLayer
def test_pended_data_key_request(self):
- workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender,
- pre_verified=True,
- pre_confirmed=True)
+ workflow = PGPTestWorkflow(self.mlist, self.sender)
with suppress(StopIteration):
workflow.run_thru('send_key_request')
self.assertIsNotNone(workflow.token)
@@ -193,10 +213,8 @@ class TestBothPubkeyMixins(PubkeyMixinSetup, unittest.TestCase):
self.assertEqual(pendable['token_owner'], 'subscriber')
def test_pended_data_key_confirmation(self):
- workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender,
- pre_verified=True,
- pre_confirmed=True,
- pubkey=self.sender_key.pubkey)
+ workflow = PGPTestWorkflow(self.mlist, self.sender,
+ pubkey=self.sender_key.pubkey)
with suppress(StopIteration):
workflow.run_thru('send_key_confirm_request')
self.assertIsNotNone(workflow.token)
@@ -208,9 +226,7 @@ class TestBothPubkeyMixins(PubkeyMixinSetup, unittest.TestCase):
self.assertEqual(pendable['token_owner'], 'subscriber')
def test_exisitng_pgp_address(self):
- workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender,
- pre_verified=True,
- pre_confirmed=True)
+ workflow = PGPTestWorkflow(self.mlist, self.sender)
with transaction() as t:
pgp_address = PGPAddress(self.sender)
@@ -224,9 +240,7 @@ class TestBothPubkeyMixins(PubkeyMixinSetup, unittest.TestCase):
step.assert_called_once_with()
def test_exisitng_pgp_address_not_confirmed(self):
- workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender,
- pre_verified=True,
- pre_confirmed=True)
+ workflow = PGPTestWorkflow(self.mlist, self.sender)
with transaction() as t:
pgp_address = PGPAddress(self.sender)
@@ -239,9 +253,7 @@ class TestBothPubkeyMixins(PubkeyMixinSetup, unittest.TestCase):
step.assert_called_once_with()
def test_exisitng_pgp_address_no_key(self):
- workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender,
- pre_verified=True,
- pre_confirmed=True)
+ workflow = PGPTestWorkflow(self.mlist, self.sender)
with transaction() as t:
pgp_address = PGPAddress(self.sender)
diff --git a/src/mailman_pgp/workflows/tests/test_subscription.py b/src/mailman_pgp/workflows/tests/test_subscription.py
index 9464a83..f9fa1e1 100644
--- a/src/mailman_pgp/workflows/tests/test_subscription.py
+++ b/src/mailman_pgp/workflows/tests/test_subscription.py
@@ -20,6 +20,7 @@ import unittest
from mailman.app.lifecycle import create_list
from mailman.interfaces.usermanager import IUserManager
+from public import public
from zope.component import getUtility
from mailman_pgp.database import mm_transaction
@@ -29,6 +30,7 @@ from mailman_pgp.workflows.subscription import (
ModerationSubscriptionPolicy, OpenSubscriptionPolicy)
+@public
class TestSubscriptionWorkflows(unittest.TestCase):
layer = PGPConfigLayer
--
cgit v1.2.3-70-g09d2
From 615a1e7c01a0710c5ba138d81358d80827bcf680 Mon Sep 17 00:00:00 2001
From: J08nY
Date: Fri, 14 Jul 2017 02:01:33 +0200
Subject: Some more Key change workflow tests.
---
src/mailman_pgp/workflows/key_change.py | 20 ++--
src/mailman_pgp/workflows/tests/test_key_change.py | 105 +++++++++++++++++++++
2 files changed, 118 insertions(+), 7 deletions(-)
create mode 100644 src/mailman_pgp/workflows/tests/test_key_change.py
diff --git a/src/mailman_pgp/workflows/key_change.py b/src/mailman_pgp/workflows/key_change.py
index a67edbb..9e204cf 100644
--- a/src/mailman_pgp/workflows/key_change.py
+++ b/src/mailman_pgp/workflows/key_change.py
@@ -18,6 +18,7 @@
""""""
from mailman.email.message import UserNotification
from mailman.interfaces.pending import IPendable, IPendings
+from mailman.interfaces.subscriptions import TokenOwner
from mailman.interfaces.workflows import IWorkflow
from mailman.workflows.base import Workflow
from pgpy import PGPKey
@@ -30,6 +31,7 @@ from mailman_pgp.model.address import PGPAddress
from mailman_pgp.pgp.utils import copy_headers
from mailman_pgp.pgp.wrapper import PGPWrapper
+
CHANGE_CONFIRM_REQUEST = """\
----------
TODO: this is a pgp enabled list.
@@ -48,7 +50,7 @@ Token: {}
class KeyChangeWorkflow(Workflow):
name = 'pgp-key-change-workflow'
description = ''
- initial_state = 'send_key_confirm_request'
+ initial_state = 'change_key'
save_attributes = (
'address_key',
'pubkey_key'
@@ -67,19 +69,21 @@ class KeyChangeWorkflow(Workflow):
@address_key.setter
def address_key(self, value):
self.pgp_address = PGPAddress.for_email(value)
+ self.member = self.mlist.regular_members.get_member(value)
@property
def pubkey_key(self):
- if self.pubkey is None:
- return None
return str(self.pubkey)
@pubkey_key.setter
def pubkey_key(self, value):
- if value is not None:
- self.pubkey, _ = PGPKey.from_blob(value)
- else:
- self.pubkey = None
+ self.pubkey, _ = PGPKey.from_blob(value)
+
+ def _step_change_key(self):
+ if self.pgp_address is None or self.pubkey is None:
+ raise ValueError
+
+ self.push('send_key_confirm_request')
def _step_send_key_confirm_request(self):
pendings = getUtility(IPendings)
@@ -89,6 +93,7 @@ class KeyChangeWorkflow(Workflow):
fingerprint=self.pubkey.fingerprint
)
self.token = pendings.add(pendable)
+ self.token_owner = TokenOwner.subscriber
self.push('receive_confirmation')
self.save()
@@ -116,6 +121,7 @@ class KeyChangeWorkflow(Workflow):
if self.token is not None:
pendings.confirm(self.token)
self.token = None
+ self.token_owner = TokenOwner.no_one
@classmethod
def pendable_class(cls):
diff --git a/src/mailman_pgp/workflows/tests/test_key_change.py b/src/mailman_pgp/workflows/tests/test_key_change.py
new file mode 100644
index 0000000..5d61efd
--- /dev/null
+++ b/src/mailman_pgp/workflows/tests/test_key_change.py
@@ -0,0 +1,105 @@
+# Copyright (C) 2017 Jan Jancar
+#
+# This file is a part of the Mailman PGP plugin.
+#
+# This program is free software; you can redistribute it and/or modify it under
+# the terms of the GNU General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or (at your option)
+# any later version.
+#
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+# more details.
+#
+# You should have received a copy of the GNU General Public License along with
+# this program. If not, see .
+
+""""""
+
+import unittest
+
+from mailman.app.lifecycle import create_list
+from mailman.interfaces.subscriptions import ISubscriptionManager, TokenOwner
+from mailman.interfaces.usermanager import IUserManager
+from mailman.testing.helpers import get_queue_messages
+from public import public
+from zope.component import getUtility
+
+from mailman_pgp.database import mm_transaction, transaction
+from mailman_pgp.model.address import PGPAddress
+from mailman_pgp.model.list import PGPMailingList
+from mailman_pgp.pgp.tests.base import load_key
+from mailman_pgp.pgp.wrapper import PGPWrapper
+from mailman_pgp.testing.layers import PGPConfigLayer
+from mailman_pgp.workflows.key_change import KeyChangeWorkflow
+
+
+@public
+class TestKeyChangeWorkflow(unittest.TestCase):
+ layer = PGPConfigLayer
+
+ def setUp(self):
+ with mm_transaction():
+ self.mlist = create_list('test@example.com',
+ style_name='pgp-default')
+ self.pgp_list = PGPMailingList.for_list(self.mlist)
+ self.pgp_list.key = load_key('ecc_p256.priv.asc')
+
+ self.sender_key = load_key('rsa_1024.priv.asc')
+ self.sender_new_key = load_key('ecc_p256.priv.asc')
+ self.sender = getUtility(IUserManager).create_address(
+ 'rsa-1024b@example.org')
+
+ def test_pgp_address_none(self):
+ workflow = KeyChangeWorkflow(self.mlist)
+ with self.assertRaises(ValueError):
+ list(workflow)
+
+ def test_pubkey_none(self):
+ with transaction() as t:
+ pgp_address = PGPAddress(self.sender)
+ t.add(pgp_address)
+
+ workflow = KeyChangeWorkflow(self.mlist, pgp_address)
+ with self.assertRaises(ValueError):
+ list(workflow)
+
+ def test_send_key_confirm_request(self):
+ with transaction() as t:
+ pgp_address = PGPAddress(self.sender)
+ pgp_address.key = self.sender_key.pubkey
+ pgp_address.key_confirmed = True
+ t.add(pgp_address)
+
+ workflow = KeyChangeWorkflow(self.mlist, pgp_address,
+ self.sender_new_key.pubkey)
+ list(workflow)
+ items = get_queue_messages('virgin', expected_count=1)
+ message = items[0].msg
+ token = workflow.token
+
+ self.assertEqual(message['Subject'], 'key confirm {}'.format(token))
+ wrapped = PGPWrapper(message)
+ self.assertTrue(wrapped.is_encrypted())
+
+ def test_confirm(self):
+ with transaction() as t:
+ pgp_address = PGPAddress(self.sender)
+ pgp_address.key = self.sender_key.pubkey
+ pgp_address.key_confirmed = True
+ t.add(pgp_address)
+
+ workflow = KeyChangeWorkflow(self.mlist, pgp_address,
+ self.sender_new_key.pubkey)
+ list(workflow)
+
+ token, token_owner, member = ISubscriptionManager(self.mlist).confirm(
+ workflow.token)
+ self.assertIsNone(token)
+ self.assertEqual(token_owner, TokenOwner.no_one)
+
+ pgp_address = PGPAddress.for_address(self.sender)
+ self.assertEqual(pgp_address.key_fingerprint,
+ self.sender_new_key.fingerprint)
+ self.assertTrue(pgp_address.key_confirmed)
--
cgit v1.2.3-70-g09d2
From 6b7b6bfe1044a5c7a548a091a49dcdd2422aaff5 Mon Sep 17 00:00:00 2001
From: J08nY
Date: Fri, 14 Jul 2017 02:10:31 +0200
Subject: Speed up testing. Dont drop the whole db structure.
---
src/mailman_pgp/testing/layers.py | 45 ++++++++++++++++++++++++++++++++-------
1 file changed, 37 insertions(+), 8 deletions(-)
diff --git a/src/mailman_pgp/testing/layers.py b/src/mailman_pgp/testing/layers.py
index aabf216..fb8a3ec 100644
--- a/src/mailman_pgp/testing/layers.py
+++ b/src/mailman_pgp/testing/layers.py
@@ -14,6 +14,7 @@
#
# You should have received a copy of the GNU General Public License along with
# this program. If not, see .
+import contextlib
import os
from os.path import isfile
@@ -24,18 +25,36 @@ from mailman_pgp.database import transaction
from mailman_pgp.model.base import Base
-def reset_pgp_world():
+def reset_rollback():
+ config.db.session.rollback()
+
+
+def reset_pgp_dirs():
for keydir in (config.pgp.keydir_config.values()):
for path in os.listdir(keydir):
full_path = os.path.join(keydir, path)
if isfile(full_path):
os.remove(full_path)
- config.db.session.rollback()
+
+
+def reset_pgp_hard():
+ reset_rollback()
+ reset_pgp_dirs()
with transaction():
Base.metadata.drop_all(config.db.engine)
Base.metadata.create_all(config.db.engine)
+def reset_pgp_soft():
+ reset_rollback()
+ reset_pgp_dirs()
+ with contextlib.closing(config.db.engine.connect()) as con:
+ trans = con.begin()
+ for table in reversed(Base.metadata.sorted_tables):
+ con.execute(table.delete())
+ trans.commit()
+
+
# It's weird that ws have to do this, but for some reason nose2 test layers
# don't work when ws create a mixin class with the two classmethods
# and subclass both it and the respective Mailman Core test layer.
@@ -46,28 +65,38 @@ class PGPConfigLayer(ConfigLayer):
@classmethod
def tearDown(cls):
- reset_pgp_world()
+ reset_pgp_soft()
+
+ @classmethod
+ def testTearDown(cls):
+ reset_pgp_soft()
+
+
+class PGPMigrationLayer(ConfigLayer):
+ @classmethod
+ def tearDown(cls):
+ reset_pgp_hard()
@classmethod
def testTearDown(cls):
- reset_pgp_world()
+ reset_pgp_hard()
class PGPSMTPLayer(SMTPLayer):
@classmethod
def tearDown(cls):
- reset_pgp_world()
+ reset_pgp_soft()
@classmethod
def testTearDown(cls):
- reset_pgp_world()
+ reset_pgp_soft()
class PGPRESTLayer(RESTLayer):
@classmethod
def tearDown(cls):
- reset_pgp_world()
+ reset_pgp_soft()
@classmethod
def testTearDown(cls):
- reset_pgp_world()
+ reset_pgp_soft()
--
cgit v1.2.3-70-g09d2
From 0b9335e163791959390bf3c83928e5b61e912fa3 Mon Sep 17 00:00:00 2001
From: J08nY
Date: Fri, 14 Jul 2017 02:16:23 +0200
Subject: qa.
---
src/mailman_pgp/workflows/key_change.py | 3 +--
src/mailman_pgp/workflows/tests/test_base.py | 4 ++--
2 files changed, 3 insertions(+), 4 deletions(-)
diff --git a/src/mailman_pgp/workflows/key_change.py b/src/mailman_pgp/workflows/key_change.py
index 9e204cf..c6d3ebc 100644
--- a/src/mailman_pgp/workflows/key_change.py
+++ b/src/mailman_pgp/workflows/key_change.py
@@ -31,7 +31,6 @@ from mailman_pgp.model.address import PGPAddress
from mailman_pgp.pgp.utils import copy_headers
from mailman_pgp.pgp.wrapper import PGPWrapper
-
CHANGE_CONFIRM_REQUEST = """\
----------
TODO: this is a pgp enabled list.
@@ -80,7 +79,7 @@ class KeyChangeWorkflow(Workflow):
self.pubkey, _ = PGPKey.from_blob(value)
def _step_change_key(self):
- if self.pgp_address is None or self.pubkey is None:
+ if self.pgp_address is None or self.pubkey is None:
raise ValueError
self.push('send_key_confirm_request')
diff --git a/src/mailman_pgp/workflows/tests/test_base.py b/src/mailman_pgp/workflows/tests/test_base.py
index c59cfe0..e3cc833 100644
--- a/src/mailman_pgp/workflows/tests/test_base.py
+++ b/src/mailman_pgp/workflows/tests/test_base.py
@@ -37,8 +37,8 @@ from mailman_pgp.model.list import PGPMailingList
from mailman_pgp.pgp.tests.base import load_key
from mailman_pgp.pgp.wrapper import PGPWrapper
from mailman_pgp.testing.layers import PGPConfigLayer
-from mailman_pgp.workflows.base import (KEY_REQUEST, PGPMixin, SetPubkeyMixin,
- ConfirmPubkeyMixin)
+from mailman_pgp.workflows.base import (ConfirmPubkeyMixin, KEY_REQUEST,
+ PGPMixin, SetPubkeyMixin)
class PubkeyMixinTestSetup():
--
cgit v1.2.3-70-g09d2
From 9f0ac2239af18f780c757f8cf6524c99de2dffe8 Mon Sep 17 00:00:00 2001
From: J08nY
Date: Fri, 14 Jul 2017 15:13:47 +0200
Subject: More tests for key command.
---
src/mailman_pgp/commands/eml_key.py | 4 +
src/mailman_pgp/commands/tests/test_key.py | 295 +++++++++++++++++++++++++++--
src/mailman_pgp/workflows/base.py | 3 +-
src/mailman_pgp/workflows/key_change.py | 4 +-
4 files changed, 288 insertions(+), 18 deletions(-)
diff --git a/src/mailman_pgp/commands/eml_key.py b/src/mailman_pgp/commands/eml_key.py
index 2f7a7e7..1b6dc9f 100644
--- a/src/mailman_pgp/commands/eml_key.py
+++ b/src/mailman_pgp/commands/eml_key.py
@@ -109,6 +109,10 @@ def _cmd_confirm(pgp_list, mlist, msg, msgdata, arguments, results):
print('A pgp enabled address not found.', file=results)
return ContinueProcessing.no
+ if pgp_address.key is None:
+ print('No key set.', file=results)
+ return ContinueProcessing.no
+
wrapped = PGPWrapper(msg)
if wrapped.is_encrypted():
decrypted = wrapped.decrypt(pgp_list.key)
diff --git a/src/mailman_pgp/commands/tests/test_key.py b/src/mailman_pgp/commands/tests/test_key.py
index 44d5b25..d0ff7e9 100644
--- a/src/mailman_pgp/commands/tests/test_key.py
+++ b/src/mailman_pgp/commands/tests/test_key.py
@@ -157,6 +157,49 @@ class TestPreSubscription(unittest.TestCase):
confirm_wrapped = PGPWrapper(confirm_request)
self.assertTrue(confirm_wrapped.is_encrypted())
+ def test_set_encrypted(self):
+ self.mlist.subscription_policy = OpenSubscriptionPolicy
+ bart = getUtility(IUserManager).create_address('bart@example.com',
+ 'Bart Person')
+ bart.verified_on = now()
+ token, token_owner, member = ISubscriptionManager(self.mlist).register(
+ bart)
+
+ get_queue_messages('virgin')
+
+ set_message = _create_mixed('bart@example.com', 'test@example.com',
+ 'Re: key set {}'.format(token))
+ wrapped_set_message = MIMEWrapper(set_message)
+ set_message = wrapped_set_message.attach_key(self.bart_key.pubkey)
+ wrapped_set_message = MIMEWrapper(set_message)
+ set_message = wrapped_set_message.encrypt(self.pgp_list.pubkey,
+ self.bart_key.pubkey)
+
+ mm_config.switchboards['command'].enqueue(set_message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+
+ pgp_address = PGPAddress.for_address(bart)
+ self.assertIsNotNone(pgp_address)
+ self.assertEqual(pgp_address.key.fingerprint,
+ self.bart_key.fingerprint)
+ self.assertFalse(pgp_address.key_confirmed)
+
+ items = get_queue_messages('virgin', expected_count=2)
+ if items[0].msg['Subject'] == 'The results of your email commands':
+ results = items[0].msg
+ confirm_request = items[1].msg
+ else:
+ results = items[1].msg
+ confirm_request = items[0].msg
+
+ self.assertIn('Key succesfully set.', results.get_payload())
+ self.assertIn('Key fingerprint: {}'.format(self.bart_key.fingerprint),
+ results.get_payload())
+
+ confirm_wrapped = PGPWrapper(confirm_request)
+ self.assertTrue(confirm_wrapped.is_encrypted())
+
def test_set_no_token(self):
message = _create_plain('bart@example.com', 'test@example.com',
'key set', '')
@@ -227,6 +270,47 @@ class TestPreSubscription(unittest.TestCase):
self.assertIn('No adddress to subscribe with.',
results_msg.get_payload())
+ def test_set_no_pgp_address(self):
+ bart = getUtility(IUserManager).create_address('bart@example.com',
+ 'Bart Person')
+ bart.verified_on = now()
+
+ set_message = _create_mixed('bart@example.com', 'test@example.com',
+ 'key set token')
+ wrapped_set_message = MIMEWrapper(set_message)
+ set_message = wrapped_set_message.attach_key(self.bart_key.pubkey)
+
+ mm_config.switchboards['command'].enqueue(set_message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+ items = get_queue_messages('virgin', expected_count=1)
+ results_msg = items[0].msg
+
+ self.assertIn('A pgp enabled address not found.',
+ results_msg.get_payload())
+
+ def test_set_wrong_token(self):
+ bart = getUtility(IUserManager).create_address('bart@example.com',
+ 'Bart Person')
+ bart.verified_on = now()
+
+ with transaction() as t:
+ pgp_address = PGPAddress(bart)
+ t.add(pgp_address)
+
+ set_message = _create_mixed('bart@example.com', 'test@example.com',
+ 'key set token')
+ wrapped_set_message = MIMEWrapper(set_message)
+ set_message = wrapped_set_message.attach_key(self.bart_key.pubkey)
+
+ mm_config.switchboards['command'].enqueue(set_message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+ items = get_queue_messages('virgin', expected_count=1)
+ results_msg = items[0].msg
+
+ self.assertIn('Wrong token.', results_msg.get_payload())
+
def test_confirm(self):
self.mlist.subscription_policy = OpenSubscriptionPolicy
bart = getUtility(IUserManager).create_address('bart@example.com',
@@ -321,6 +405,29 @@ class TestPreSubscription(unittest.TestCase):
self.assertIn('A pgp enabled address not found.',
results_msg.get_payload())
+ def test_confirm_no_key(self):
+ bart = getUtility(IUserManager).create_address('bart@example.com',
+ 'Bart Person')
+ with transaction() as t:
+ pgp_address = PGPAddress(bart)
+ t.add(pgp_address)
+
+ message = _create_plain('bart@example.com', 'test@example.com',
+ 'Re: key confirm token',
+ CONFIRM_REQUEST.format(
+ self.bart_key.fingerprint,
+ 'token'))
+ wrapped_message = MIMEWrapper(message)
+ message = wrapped_message.sign(self.bart_key)
+
+ mm_config.switchboards['command'].enqueue(message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+ items = get_queue_messages('virgin', expected_count=1)
+ results_msg = items[0].msg
+
+ self.assertIn('No key set.', results_msg.get_payload())
+
def test_confirm_not_signed(self):
self.mlist.subscription_policy = OpenSubscriptionPolicy
bart = getUtility(IUserManager).create_address('bart@example.com',
@@ -348,6 +455,92 @@ class TestPreSubscription(unittest.TestCase):
self.assertIn('Message not signed, ignoring.',
results_msg.get_payload())
+ def test_confirm_invalid_sig(self):
+ self.mlist.subscription_policy = OpenSubscriptionPolicy
+ bart = getUtility(IUserManager).create_address('bart@example.com',
+ 'Bart Person')
+ bart.verified_on = now()
+
+ token, token_owner, member = ISubscriptionManager(self.mlist).register(
+ bart, pubkey=self.bart_key.pubkey)
+
+ get_queue_messages('virgin')
+
+ message = _create_plain('bart@example.com', 'test@example.com',
+ 'Re: key confirm {}'.format(token),
+ CONFIRM_REQUEST.format(
+ self.bart_key.fingerprint,
+ token))
+ wrapped_message = MIMEWrapper(message)
+ message = wrapped_message.sign(self.bart_key)
+ message.get_payload(0).set_payload(
+ 'Something that was definitely not signed.')
+
+ mm_config.switchboards['command'].enqueue(message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+
+ items = get_queue_messages('virgin', expected_count=1)
+ results_msg = items[0].msg
+
+ self.assertIn('Message failed to verify.',
+ results_msg.get_payload())
+
+ def test_confirm_wrong_token(self):
+ bart = getUtility(IUserManager).create_address('bart@example.com',
+ 'Bart Person')
+ bart.verified_on = now()
+
+ with transaction() as t:
+ pgp_address = PGPAddress(bart)
+ pgp_address.key = self.bart_key.pubkey
+ t.add(pgp_address)
+
+ message = _create_plain('bart@example.com', 'test@example.com',
+ 'Re: key confirm token',
+ CONFIRM_REQUEST.format(
+ self.bart_key.fingerprint,
+ 'token'))
+ wrapped_message = MIMEWrapper(message)
+ message = wrapped_message.sign(self.bart_key)
+
+ mm_config.switchboards['command'].enqueue(message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+
+ items = get_queue_messages('virgin', expected_count=1)
+ results_msg = items[0].msg
+
+ self.assertIn('Wrong token.', results_msg.get_payload())
+
+ def test_confirm_no_signed_statement(self):
+ self.mlist.subscription_policy = OpenSubscriptionPolicy
+ bart = getUtility(IUserManager).create_address('bart@example.com',
+ 'Bart Person')
+ bart.verified_on = now()
+
+ token, token_owner, member = ISubscriptionManager(self.mlist).register(
+ bart, pubkey=self.bart_key.pubkey)
+
+ get_queue_messages('virgin')
+
+ message = _create_plain('bart@example.com', 'test@example.com',
+ 'Re: key confirm {}'.format(token),
+ 'Some text, that definitely does not'
+ 'contain the required/expected statement.')
+ wrapped_message = MIMEWrapper(message)
+ message = wrapped_message.sign(self.bart_key)
+
+ mm_config.switchboards['command'].enqueue(message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+
+ items = get_queue_messages('virgin', expected_count=1)
+ results_msg = items[0].msg
+
+ self.assertIn("Message doesn't contain the expected statement.",
+ results_msg.get_payload())
+
@public
class TestAfterSubscription(unittest.TestCase):
@@ -358,22 +551,23 @@ class TestAfterSubscription(unittest.TestCase):
self.pgp_list = PGPMailingList.for_list(self.mlist)
self.pgp_list.key = load_key('ecc_p256.priv.asc')
- def test_key_change(self):
+ self.bart_key = load_key('rsa_1024.priv.asc')
+ self.bart_new_key = load_key('ecc_p256.priv.asc')
+
+ def test_change(self):
bart = getUtility(IUserManager).create_address('bart@example.com',
'Bart Person')
- bart_key = load_key('rsa_1024.priv.asc')
- bart_new_key = load_key('ecc_p256.priv.asc')
with transaction() as t:
pgp_address = PGPAddress(bart)
- pgp_address.key = bart_key.pubkey
+ pgp_address.key = self.bart_key.pubkey
pgp_address.key_confirmed = True
t.add(pgp_address)
message = _create_mixed('bart@example.com', 'test@example.com',
'key change')
wrapped_message = MIMEWrapper(message)
- message = wrapped_message.attach_key(bart_new_key.pubkey)
+ message = wrapped_message.attach_key(self.bart_new_key.pubkey)
mm_config.switchboards['command'].enqueue(message,
listid='test.example.com')
@@ -391,25 +585,23 @@ class TestAfterSubscription(unittest.TestCase):
confirm_wrapped = PGPWrapper(confirm_request)
self.assertTrue(confirm_wrapped.is_encrypted())
- decrypted = confirm_wrapped.decrypt(bart_new_key)
+ decrypted = confirm_wrapped.decrypt(self.bart_new_key)
self.assertIn('key confirm', decrypted['subject'])
- def test_key_change_confirm(self):
+ def test_change_confirm(self):
bart = getUtility(IUserManager).create_address('bart@example.com',
'Bart Person')
- bart_key = load_key('rsa_1024.priv.asc')
- bart_new_key = load_key('ecc_p256.priv.asc')
with transaction() as t:
pgp_address = PGPAddress(bart)
- pgp_address.key = bart_key.pubkey
+ pgp_address.key = self.bart_key.pubkey
pgp_address.key_confirmed = True
t.add(pgp_address)
message = _create_mixed('bart@example.com', 'test@example.com',
'key change')
wrapped_message = MIMEWrapper(message)
- message = wrapped_message.attach_key(bart_new_key.pubkey)
+ message = wrapped_message.attach_key(self.bart_new_key.pubkey)
mm_config.switchboards['command'].enqueue(message,
listid='test.example.com')
@@ -421,7 +613,7 @@ class TestAfterSubscription(unittest.TestCase):
else:
confirm_request = items[0].msg
request_wrapped = PGPWrapper(confirm_request)
- decrypted = request_wrapped.decrypt(bart_new_key)
+ decrypted = request_wrapped.decrypt(self.bart_new_key)
subj = decrypted['subject']
token = subj.split(' ')[-1]
@@ -429,15 +621,88 @@ class TestAfterSubscription(unittest.TestCase):
confirm_message = _create_plain('bart@example.com', 'test@example.com',
decrypted['subject'],
CHANGE_CONFIRM_REQUEST.format(
- bart_new_key.fingerprint,
+ self.bart_new_key.fingerprint,
token))
wrapped_confirm = MIMEWrapper(confirm_message)
- confirm = wrapped_confirm.sign(bart_key)
+ confirm = wrapped_confirm.sign(self.bart_key)
mm_config.switchboards['command'].enqueue(confirm,
listid='test.example.com')
make_testable_runner(CommandRunner, 'command').run()
pgp_address = PGPAddress.for_address(bart)
- self.assertEqual(pgp_address.key_fingerprint, bart_new_key.fingerprint)
+ self.assertEqual(pgp_address.key_fingerprint,
+ self.bart_new_key.fingerprint)
self.assertTrue(pgp_address.key_confirmed)
+
+ def test_change_extra_arg(self):
+ message = _create_plain('bart@example.com', 'test@example.com',
+ 'key change extra arguments', '')
+ mm_config.switchboards['command'].enqueue(message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+
+ items = get_queue_messages('virgin', expected_count=1)
+ results_msg = items[0].msg
+
+ self.assertIn('Extraneous argument/s: extra,arguments',
+ results_msg.get_payload())
+
+ def test_change_no_key(self):
+ message = _create_plain('bart@example.com', 'test@example.com',
+ 'key change', '')
+
+ mm_config.switchboards['command'].enqueue(message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+ items = get_queue_messages('virgin', expected_count=1)
+ results_msg = items[0].msg
+
+ self.assertIn('No keys attached? Send a key.',
+ results_msg.get_payload())
+
+ def test_change_multiple_keys(self):
+ set_message = _create_mixed('bart@example.com', 'test@example.com',
+ 'key change')
+
+ wrapped_set_message = MIMEWrapper(set_message)
+ set_message = wrapped_set_message.attach_key(self.bart_key.pubkey)
+ wrapped_set_message = MIMEWrapper(set_message)
+ set_message = wrapped_set_message.attach_key(self.bart_new_key.pubkey)
+
+ mm_config.switchboards['command'].enqueue(set_message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+ items = get_queue_messages('virgin', expected_count=1)
+ results_msg = items[0].msg
+
+ self.assertIn('More than one key! Send only one key.',
+ results_msg.get_payload())
+
+ def test_change_no_email(self):
+ message = _create_mixed('', 'test@example.com', 'key change')
+ wrapped_message = MIMEWrapper(message)
+ message = wrapped_message.attach_key(self.bart_key.pubkey)
+
+ mm_config.switchboards['command'].enqueue(message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+ items = get_queue_messages('virgin', expected_count=1)
+ results_msg = items[0].msg
+
+ self.assertIn('No email to change key of.', results_msg.get_payload())
+
+ def test_change_no_pgp_address(self):
+ message = _create_mixed('bart@example.com', 'test@example.com',
+ 'key change')
+ wrapped_message = MIMEWrapper(message)
+ message = wrapped_message.attach_key(self.bart_key.pubkey)
+
+ mm_config.switchboards['command'].enqueue(message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+ items = get_queue_messages('virgin', expected_count=1)
+ results_msg = items[0].msg
+
+ self.assertIn('A pgp enabled address not found.',
+ results_msg.get_payload())
diff --git a/src/mailman_pgp/workflows/base.py b/src/mailman_pgp/workflows/base.py
index 014f2dd..d35e58c 100644
--- a/src/mailman_pgp/workflows/base.py
+++ b/src/mailman_pgp/workflows/base.py
@@ -130,8 +130,7 @@ class ConfirmPubkeyMixin:
self.token))
pgp_list = PGPMailingList.for_list(self.mlist)
wrapped = PGPWrapper(msg)
- encrypted = wrapped.sign_encrypt(pgp_list.key, pgp_address.key,
- pgp_list.pubkey)
+ encrypted = wrapped.sign_encrypt(pgp_list.key, pgp_address.key)
msg.set_payload(encrypted.get_payload())
copy_headers(encrypted, msg, True)
diff --git a/src/mailman_pgp/workflows/key_change.py b/src/mailman_pgp/workflows/key_change.py
index c6d3ebc..cc5b9fc 100644
--- a/src/mailman_pgp/workflows/key_change.py
+++ b/src/mailman_pgp/workflows/key_change.py
@@ -28,6 +28,7 @@ from zope.interface import implementer
from mailman_pgp.database import transaction
from mailman_pgp.model.address import PGPAddress
+from mailman_pgp.model.list import PGPMailingList
from mailman_pgp.pgp.utils import copy_headers
from mailman_pgp.pgp.wrapper import PGPWrapper
@@ -58,6 +59,7 @@ class KeyChangeWorkflow(Workflow):
def __init__(self, mlist, pgp_address=None, pubkey=None):
super().__init__()
self.mlist = mlist
+ self.pgp_list = PGPMailingList.for_list(mlist)
self.pgp_address = pgp_address
self.pubkey = pubkey
@@ -104,7 +106,7 @@ class KeyChangeWorkflow(Workflow):
self.pubkey.fingerprint,
self.token))
wrapped = PGPWrapper(msg)
- encrypted = wrapped.encrypt(self.pubkey)
+ encrypted = wrapped.sign_encrypt(self.pgp_list.key, self.pubkey)
msg.set_payload(encrypted.get_payload())
copy_headers(encrypted, msg, True)
--
cgit v1.2.3-70-g09d2
From 59ddda945ecf7de859f7976c0cea32dc6434b98e Mon Sep 17 00:00:00 2001
From: J08nY
Date: Fri, 14 Jul 2017 21:35:01 +0200
Subject: More tests for PGP message wrappers and fix inline decrypt.
---
src/mailman_pgp/pgp/inline.py | 5 ++-
src/mailman_pgp/pgp/tests/base.py | 58 +++++++++++++++++++++----------
src/mailman_pgp/pgp/tests/test_inline.py | 46 ++++++++++++++++++------
src/mailman_pgp/pgp/tests/test_keygen.py | 2 ++
src/mailman_pgp/pgp/tests/test_mime.py | 38 +++++++++++++-------
src/mailman_pgp/pgp/tests/test_wrapper.py | 44 +++++++++++++++++------
6 files changed, 140 insertions(+), 53 deletions(-)
diff --git a/src/mailman_pgp/pgp/inline.py b/src/mailman_pgp/pgp/inline.py
index 9c19030..458cc26 100644
--- a/src/mailman_pgp/pgp/inline.py
+++ b/src/mailman_pgp/pgp/inline.py
@@ -201,7 +201,10 @@ class InlineWrapper:
def _decrypt(self, part, key):
message = PGPMessage.from_blob(part.get_payload())
decrypted = key.decrypt(message)
- part.set_payload(decrypted.message)
+ if decrypted.is_signed:
+ part.set_payload(str(decrypted))
+ else:
+ part.set_payload(decrypted.message)
def decrypt(self, key):
"""
diff --git a/src/mailman_pgp/pgp/tests/base.py b/src/mailman_pgp/pgp/tests/base.py
index d561fda..52e5ec4 100644
--- a/src/mailman_pgp/pgp/tests/base.py
+++ b/src/mailman_pgp/pgp/tests/base.py
@@ -42,63 +42,66 @@ def load_key(path):
class WrapperTestCase(TestCase):
wrapper = None
+ def wrap(self, message):
+ return self.wrapper(message)
+
def is_signed(self, message, signed):
- wrapped = self.wrapper(message)
+ wrapped = self.wrap(message)
self.assertEqual(wrapped.is_signed(), signed)
def sign(self, message, key):
- wrapped = self.wrapper(message)
+ wrapped = self.wrap(message)
signed = wrapped.sign(key)
- signed_wrapped = self.wrapper(signed)
+ signed_wrapped = self.wrap(signed)
self.assertTrue(signed_wrapped.is_signed())
def sign_verify(self, message, priv, pub):
- wrapped = self.wrapper(message)
+ wrapped = self.wrap(message)
signed = wrapped.sign(priv)
- signed_wrapped = self.wrapper(signed)
+ signed_wrapped = self.wrap(signed)
for signature in signed_wrapped.verify(pub):
self.assertTrue(bool(signature))
def verify(self, message, key, valid):
- wrapped = self.wrapper(message)
+ wrapped = self.wrap(message)
for signature in wrapped.verify(key):
self.assertEqual(bool(signature), valid)
def is_encrypted(self, message, encrypted):
- wrapped = self.wrapper(message)
+ wrapped = self.wrap(message)
self.assertEqual(wrapped.is_encrypted(), encrypted)
def encrypt(self, message, *keys, **kwargs):
- wrapped = self.wrapper(message)
+ wrapped = self.wrap(message)
encrypted = wrapped.encrypt(*keys, **kwargs)
- encrypted_wrapped = self.wrapper(encrypted)
+ encrypted_wrapped = self.wrap(encrypted)
self.assertTrue(encrypted_wrapped.is_encrypted())
def encrypt_decrypt(self, message, pub, priv):
- wrapped = self.wrapper(message)
+ wrapped = self.wrap(message)
encrypted = wrapped.encrypt(pub)
- encrypted_wrapped = self.wrapper(encrypted)
+ encrypted_wrapped = self.wrap(encrypted)
decrypted = encrypted_wrapped.decrypt(priv)
- decrypted_wrapped = self.wrapper(decrypted)
+ decrypted_wrapped = self.wrap(decrypted)
self.assertFalse(decrypted_wrapped.is_encrypted())
self.assertEqual(decrypted.get_payload(), message.get_payload())
def decrypt(self, message, key, clear):
- wrapped = self.wrapper(message)
+ wrapped = self.wrap(message)
decrypted = wrapped.decrypt(key)
- decrypted_wrapped = self.wrapper(decrypted)
+ decrypted_wrapped = self.wrap(decrypted)
self.assertFalse(decrypted_wrapped.is_encrypted())
self.assertEqual(decrypted.get_payload(), clear)
def has_keys(self, message, has_keys):
- wrapped = self.wrapper(message)
+ wrapped = self.wrap(message)
self.assertEqual(wrapped.has_keys(), has_keys)
def keys(self, message, keys):
- wrapped = self.wrapper(message)
+ wrapped = self.wrap(message)
loaded = list(wrapped.keys())
self.assertEqual(len(loaded), len(keys))
@@ -107,13 +110,30 @@ class WrapperTestCase(TestCase):
self.assertListEqual(loaded_fingerprints, fingerprints)
def sign_encrypt_decrypt_verify(self, message, sign_key, encrypt_key):
- wrapped = self.wrapper(message)
+ wrapped = self.wrap(message)
encrypted = wrapped.sign_encrypt(sign_key, encrypt_key.pubkey)
- encrypted_wrapped = self.wrapper(encrypted)
+ encrypted_wrapped = self.wrap(encrypted)
+ self.assertTrue(encrypted_wrapped.is_encrypted())
+
+ decrypted = encrypted_wrapped.decrypt(encrypt_key)
+ decrypted_wrapped = self.wrap(decrypted)
+ self.assertTrue(decrypted_wrapped.is_signed())
+ self.assertFalse(decrypted_wrapped.is_encrypted())
+
+ verification = decrypted_wrapped.verify(sign_key.pubkey)
+ for sig in verification:
+ self.assertTrue(bool(sig))
+ self.assertListEqual(list(decrypted_wrapped.get_signed()),
+ list(wrapped.get_payload()))
+
+ def sign_then_encrypt_decrypt_verify(self, message, sign_key, encrypt_key):
+ wrapped = self.wrap(message)
+ encrypted = wrapped.sign_then_encrypt(sign_key, encrypt_key.pubkey)
+ encrypted_wrapped = self.wrap(encrypted)
self.assertTrue(encrypted_wrapped.is_encrypted())
decrypted = encrypted_wrapped.decrypt(encrypt_key)
- decrypted_wrapped = self.wrapper(decrypted)
+ decrypted_wrapped = self.wrap(decrypted)
self.assertTrue(decrypted_wrapped.is_signed())
self.assertFalse(decrypted_wrapped.is_encrypted())
diff --git a/src/mailman_pgp/pgp/tests/test_inline.py b/src/mailman_pgp/pgp/tests/test_inline.py
index cd18209..a89963a 100644
--- a/src/mailman_pgp/pgp/tests/test_inline.py
+++ b/src/mailman_pgp/pgp/tests/test_inline.py
@@ -18,6 +18,7 @@
"""Tests for the inline wrapper."""
from parameterized import parameterized
+from public import public
from mailman_pgp.pgp.inline import InlineWrapper
from mailman_pgp.pgp.tests.base import load_key, load_message, WrapperTestCase
@@ -27,6 +28,7 @@ class InlineWrapperTestCase(WrapperTestCase):
wrapper = InlineWrapper
+@public
class TestSigning(InlineWrapperTestCase):
@parameterized.expand([
(load_message('inline_cleartext_signed.eml'),
@@ -43,7 +45,7 @@ class TestSigning(InlineWrapperTestCase):
False)
])
def test_is_signed(self, message, signed):
- super().is_signed(message, signed)
+ self.is_signed(message, signed)
@parameterized.expand([
(load_message('clear.eml'),
@@ -52,7 +54,7 @@ class TestSigning(InlineWrapperTestCase):
load_key('ecc_p256.priv.asc'))
])
def test_sign(self, message, key):
- super().sign(message, key)
+ self.sign(message, key)
@parameterized.expand([
(load_message('clear.eml'),
@@ -63,7 +65,7 @@ class TestSigning(InlineWrapperTestCase):
load_key('ecc_p256.pub.asc'))
])
def test_sign_verify(self, message, priv, pub):
- super().sign_verify(message, priv, pub)
+ self.sign_verify(message, priv, pub)
@parameterized.expand([
(load_message('inline_cleartext_signed.eml'),
@@ -80,9 +82,10 @@ class TestSigning(InlineWrapperTestCase):
False),
])
def test_verify(self, message, key, valid):
- super().verify(message, key, valid)
+ self.verify(message, key, valid)
+@public
class TestEncryption(InlineWrapperTestCase):
@parameterized.expand([
(load_message('inline_encrypted.eml'),
@@ -99,7 +102,7 @@ class TestEncryption(InlineWrapperTestCase):
False)
])
def test_is_encrypted(self, message, encrypted):
- super().is_encrypted(message, encrypted)
+ self.is_encrypted(message, encrypted)
@parameterized.expand([
(load_message('clear.eml'),
@@ -110,9 +113,9 @@ class TestEncryption(InlineWrapperTestCase):
])
def test_encrypt(self, message, keys, **kwargs):
if isinstance(keys, tuple):
- super().encrypt(message, *keys, **kwargs)
+ self.encrypt(message, *keys, **kwargs)
else:
- super().encrypt(message, keys, **kwargs)
+ self.encrypt(message, keys, **kwargs)
@parameterized.expand([
(load_message('clear.eml'),
@@ -123,7 +126,7 @@ class TestEncryption(InlineWrapperTestCase):
load_key('ecc_p256.priv.asc'))
])
def test_encrypt_decrypt(self, message, pub, priv):
- super().encrypt_decrypt(message, pub, priv)
+ self.encrypt_decrypt(message, pub, priv)
@parameterized.expand([
(load_message('inline_encrypted.eml'),
@@ -131,9 +134,10 @@ class TestEncryption(InlineWrapperTestCase):
'Some encrypted text.\n\n')
])
def test_decrypt(self, message, key, clear):
- super().decrypt(message, key, clear)
+ self.decrypt(message, key, clear)
+@public
class TestKeys(InlineWrapperTestCase):
@parameterized.expand([
(load_message('inline_privkey.eml'),
@@ -146,7 +150,7 @@ class TestKeys(InlineWrapperTestCase):
False)
])
def test_has_keys(self, message, has_keys):
- super().has_keys(message, has_keys)
+ self.has_keys(message, has_keys)
@parameterized.expand([
(load_message('inline_privkey.eml'),
@@ -155,4 +159,24 @@ class TestKeys(InlineWrapperTestCase):
[load_key('rsa_1024.pub.asc')])
])
def test_keys(self, message, keys):
- super().keys(message, keys)
+ self.keys(message, keys)
+
+
+@public
+class TestCombined(InlineWrapperTestCase):
+ @parameterized.expand([
+ (load_message('clear.eml'),
+ load_key('rsa_1024.priv.asc'),
+ load_key('ecc_p256.priv.asc'))
+ ])
+ def test_sign_encrypt_decrypt_verify(self, message, sign_key, encrypt_key):
+ self.sign_encrypt_decrypt_verify(message, sign_key, encrypt_key)
+
+ @parameterized.expand([
+ (load_message('clear.eml'),
+ load_key('rsa_1024.priv.asc'),
+ load_key('ecc_p256.priv.asc'))
+ ])
+ def test_sign_then_encrypt_decrypt_verify(self, message, sign_key,
+ encrypt_key):
+ self.sign_then_encrypt_decrypt_verify(message, sign_key, encrypt_key)
diff --git a/src/mailman_pgp/pgp/tests/test_keygen.py b/src/mailman_pgp/pgp/tests/test_keygen.py
index dab6801..5c59614 100644
--- a/src/mailman_pgp/pgp/tests/test_keygen.py
+++ b/src/mailman_pgp/pgp/tests/test_keygen.py
@@ -21,10 +21,12 @@ from unittest import TestCase
from pgpy import PGPKey
from pgpy.constants import PubKeyAlgorithm
+from public import public
from mailman_pgp.pgp.keygen import ListKeyGenerator
+@public
class TesKeygen(TestCase):
def setUp(self):
self.keypair_config = {
diff --git a/src/mailman_pgp/pgp/tests/test_mime.py b/src/mailman_pgp/pgp/tests/test_mime.py
index fab50bb..f56c781 100644
--- a/src/mailman_pgp/pgp/tests/test_mime.py
+++ b/src/mailman_pgp/pgp/tests/test_mime.py
@@ -18,6 +18,7 @@
"""Tests for the MIME wrapper."""
from parameterized import parameterized
+from public import public
from mailman_pgp.pgp.mime import MIMEWrapper
from mailman_pgp.pgp.tests.base import load_key, load_message, WrapperTestCase
@@ -27,6 +28,7 @@ class MIMEWrapperTestCase(WrapperTestCase):
wrapper = MIMEWrapper
+@public
class TestSigning(MIMEWrapperTestCase):
@parameterized.expand([
(load_message('mime_signed.eml'),
@@ -37,7 +39,7 @@ class TestSigning(MIMEWrapperTestCase):
False)
])
def test_is_signed(self, message, signed):
- super().is_signed(message, signed)
+ self.is_signed(message, signed)
@parameterized.expand([
(load_message('clear.eml'),
@@ -46,7 +48,7 @@ class TestSigning(MIMEWrapperTestCase):
load_key('ecc_p256.priv.asc'))
])
def test_sign(self, message, key):
- super().sign(message, key)
+ self.sign(message, key)
@parameterized.expand([
(load_message('clear.eml'),
@@ -57,7 +59,7 @@ class TestSigning(MIMEWrapperTestCase):
load_key('ecc_p256.pub.asc'))
])
def test_sign_verify(self, message, priv, pub):
- super().sign_verify(message, priv, pub)
+ self.sign_verify(message, priv, pub)
@parameterized.expand([
(load_message('mime_signed.eml'),
@@ -68,9 +70,10 @@ class TestSigning(MIMEWrapperTestCase):
False)
])
def test_verify(self, message, key, valid):
- super().verify(message, key, valid)
+ self.verify(message, key, valid)
+@public
class TestEncryption(MIMEWrapperTestCase):
@parameterized.expand([
(load_message('mime_encrypted.eml'),
@@ -79,7 +82,7 @@ class TestEncryption(MIMEWrapperTestCase):
True)
])
def test_is_encrypted(self, message, encrypted):
- super().is_encrypted(message, encrypted)
+ self.is_encrypted(message, encrypted)
@parameterized.expand([
(load_message('clear.eml'),
@@ -90,9 +93,9 @@ class TestEncryption(MIMEWrapperTestCase):
])
def test_encrypt(self, message, keys, **kwargs):
if isinstance(keys, tuple):
- super().encrypt(message, *keys, **kwargs)
+ self.encrypt(message, *keys, **kwargs)
else:
- super().encrypt(message, keys, **kwargs)
+ self.encrypt(message, keys, **kwargs)
@parameterized.expand([
(load_message('clear.eml'),
@@ -103,7 +106,7 @@ class TestEncryption(MIMEWrapperTestCase):
load_key('ecc_p256.priv.asc'))
])
def test_encrypt_decrypt(self, message, pub, priv):
- super().encrypt_decrypt(message, pub, priv)
+ self.encrypt_decrypt(message, pub, priv)
@parameterized.expand([
(load_message('mime_encrypted.eml'),
@@ -111,9 +114,10 @@ class TestEncryption(MIMEWrapperTestCase):
'Some encrypted text.\n')
])
def test_decrypt(self, message, key, clear):
- super().decrypt(message, key, clear)
+ self.decrypt(message, key, clear)
+@public
class TestKeys(MIMEWrapperTestCase):
@parameterized.expand([
(load_message('mime_privkey.eml'),
@@ -126,7 +130,7 @@ class TestKeys(MIMEWrapperTestCase):
False)
])
def test_has_keys(self, message, has_keys):
- super().has_keys(message, has_keys)
+ self.has_keys(message, has_keys)
@parameterized.expand([
(load_message('mime_privkey.eml'),
@@ -135,9 +139,10 @@ class TestKeys(MIMEWrapperTestCase):
[load_key('rsa_1024.pub.asc')])
])
def test_keys(self, message, keys):
- super().keys(message, keys)
+ self.keys(message, keys)
+@public
class TestCombined(MIMEWrapperTestCase):
@parameterized.expand([
(load_message('clear.eml'),
@@ -145,4 +150,13 @@ class TestCombined(MIMEWrapperTestCase):
load_key('ecc_p256.priv.asc'))
])
def test_sign_encrypt_decrypt_verify(self, message, sign_key, encrypt_key):
- super().sign_encrypt_decrypt_verify(message, sign_key, encrypt_key)
+ self.sign_encrypt_decrypt_verify(message, sign_key, encrypt_key)
+
+ @parameterized.expand([
+ (load_message('clear.eml'),
+ load_key('rsa_1024.priv.asc'),
+ load_key('ecc_p256.priv.asc'))
+ ])
+ def test_sign_then_encrypt_decrypt_verify(self, message, sign_key,
+ encrypt_key):
+ self.sign_then_encrypt_decrypt_verify(message, sign_key, encrypt_key)
diff --git a/src/mailman_pgp/pgp/tests/test_wrapper.py b/src/mailman_pgp/pgp/tests/test_wrapper.py
index fb7f0bb..14f40fc 100644
--- a/src/mailman_pgp/pgp/tests/test_wrapper.py
+++ b/src/mailman_pgp/pgp/tests/test_wrapper.py
@@ -17,6 +17,7 @@
"""Tests for the combined wrapper."""
from parameterized import parameterized
+from public import public
from mailman_pgp.pgp.tests.base import load_key, load_message, WrapperTestCase
from mailman_pgp.pgp.wrapper import PGPWrapper
@@ -26,6 +27,7 @@ class PGPWrapperTestCase(WrapperTestCase):
wrapper = PGPWrapper
+@public
class TestSigning(PGPWrapperTestCase):
@parameterized.expand([
(load_message('inline_cleartext_signed.eml'),
@@ -46,7 +48,7 @@ class TestSigning(PGPWrapperTestCase):
False)
])
def test_is_signed(self, message, signed):
- super().is_signed(message, signed)
+ self.is_signed(message, signed)
@parameterized.expand([
(load_message('clear.eml'),
@@ -55,7 +57,7 @@ class TestSigning(PGPWrapperTestCase):
load_key('ecc_p256.priv.asc'))
])
def test_sign(self, message, key):
- super().sign(message, key)
+ self.sign(message, key)
@parameterized.expand([
(load_message('inline_cleartext_signed.eml'),
@@ -72,9 +74,10 @@ class TestSigning(PGPWrapperTestCase):
False)
])
def test_verify(self, message, key, valid):
- super().verify(message, key, valid)
+ self.verify(message, key, valid)
+@public
class TestEncryption(PGPWrapperTestCase):
@parameterized.expand([
(load_message('inline_encrypted.eml'),
@@ -93,7 +96,7 @@ class TestEncryption(PGPWrapperTestCase):
False)
])
def test_is_encrypted(self, message, encrypted):
- super().is_encrypted(message, encrypted)
+ self.is_encrypted(message, encrypted)
@parameterized.expand([
(load_message('clear.eml'),
@@ -104,9 +107,9 @@ class TestEncryption(PGPWrapperTestCase):
])
def test_encrypt(self, message, keys, **kwargs):
if isinstance(keys, tuple):
- super().encrypt(message, *keys, **kwargs)
+ self.encrypt(message, *keys, **kwargs)
else:
- super().encrypt(message, keys, **kwargs)
+ self.encrypt(message, keys, **kwargs)
@parameterized.expand([
(load_message('clear.eml'),
@@ -117,7 +120,7 @@ class TestEncryption(PGPWrapperTestCase):
load_key('ecc_p256.priv.asc'))
])
def test_encrypt_decrypt(self, message, pub, priv):
- super().encrypt_decrypt(message, pub, priv)
+ self.encrypt_decrypt(message, pub, priv)
@parameterized.expand([
(load_message('inline_encrypted.eml'),
@@ -125,9 +128,10 @@ class TestEncryption(PGPWrapperTestCase):
'Some encrypted text.\n\n')
])
def test_decrypt(self, message, key, clear):
- super().decrypt(message, key, clear)
+ self.decrypt(message, key, clear)
+@public
class TestKeys(PGPWrapperTestCase):
@parameterized.expand([
(load_message('inline_privkey.eml'),
@@ -146,7 +150,7 @@ class TestKeys(PGPWrapperTestCase):
False)
])
def test_has_keys(self, message, has_keys):
- super().has_keys(message, has_keys)
+ self.has_keys(message, has_keys)
@parameterized.expand([
(load_message('inline_privkey.eml'),
@@ -159,4 +163,24 @@ class TestKeys(PGPWrapperTestCase):
[load_key('rsa_1024.pub.asc')])
])
def test_keys(self, message, keys):
- super().keys(message, keys)
+ self.keys(message, keys)
+
+
+@public
+class TestCombined(PGPWrapperTestCase):
+ @parameterized.expand([
+ (load_message('clear.eml'),
+ load_key('rsa_1024.priv.asc'),
+ load_key('ecc_p256.priv.asc'))
+ ])
+ def test_sign_encrypt_decrypt_verify(self, message, sign_key, encrypt_key):
+ self.sign_encrypt_decrypt_verify(message, sign_key, encrypt_key)
+
+ @parameterized.expand([
+ (load_message('clear.eml'),
+ load_key('rsa_1024.priv.asc'),
+ load_key('ecc_p256.priv.asc'))
+ ])
+ def test_sign_then_encrypt_decrypt_verify(self, message, sign_key,
+ encrypt_key):
+ self.sign_then_encrypt_decrypt_verify(message, sign_key, encrypt_key)
--
cgit v1.2.3-70-g09d2