aboutsummaryrefslogtreecommitdiff
path: root/src/mailman_pgp/workflows
diff options
context:
space:
mode:
Diffstat (limited to 'src/mailman_pgp/workflows')
-rw-r--r--src/mailman_pgp/workflows/base.py70
-rw-r--r--src/mailman_pgp/workflows/key_change.py99
-rw-r--r--src/mailman_pgp/workflows/key_confirm.py77
-rw-r--r--src/mailman_pgp/workflows/key_revoke.py70
-rw-r--r--src/mailman_pgp/workflows/key_set.py76
-rw-r--r--src/mailman_pgp/workflows/mod_approval.py153
-rw-r--r--src/mailman_pgp/workflows/pubkey.py118
-rw-r--r--src/mailman_pgp/workflows/subscription.py68
-rw-r--r--src/mailman_pgp/workflows/tests/test_base.py23
-rw-r--r--src/mailman_pgp/workflows/tests/test_key_change.py31
-rw-r--r--src/mailman_pgp/workflows/tests/test_mod_approval.py106
11 files changed, 684 insertions, 207 deletions
diff --git a/src/mailman_pgp/workflows/base.py b/src/mailman_pgp/workflows/base.py
index 036dc18..9fc5d58 100644
--- a/src/mailman_pgp/workflows/base.py
+++ b/src/mailman_pgp/workflows/base.py
@@ -16,15 +16,75 @@
# this program. If not, see <http://www.gnu.org/licenses/>.
""""""
+from datetime import timedelta
+
+from mailman.interfaces.pending import IPendings
+from mailman.interfaces.subscriptions import TokenOwner
+from mailman.utilities.datetime import now
+from mailman.utilities.modules import abstract_component
+from mailman.workflows.common import SubscriptionBase
+from public import public
+from zope.component import getUtility
from mailman_pgp.database import transaction
from mailman_pgp.model.address import PGPAddress
+from mailman_pgp.model.list import PGPMailingList
+@public
class PGPMixin:
- def _step_pgp_prepare(self):
- pgp_address = PGPAddress.for_address(self.address)
- if pgp_address is None:
+ def __init__(self, mlist, pgp_address=None):
+ self.mlist = mlist
+ self.pgp_list = PGPMailingList.for_list(mlist)
+ self.pgp_address = pgp_address
+ if self.pgp_address is not None:
+ self.address = self.pgp_address.address
+
+ @property
+ def address_key(self):
+ return self.pgp_address.email
+
+ @address_key.setter
+ def address_key(self, value):
+ self.pgp_address = PGPAddress.for_email(value)
+ self.member = self.mlist.regular_members.get_member(value)
+
+ def _step_create_address(self):
+ self.pgp_address = PGPAddress.for_address(self.address)
+ if self.pgp_address is None:
with transaction() as t:
- pgp_address = PGPAddress(self.address)
- t.add(pgp_address)
+ self.pgp_address = PGPAddress(self.address)
+ t.add(self.pgp_address)
+
+ def _step_restore_address(self):
+ self.pgp_address = PGPAddress.for_address(self.address)
+
+ def _set_token(self, token_owner):
+ assert isinstance(token_owner, TokenOwner)
+ pendings = getUtility(IPendings)
+ if self.token is not None:
+ pendings.confirm(self.token)
+ self.token_owner = token_owner
+ if token_owner is TokenOwner.no_one:
+ self.token = None
+ return
+
+ pendable = self.pendable_class()(
+ list_id=self.mlist.list_id,
+ email=self.address.email,
+ display_name=self.address.display_name,
+ when=now().replace(microsecond=0).isoformat(),
+ token_owner=token_owner.name,
+ )
+ self.token = pendings.add(pendable, timedelta(days=3650))
+
+
+@public
+@abstract_component
+class PGPSubscriptionBase(SubscriptionBase, PGPMixin):
+ def __init__(self, mlist, subscriber=None, *, pgp_address=None):
+ SubscriptionBase.__init__(self, mlist, subscriber)
+ PGPMixin.__init__(self, mlist, pgp_address=pgp_address)
+
+ def _step_restore_subscriber(self):
+ self._restore_subscriber()
diff --git a/src/mailman_pgp/workflows/key_change.py b/src/mailman_pgp/workflows/key_change.py
index 290e504..1d07903 100644
--- a/src/mailman_pgp/workflows/key_change.py
+++ b/src/mailman_pgp/workflows/key_change.py
@@ -28,10 +28,11 @@ from zope.interface import implementer
from mailman_pgp.config import config
from mailman_pgp.database import transaction
-from mailman_pgp.model.address import PGPAddress
-from mailman_pgp.model.list import PGPMailingList
from mailman_pgp.pgp.wrapper import PGPWrapper
from mailman_pgp.utils.email import copy_headers
+from mailman_pgp.workflows.base import PGPMixin
+from mailman_pgp.workflows.mod_approval import (
+ ModeratorKeyChangeApprovalMixin)
CHANGE_CONFIRM_REQUEST = """\
----------
@@ -46,34 +47,18 @@ Token: {}
"""
-@public
-@implementer(IWorkflow)
-class KeyChangeWorkflow(Workflow):
- name = 'pgp-key-change-workflow'
- description = ''
- initial_state = 'change_key'
+class KeyChangeBase(Workflow, PGPMixin):
save_attributes = (
'address_key',
- 'pubkey_key'
+ 'pubkey_key',
)
def __init__(self, mlist, pgp_address=None, pubkey=None):
- super().__init__()
- self.mlist = mlist
- self.pgp_list = PGPMailingList.for_list(mlist)
- self.pgp_address = pgp_address
+ Workflow.__init__(self)
+ PGPMixin.__init__(self, mlist, pgp_address)
self.pubkey = pubkey
@property
- def address_key(self):
- return self.pgp_address.email
-
- @address_key.setter
- def address_key(self, value):
- self.pgp_address = PGPAddress.for_email(value)
- self.member = self.mlist.regular_members.get_member(value)
-
- @property
def pubkey_key(self):
return str(self.pubkey)
@@ -81,23 +66,27 @@ class KeyChangeWorkflow(Workflow):
def pubkey_key(self, value):
self.pubkey, _ = PGPKey.from_blob(value)
- def _step_change_key(self):
- if self.pgp_address is None or self.pubkey is None:
- raise ValueError
-
- self.push('send_key_confirm_request')
-
- def _step_send_key_confirm_request(self):
+ def _pend(self, token_owner, lifetime=None):
pendings = getUtility(IPendings)
- pendable = KeyChangeWorkflow.pendable_class()(
+ pendable = self.pendable_class()(
email=self.pgp_address.email,
pubkey=str(self.pubkey),
fingerprint=self.pubkey.fingerprint
)
- lifetime = config.get_value('misc', 'change_request_lifetime')
+
self.token = pendings.add(pendable, lifetime=lifetime)
- self.token_owner = TokenOwner.subscriber
+ self.token_owner = token_owner
+
+ def _step_change_key(self):
+ if self.pgp_address is None or self.pubkey is None:
+ raise ValueError
+
+ self.push('send_key_confirm_request')
+ def _step_send_key_confirm_request(self):
+ self._pend(TokenOwner.subscriber,
+ lifetime=config.get_value('misc',
+ 'change_request_lifetime'))
self.push('receive_confirmation')
self.save()
request_address = self.mlist.request_address
@@ -116,20 +105,52 @@ class KeyChangeWorkflow(Workflow):
raise StopIteration
def _step_receive_confirmation(self):
+ self._set_token(TokenOwner.no_one)
+
+ def _step_do_change(self):
with transaction():
self.pgp_address.key = self.pubkey
self.pgp_address.key_confirmed = True
- pendings = getUtility(IPendings)
- if self.token is not None:
- pendings.confirm(self.token)
- self.token = None
- self.token_owner = TokenOwner.no_one
-
@classmethod
def pendable_class(cls):
@implementer(IPendable)
class Pendable(dict):
- PEND_TYPE = KeyChangeWorkflow.name
+ PEND_TYPE = cls.name
return Pendable
+
+
+@public
+@implementer(IWorkflow)
+class KeyChangeWorkflow(KeyChangeBase):
+ name = 'pgp-key-change-workflow'
+ description = ''
+ initial_state = 'prepare'
+
+ def _step_prepare(self):
+ self.push('do_change')
+ self.push('change_key')
+
+
+@public
+@implementer(IWorkflow)
+class KeyChangeModWorkflow(KeyChangeBase, ModeratorKeyChangeApprovalMixin):
+ name = 'pgp-key-change-mod-workflow'
+ description = ''
+ initial_state = 'prepare'
+ save_attributes = (
+ 'approved',
+ 'address_key',
+ 'pubkey_key'
+ )
+
+ def __init__(self, mlist, pgp_address=None, pubkey=None,
+ pre_approved=False):
+ KeyChangeBase.__init__(self, mlist, pgp_address, pubkey)
+ ModeratorKeyChangeApprovalMixin.__init__(self, pre_approved)
+
+ def _step_prepare(self):
+ self.push('do_change')
+ self.push('mod_approval')
+ self.push('change_key')
diff --git a/src/mailman_pgp/workflows/key_confirm.py b/src/mailman_pgp/workflows/key_confirm.py
new file mode 100644
index 0000000..0a38551
--- /dev/null
+++ b/src/mailman_pgp/workflows/key_confirm.py
@@ -0,0 +1,77 @@
+# Copyright (C) 2017 Jan Jancar
+#
+# This file is a part of the Mailman PGP plugin.
+#
+# This program is free software; you can redistribute it and/or modify it under
+# the terms of the GNU General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or (at your option)
+# any later version.
+#
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+# more details.
+#
+# You should have received a copy of the GNU General Public License along with
+# this program. If not, see <http://www.gnu.org/licenses/>.
+
+""""""
+from mailman.email.message import UserNotification
+from mailman.interfaces.subscriptions import TokenOwner
+from public import public
+
+from mailman_pgp.database import transaction
+from mailman_pgp.model.list import PGPMailingList
+from mailman_pgp.pgp.wrapper import PGPWrapper
+from mailman_pgp.utils.email import copy_headers
+
+CONFIRM_REQUEST = """\
+----------
+TODO: this is a pgp enabled list.
+Reply to this message with this whole text
+signed with your supplied key, either inline or PGP/MIME.
+
+Fingerprint: {}
+Token: {}
+----------
+"""
+
+
+@public
+class ConfirmPubkeyMixin:
+ def __init__(self, pre_confirmed=False):
+ self.pubkey_confirmed = pre_confirmed
+
+ def _step_pubkey_confirmation(self):
+ assert self.pgp_address is not None
+
+ if self.pubkey_confirmed:
+ with transaction():
+ self.pgp_address.key_confirmed = True
+ else:
+ if not self.pgp_address.key_confirmed:
+ self.push('send_key_confirm_request')
+
+ def _step_send_key_confirm_request(self):
+ self._set_token(TokenOwner.subscriber)
+ self.push('receive_key_confirmation')
+ self.save()
+
+ request_address = self.mlist.request_address
+ email_address = self.address.email
+ msg = UserNotification(email_address, request_address,
+ 'key confirm {}'.format(self.token),
+ CONFIRM_REQUEST.format(
+ self.pgp_address.key_fingerprint,
+ self.token))
+ pgp_list = PGPMailingList.for_list(self.mlist)
+ wrapped = PGPWrapper(msg)
+ encrypted = wrapped.sign_encrypt(pgp_list.key, self.pgp_address.key)
+
+ msg.set_payload(encrypted.get_payload())
+ copy_headers(encrypted, msg, True)
+ msg.send(self.mlist)
+ raise StopIteration
+
+ def _step_receive_key_confirmation(self):
+ self._set_token(TokenOwner.no_one)
diff --git a/src/mailman_pgp/workflows/key_revoke.py b/src/mailman_pgp/workflows/key_revoke.py
new file mode 100644
index 0000000..7a7c071
--- /dev/null
+++ b/src/mailman_pgp/workflows/key_revoke.py
@@ -0,0 +1,70 @@
+# Copyright (C) 2017 Jan Jancar
+#
+# This file is a part of the Mailman PGP plugin.
+#
+# This program is free software; you can redistribute it and/or modify it under
+# the terms of the GNU General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or (at your option)
+# any later version.
+#
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+# more details.
+#
+# You should have received a copy of the GNU General Public License along with
+# this program. If not, see <http://www.gnu.org/licenses/>.
+
+""""""
+from mailman.interfaces.pending import IPendable
+from mailman.interfaces.workflows import IWorkflow
+from mailman.workflows.base import Workflow
+from public import public
+from zope.interface import implementer
+
+from mailman_pgp.workflows.base import PGPMixin
+from mailman_pgp.workflows.key_confirm import ConfirmPubkeyMixin
+from mailman_pgp.workflows.key_set import SetPubkeyMixin
+from mailman_pgp.workflows.mod_approval import ModeratorKeyRevokeApprovalMixin
+
+
+class KeyRevokeBase(Workflow, PGPMixin):
+ def __init__(self, mlist, pgp_address=None):
+ Workflow.__init__(self)
+ PGPMixin.__init__(self, mlist, pgp_address)
+
+ @classmethod
+ def pendable_class(cls):
+ @implementer(IPendable)
+ class Pendable(dict):
+ PEND_TYPE = cls.name
+
+ return Pendable
+
+
+@public
+@implementer(IWorkflow)
+class KeyRevokeWorkflow(KeyRevokeBase, SetPubkeyMixin, ConfirmPubkeyMixin,
+ ModeratorKeyRevokeApprovalMixin):
+ name = 'pgp-key-revoke-workflow'
+ description = ''
+ initial_state = 'prepare'
+ save_attributes = (
+ 'approved',
+ 'address_key',
+ 'pubkey_key',
+ 'pubkey_confirmed'
+ )
+
+ def __init__(self, mlist, pgp_address=None, pubkey=None,
+ pubkey_pre_confirmed=False, pre_approved=False):
+ KeyRevokeBase.__init__(self, mlist, pgp_address=pgp_address)
+ SetPubkeyMixin.__init__(self, pubkey=pubkey)
+ ConfirmPubkeyMixin.__init__(self, pre_confirmed=pubkey_pre_confirmed)
+ ModeratorKeyRevokeApprovalMixin.__init__(self,
+ pre_approved=pre_approved)
+
+ def _step_prepare(self):
+ self.push('mod_approval')
+ self.push('pubkey_confirmation')
+ self.push('pubkey_checks')
diff --git a/src/mailman_pgp/workflows/key_set.py b/src/mailman_pgp/workflows/key_set.py
new file mode 100644
index 0000000..95e6e7c
--- /dev/null
+++ b/src/mailman_pgp/workflows/key_set.py
@@ -0,0 +1,76 @@
+# Copyright (C) 2017 Jan Jancar
+#
+# This file is a part of the Mailman PGP plugin.
+#
+# This program is free software; you can redistribute it and/or modify it under
+# the terms of the GNU General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or (at your option)
+# any later version.
+#
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+# more details.
+#
+# You should have received a copy of the GNU General Public License along with
+# this program. If not, see <http://www.gnu.org/licenses/>.
+
+""""""
+from mailman.email.message import UserNotification
+from mailman.interfaces.subscriptions import TokenOwner
+from pgpy import PGPKey
+from public import public
+
+from mailman_pgp.database import transaction
+
+KEY_REQUEST = """\
+----------
+TODO: this is a pgp enabled list.
+We need your pubkey.
+Reply to this message with it as a PGP/MIME(preferred) or inline.
+----------"""
+
+
+@public
+class SetPubkeyMixin:
+ def __init__(self, pubkey=None):
+ self.pubkey = pubkey
+
+ @property
+ def pubkey_key(self):
+ if self.pubkey is None:
+ return None
+ return str(self.pubkey)
+
+ @pubkey_key.setter
+ def pubkey_key(self, value):
+ if value is not None:
+ self.pubkey, _ = PGPKey.from_blob(value)
+ else:
+ self.pubkey = None
+
+ def _step_pubkey_checks(self):
+ assert self.pgp_address is not None
+
+ if self.pubkey is None:
+ if self.pgp_address.key is None:
+ self.push('send_key_request')
+ else:
+ with transaction():
+ self.pgp_address.key = self.pubkey
+
+ def _step_send_key_request(self):
+ self._set_token(TokenOwner.subscriber)
+ self.push('receive_key')
+ self.save()
+ request_address = self.mlist.request_address
+ email_address = self.address.email
+ msg = UserNotification(email_address, request_address,
+ 'key set {}'.format(self.token),
+ KEY_REQUEST)
+ msg.send(self.mlist, add_precedence=False)
+ # Now we wait for the confirmation.
+ raise StopIteration
+
+ def _step_receive_key(self):
+ self._set_token(TokenOwner.no_one)
diff --git a/src/mailman_pgp/workflows/mod_approval.py b/src/mailman_pgp/workflows/mod_approval.py
new file mode 100644
index 0000000..367f773
--- /dev/null
+++ b/src/mailman_pgp/workflows/mod_approval.py
@@ -0,0 +1,153 @@
+# Copyright (C) 2017 Jan Jancar
+#
+# This file is a part of the Mailman PGP plugin.
+#
+# This program is free software; you can redistribute it and/or modify it under
+# the terms of the GNU General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or (at your option)
+# any later version.
+#
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+# more details.
+#
+# You should have received a copy of the GNU General Public License along with
+# this program. If not, see <http://www.gnu.org/licenses/>.
+
+""""""
+import copy
+
+from mailman.email.message import UserNotification
+from mailman.interfaces.pending import IPendings
+from mailman.interfaces.subscriptions import TokenOwner
+from public import public
+from zope.component import getUtility
+
+from mailman_pgp.pgp.mime import MIMEWrapper
+from mailman_pgp.utils.email import overwrite_message
+
+SUBSCRIPTION_MOD_REQUEST = """\
+----------
+TODO: this is a pgp enabled list.
+A user with address {address} requested subscription.
+The key is attached to this message.
+
+Fingerprint: {fingerprint}
+----------
+"""
+
+KEY_CHANGE_MOD_REQUEST = """\
+----------
+TODO: this is a pgp enabled list.
+A subscriber with address {address} requested a change of his key.
+The new key is attached to this message.
+
+Old key fingerprint: {old_fpr}
+New key fingerprint: {new_fpr}
+----------
+"""
+
+KEY_REVOKE_MOD_REQUEST = """\
+----------
+TODO: this is a pgp enabled list.
+A subscriber with address {address} revoked a part of his key,
+which made it unusable and needs to be reset. The subscriber
+supplied a new key. The new key is attached to this message.
+
+Old key fingerprint: {old_fpr}
+New key fingerprint: {new_fpr}
+----------
+"""
+
+
+class ModeratorApprovalMixin:
+ def __init__(self, pre_approved=False):
+ self.approved = pre_approved
+
+ def _step_mod_approval(self):
+ if not self.approved:
+ self.push('get_approval')
+
+ def _step_get_approval(self):
+ self._pend(TokenOwner.moderator)
+ self.push('receive_mod_confirmation')
+ self.save()
+
+ name = self._request_name
+ body = self._request_body
+
+ if self.mlist.admin_immed_notify:
+ subject = 'New {} request from {}'.format(name,
+ self.pgp_address.email)
+ msg = UserNotification(
+ self.mlist.owner_address, self.mlist.owner_address,
+ subject, body, self.mlist.preferred_language)
+ out = copy.deepcopy(msg)
+ wrapped = MIMEWrapper(msg)
+ msg = wrapped.attach_keys(self.pubkey)
+ overwrite_message(msg, out)
+ out.send(self.mlist)
+ raise StopIteration
+
+ def _step_receive_mod_confirmation(self):
+ pendings = getUtility(IPendings)
+ if self.token is not None:
+ pendings.confirm(self.token)
+ self.token = None
+ self.token_owner = TokenOwner.no_one
+
+
+@public
+class ModeratorSubApprovalMixin(ModeratorApprovalMixin):
+ def __init__(self, pre_approved=False):
+ super().__init__(pre_approved)
+
+ @property
+ def _request_name(self):
+ return 'subscription'
+
+ @property
+ def _request_body(self):
+ params = {'mlist': self.mlist.fqdn_listname,
+ 'address': self.pgp_address.email,
+ 'fingerprint': self.pubkey.fingerprint}
+ return SUBSCRIPTION_MOD_REQUEST.format(**params)
+
+
+@public
+class ModeratorKeyChangeApprovalMixin(ModeratorApprovalMixin):
+ def __init__(self, pre_approved=False):
+ super().__init__(pre_approved)
+
+ @property
+ def _request_name(self):
+ return 'key change'
+
+ @property
+ def _request_body(self):
+ params = {'mlist': self.mlist.fqdn_listname,
+ 'address': self.pgp_address.email,
+ 'fingerprint': self.pubkey.fingerprint,
+ 'old_fpr': self.pgp_address.key_fingerprint,
+ 'new_fpr': self.pubkey.fingerprint}
+ return KEY_CHANGE_MOD_REQUEST.format(**params)
+
+
+@public
+class ModeratorKeyRevokeApprovalMixin(ModeratorApprovalMixin):
+ def __init__(self, pre_approved=False):
+ super().__init__(pre_approved)
+
+ @property
+ def _request_name(self):
+ return 'key reset'
+
+ @property
+ def _request_body(self):
+ params = {'mlist': self.mlist.fqdn_listname,
+ 'address': self.pgp_address.email,
+ 'fingerprint': self.pubkey.fingerprint,
+ 'old_fpr': self.pgp_address.key_fingerprint,
+ 'new_fpr': self.pubkey.fingerprint}
+ return KEY_REVOKE_MOD_REQUEST.format(**params)
diff --git a/src/mailman_pgp/workflows/pubkey.py b/src/mailman_pgp/workflows/pubkey.py
deleted file mode 100644
index a13d491..0000000
--- a/src/mailman_pgp/workflows/pubkey.py
+++ /dev/null
@@ -1,118 +0,0 @@
-from mailman.email.message import UserNotification
-from mailman.interfaces.subscriptions import TokenOwner
-from pgpy import PGPKey
-
-from mailman_pgp.database import transaction
-from mailman_pgp.model.address import PGPAddress
-from mailman_pgp.model.list import PGPMailingList
-from mailman_pgp.pgp.wrapper import PGPWrapper
-from mailman_pgp.utils.email import copy_headers
-
-KEY_REQUEST = """\
-----------
-TODO: this is a pgp enabled list.
-We need your pubkey.
-Reply to this message with it as a PGP/MIME(preferred) or inline.
-----------"""
-
-CONFIRM_REQUEST = """\
-----------
-TODO: this is a pgp enabled list.
-Reply to this message with this whole text
-signed with your supplied key, either inline or PGP/MIME.
-
-Fingerprint: {}
-Token: {}
-----------
-"""
-
-
-class SetPubkeyMixin:
- def __init__(self, pubkey=None):
- self.pubkey = pubkey
-
- @property
- def pubkey_key(self):
- if self.pubkey is None:
- return None
- return str(self.pubkey)
-
- @pubkey_key.setter
- def pubkey_key(self, value):
- if value is not None:
- self.pubkey, _ = PGPKey.from_blob(value)
- else:
- self.pubkey = None
-
- def _step_pubkey_checks(self):
- pgp_address = PGPAddress.for_address(self.address)
- assert pgp_address is not None
-
- if self.pubkey is None:
- if pgp_address.key is None:
- self.push('send_key_request')
- else:
- with transaction():
- pgp_address.key = self.pubkey
-
- def _step_send_key_request(self):
- self._set_token(TokenOwner.subscriber)
- self.push('receive_key')
- self.save()
- request_address = self.mlist.request_address
- email_address = self.address.email
- msg = UserNotification(email_address, request_address,
- 'key set {}'.format(self.token),
- KEY_REQUEST)
- msg.send(self.mlist, add_precedence=False)
- # Now we wait for the confirmation.
- raise StopIteration
-
- def _step_receive_key(self):
- self._restore_subscriber()
- self._set_token(TokenOwner.no_one)
-
-
-class ConfirmPubkeyMixin:
- def __init__(self, pre_confirmed=False):
- self.pubkey_confirmed = pre_confirmed
-
- def _step_pubkey_confirmation(self):
- pgp_address = PGPAddress.for_address(self.address)
- assert pgp_address is not None
-
- if self.pubkey_confirmed:
- with transaction():
- pgp_address.key_confirmed = True
- else:
- if not pgp_address.key_confirmed:
- self.push('send_key_confirm_request')
-
- def _step_send_key_confirm_request(self):
- self._set_token(TokenOwner.subscriber)
- self.push('receive_key_confirmation')
- self.save()
-
- pgp_address = PGPAddress.for_address(self.address)
- request_address = self.mlist.request_address
- email_address = self.address.email
- msg = UserNotification(email_address, request_address,
- 'key confirm {}'.format(self.token),
- CONFIRM_REQUEST.format(
- pgp_address.key_fingerprint,
- self.token))
- pgp_list = PGPMailingList.for_list(self.mlist)
- wrapped = PGPWrapper(msg)
- encrypted = wrapped.sign_encrypt(pgp_list.key, pgp_address.key)
-
- msg.set_payload(encrypted.get_payload())
- copy_headers(encrypted, msg, True)
- msg.send(self.mlist)
- raise StopIteration
-
- def _step_receive_key_confirmation(self):
- self._restore_subscriber()
- self._set_token(TokenOwner.no_one)
- with transaction():
- pgp_address = PGPAddress.for_address(self.address)
- pgp_address.key_confirmed = True
diff --git a/src/mailman_pgp/workflows/subscription.py b/src/mailman_pgp/workflows/subscription.py
index 809b7cb..2546f1e 100644
--- a/src/mailman_pgp/workflows/subscription.py
+++ b/src/mailman_pgp/workflows/subscription.py
@@ -19,18 +19,19 @@
from mailman.core.i18n import _
from mailman.interfaces.workflows import ISubscriptionWorkflow
-from mailman.workflows.common import (ConfirmationMixin, ModerationMixin,
- SubscriptionBase, VerificationMixin)
+from mailman.workflows.common import (ConfirmationMixin, VerificationMixin)
from public import public
from zope.interface import implementer
-from mailman_pgp.workflows.base import PGPMixin
-from mailman_pgp.workflows.pubkey import ConfirmPubkeyMixin, SetPubkeyMixin
+from mailman_pgp.workflows.base import PGPMixin, PGPSubscriptionBase
+from mailman_pgp.workflows.key_confirm import ConfirmPubkeyMixin
+from mailman_pgp.workflows.key_set import SetPubkeyMixin
+from mailman_pgp.workflows.mod_approval import ModeratorSubApprovalMixin
@public
@implementer(ISubscriptionWorkflow)
-class OpenSubscriptionPolicy(SubscriptionBase, VerificationMixin,
+class OpenSubscriptionPolicy(PGPSubscriptionBase, VerificationMixin,
SetPubkeyMixin, ConfirmPubkeyMixin,
PGPMixin):
""""""
@@ -52,26 +53,28 @@ class OpenSubscriptionPolicy(SubscriptionBase, VerificationMixin,
def __init__(self, mlist, subscriber=None, *,
pre_verified=False, pubkey=None,
pubkey_pre_confirmed=False):
- SubscriptionBase.__init__(self, mlist, subscriber)
+ PGPSubscriptionBase.__init__(self, mlist, subscriber)
VerificationMixin.__init__(self, pre_verified=pre_verified)
SetPubkeyMixin.__init__(self, pubkey=pubkey)
ConfirmPubkeyMixin.__init__(self, pre_confirmed=pubkey_pre_confirmed)
- PGPMixin.__init__(self)
+ PGPMixin.__init__(self, mlist)
def _step_prepare(self):
self.push('do_subscription')
+ self.push('restore_subscriber')
self.push('pubkey_confirmation')
+ self.push('restore_address')
self.push('pubkey_checks')
- self.push('pgp_prepare')
+ self.push('create_address')
self.push('verification_checks')
self.push('sanity_checks')
@public
@implementer(ISubscriptionWorkflow)
-class ConfirmSubscriptionPolicy(SubscriptionBase, VerificationMixin,
+class ConfirmSubscriptionPolicy(PGPSubscriptionBase, VerificationMixin,
ConfirmationMixin, SetPubkeyMixin,
- ConfirmPubkeyMixin, PGPMixin):
+ ConfirmPubkeyMixin):
""""""
name = 'pgp-policy-confirm'
@@ -92,18 +95,19 @@ class ConfirmSubscriptionPolicy(SubscriptionBase, VerificationMixin,
def __init__(self, mlist, subscriber=None, *,
pre_verified=False, pre_confirmed=False, pubkey=None,
pubkey_pre_confirmed=False):
- SubscriptionBase.__init__(self, mlist, subscriber)
+ PGPSubscriptionBase.__init__(self, mlist, subscriber)
VerificationMixin.__init__(self, pre_verified=pre_verified)
ConfirmationMixin.__init__(self, pre_confirmed=pre_confirmed)
SetPubkeyMixin.__init__(self, pubkey=pubkey)
ConfirmPubkeyMixin.__init__(self, pre_confirmed=pubkey_pre_confirmed)
- PGPMixin.__init__(self)
def _step_prepare(self):
self.push('do_subscription')
+ self.push('restore_subscriber')
self.push('pubkey_confirmation')
+ self.push('restore_address')
self.push('pubkey_checks')
- self.push('pgp_prepare')
+ self.push('create_address')
self.push('confirmation_checks')
self.push('verification_checks')
self.push('sanity_checks')
@@ -111,9 +115,9 @@ class ConfirmSubscriptionPolicy(SubscriptionBase, VerificationMixin,
@public
@implementer(ISubscriptionWorkflow)
-class ModerationSubscriptionPolicy(SubscriptionBase, VerificationMixin,
- ModerationMixin, SetPubkeyMixin,
- ConfirmPubkeyMixin, PGPMixin):
+class ModerationSubscriptionPolicy(PGPSubscriptionBase, VerificationMixin,
+ ModeratorSubApprovalMixin, SetPubkeyMixin,
+ ConfirmPubkeyMixin):
""""""
name = 'pgp-policy-moderate'
@@ -134,29 +138,31 @@ class ModerationSubscriptionPolicy(SubscriptionBase, VerificationMixin,
def __init__(self, mlist, subscriber=None, *,
pre_verified=False, pre_approved=False, pubkey=None,
pubkey_pre_confirmed=False):
- SubscriptionBase.__init__(self, mlist, subscriber)
+ PGPSubscriptionBase.__init__(self, mlist, subscriber)
VerificationMixin.__init__(self, pre_verified=pre_verified)
- ModerationMixin.__init__(self, pre_approved=pre_approved)
+ ModeratorSubApprovalMixin.__init__(self, pre_approved=pre_approved)
SetPubkeyMixin.__init__(self, pubkey=pubkey)
ConfirmPubkeyMixin.__init__(self, pre_confirmed=pubkey_pre_confirmed)
- PGPMixin.__init__(self)
def _step_prepare(self):
self.push('do_subscription')
- self.push('moderation_checks')
+ self.push('restore_subscriber')
+ self.push('mod_approval')
self.push('pubkey_confirmation')
+ self.push('restore_address')
self.push('pubkey_checks')
- self.push('pgp_prepare')
+ self.push('create_address')
self.push('verification_checks')
self.push('sanity_checks')
@public
@implementer(ISubscriptionWorkflow)
-class ConfirmModerationSubscriptionPolicy(SubscriptionBase, VerificationMixin,
- ConfirmationMixin, ModerationMixin,
- SetPubkeyMixin, ConfirmPubkeyMixin,
- PGPMixin):
+class ConfirmModerationSubscriptionPolicy(PGPSubscriptionBase,
+ VerificationMixin,
+ ConfirmationMixin,
+ ModeratorSubApprovalMixin,
+ SetPubkeyMixin, ConfirmPubkeyMixin):
""""""
name = 'pgp-policy-confirm-moderate'
@@ -178,20 +184,22 @@ class ConfirmModerationSubscriptionPolicy(SubscriptionBase, VerificationMixin,
def __init__(self, mlist, subscriber=None, *,
pre_verified=False, pre_confirmed=False, pre_approved=False,
pubkey=None, pubkey_pre_confirmed=False):
- SubscriptionBase.__init__(self, mlist, subscriber)
+ PGPSubscriptionBase.__init__(self, mlist, subscriber)
VerificationMixin.__init__(self, pre_verified=pre_verified)
ConfirmationMixin.__init__(self, pre_confirmed=pre_confirmed)
- ModerationMixin.__init__(self, pre_approved=pre_approved)
+ ModeratorSubApprovalMixin.__init__(self, pre_approved=pre_approved)
SetPubkeyMixin.__init__(self, pubkey=pubkey)
ConfirmPubkeyMixin.__init__(self, pre_confirmed=pubkey_pre_confirmed)
- PGPMixin.__init__(self)
def _step_prepare(self):
self.push('do_subscription')
- self.push('moderation_checks')
+ self.push('restore_subscriber')
+ self.push('mod_approval')
+ self.push('restore_address')
self.push('pubkey_confirmation')
+ self.push('restore_address')
self.push('pubkey_checks')
- self.push('pgp_prepare')
+ self.push('create_address')
self.push('confirmation_checks')
self.push('verification_checks')
self.push('sanity_checks')
diff --git a/src/mailman_pgp/workflows/tests/test_base.py b/src/mailman_pgp/workflows/tests/test_base.py
index 31b3d05..3904105 100644
--- a/src/mailman_pgp/workflows/tests/test_base.py
+++ b/src/mailman_pgp/workflows/tests/test_base.py
@@ -37,8 +37,8 @@ from mailman_pgp.pgp.wrapper import PGPWrapper
from mailman_pgp.testing.layers import PGPConfigLayer
from mailman_pgp.testing.pgp import load_key
from mailman_pgp.workflows.base import (PGPMixin)
-from mailman_pgp.workflows.pubkey import (ConfirmPubkeyMixin, KEY_REQUEST,
- SetPubkeyMixin)
+from mailman_pgp.workflows.key_confirm import ConfirmPubkeyMixin
+from mailman_pgp.workflows.key_set import KEY_REQUEST, SetPubkeyMixin
class PubkeyMixinTestSetup():
@@ -49,13 +49,14 @@ class PubkeyMixinTestSetup():
self.list_key = load_key('ecc_p256.priv.asc')
- self.pgp_list = PGPMailingList.for_list(self.mlist)
- self.pgp_list.key = self.list_key
+ with transaction():
+ self.pgp_list = PGPMailingList.for_list(self.mlist)
+ self.pgp_list.key = self.list_key
self.um = getUtility(IUserManager)
self.sender_key = load_key('rsa_1024.priv.asc')
- self.sender = self.um.create_address('rsa-1024b@example.org')
+ self.sender = self.um.create_address('anne@example.org')
@implementer(IWorkflow)
@@ -78,13 +79,13 @@ class PGPTestWorkflow(SubscriptionBase, PGPMixin, SetPubkeyMixin,
SubscriptionBase.__init__(self, mlist, subscriber)
SetPubkeyMixin.__init__(self, pubkey=pubkey)
ConfirmPubkeyMixin.__init__(self, pre_confirmed=pubkey_pre_confirmed)
- PGPMixin.__init__(self)
+ PGPMixin.__init__(self, mlist)
def _step_prepare(self):
self.push('do_subscription')
self.push('pubkey_confirmation')
self.push('pubkey_checks')
- self.push('pgp_prepare')
+ self.push('create_address')
self.push('sanity_checks')
@@ -93,7 +94,7 @@ class TestPGPMixin(PubkeyMixinTestSetup, unittest.TestCase):
def test_create_address(self):
workflow = PGPTestWorkflow(self.mlist, self.sender)
- workflow.run_thru('pgp_prepare')
+ workflow.run_thru('create_address')
pgp_address = PGPAddress.for_address(self.sender)
self.assertIsNotNone(pgp_address)
@@ -102,7 +103,7 @@ class TestPGPMixin(PubkeyMixinTestSetup, unittest.TestCase):
with transaction() as t:
pgp_address = PGPAddress(self.sender)
t.add(pgp_address)
- workflow.run_thru('pgp_prepare')
+ workflow.run_thru('create_address')
still = PGPAddress.for_address(self.sender)
self.assertIsNotNone(still)
@@ -203,7 +204,7 @@ class TestBothPubkeyMixins(PubkeyMixinTestSetup, unittest.TestCase):
self.assertIsNotNone(workflow.token)
pendable = getUtility(IPendings).confirm(workflow.token, expunge=False)
self.assertEqual(pendable['list_id'], 'test.example.com')
- self.assertEqual(pendable['email'], 'rsa-1024b@example.org')
+ self.assertEqual(pendable['email'], 'anne@example.org')
self.assertEqual(pendable['display_name'], '')
self.assertEqual(pendable['when'], '2005-08-01T07:49:23')
self.assertEqual(pendable['token_owner'], 'subscriber')
@@ -216,7 +217,7 @@ class TestBothPubkeyMixins(PubkeyMixinTestSetup, unittest.TestCase):
self.assertIsNotNone(workflow.token)
pendable = getUtility(IPendings).confirm(workflow.token, expunge=False)
self.assertEqual(pendable['list_id'], 'test.example.com')
- self.assertEqual(pendable['email'], 'rsa-1024b@example.org')
+ self.assertEqual(pendable['email'], 'anne@example.org')
self.assertEqual(pendable['display_name'], '')
self.assertEqual(pendable['when'], '2005-08-01T07:49:23')
self.assertEqual(pendable['token_owner'], 'subscriber')
diff --git a/src/mailman_pgp/workflows/tests/test_key_change.py b/src/mailman_pgp/workflows/tests/test_key_change.py
index e469d51..5d4926a 100644
--- a/src/mailman_pgp/workflows/tests/test_key_change.py
+++ b/src/mailman_pgp/workflows/tests/test_key_change.py
@@ -25,13 +25,15 @@ from mailman.interfaces.usermanager import IUserManager
from mailman.testing.helpers import get_queue_messages
from zope.component import getUtility
+from mailman_pgp.config import mm_config
from mailman_pgp.database import mm_transaction, transaction
from mailman_pgp.model.address import PGPAddress
from mailman_pgp.model.list import PGPMailingList
from mailman_pgp.pgp.wrapper import PGPWrapper
from mailman_pgp.testing.layers import PGPConfigLayer
from mailman_pgp.testing.pgp import load_key
-from mailman_pgp.workflows.key_change import KeyChangeWorkflow
+from mailman_pgp.workflows.key_change import (KeyChangeModWorkflow,
+ KeyChangeWorkflow)
class TestKeyChangeWorkflow(unittest.TestCase):
@@ -41,13 +43,18 @@ class TestKeyChangeWorkflow(unittest.TestCase):
with mm_transaction():
self.mlist = create_list('test@example.com',
style_name='pgp-default')
- self.pgp_list = PGPMailingList.for_list(self.mlist)
- self.pgp_list.key = load_key('ecc_p256.priv.asc')
+ with transaction():
+ self.pgp_list = PGPMailingList.for_list(self.mlist)
+ self.pgp_list.key = load_key('ecc_p256.priv.asc')
self.sender_key = load_key('rsa_1024.priv.asc')
self.sender_new_key = load_key('ecc_p256.priv.asc')
self.sender = getUtility(IUserManager).create_address(
- 'rsa-1024b@example.org')
+ 'anne@example.org')
+
+ def test_has_workflows(self):
+ self.assertTrue(KeyChangeWorkflow.name, mm_config.workflows)
+ self.assertTrue(KeyChangeModWorkflow.name, mm_config.workflows)
def test_pgp_address_none(self):
workflow = KeyChangeWorkflow(self.mlist)
@@ -101,3 +108,19 @@ class TestKeyChangeWorkflow(unittest.TestCase):
self.assertEqual(pgp_address.key_fingerprint,
self.sender_new_key.fingerprint)
self.assertTrue(pgp_address.key_confirmed)
+
+ def test_confirm_mod(self):
+ with transaction() as t:
+ pgp_address = PGPAddress(self.sender)
+ pgp_address.key = self.sender_key.pubkey
+ pgp_address.key_confirmed = True
+ t.add(pgp_address)
+
+ workflow = KeyChangeModWorkflow(self.mlist, pgp_address,
+ self.sender_new_key.pubkey)
+ list(workflow)
+
+ token, token_owner, member = ISubscriptionManager(self.mlist).confirm(
+ workflow.token)
+ self.assertIsNotNone(token)
+ self.assertEqual(token_owner, TokenOwner.moderator)
diff --git a/src/mailman_pgp/workflows/tests/test_mod_approval.py b/src/mailman_pgp/workflows/tests/test_mod_approval.py
new file mode 100644
index 0000000..7d57a9b
--- /dev/null
+++ b/src/mailman_pgp/workflows/tests/test_mod_approval.py
@@ -0,0 +1,106 @@
+# Copyright (C) 2017 Jan Jancar
+#
+# This file is a part of the Mailman PGP plugin.
+#
+# This program is free software; you can redistribute it and/or modify it under
+# the terms of the GNU General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or (at your option)
+# any later version.
+#
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+# more details.
+#
+# You should have received a copy of the GNU General Public License along with
+# this program. If not, see <http://www.gnu.org/licenses/>.
+
+""""""
+from unittest import TestCase
+
+from mailman.app.lifecycle import create_list
+from mailman.interfaces.subscriptions import TokenOwner
+from mailman.interfaces.usermanager import IUserManager
+from mailman.interfaces.workflows import IWorkflow
+from mailman.testing.helpers import get_queue_messages
+from zope.component import getUtility
+from zope.interface import implementer
+
+from mailman_pgp.database import mm_transaction, transaction
+from mailman_pgp.model.address import PGPAddress
+from mailman_pgp.model.list import PGPMailingList
+from mailman_pgp.pgp.wrapper import PGPWrapper
+from mailman_pgp.testing.layers import PGPConfigLayer
+from mailman_pgp.testing.pgp import load_key
+from mailman_pgp.workflows.key_change import KeyChangeBase
+from mailman_pgp.workflows.mod_approval import (
+ ModeratorKeyChangeApprovalMixin)
+
+
+@implementer(IWorkflow)
+class PGPTestWorkflow(KeyChangeBase, ModeratorKeyChangeApprovalMixin):
+ name = 'test-workflow'
+ description = ''
+ initial_state = 'mod_approval'
+ save_attributes = (
+ 'approved',
+ )
+
+ def __init__(self, mlist, pgp_address=None, pubkey=None,
+ pre_approved=False):
+ KeyChangeBase.__init__(self, mlist, pgp_address, pubkey)
+ ModeratorKeyChangeApprovalMixin.__init__(self, pre_approved)
+
+
+class TestModeratorApprovalMixin(TestCase):
+ layer = PGPConfigLayer
+
+ def setUp(self):
+ with mm_transaction():
+ self.mlist = create_list('test@example.com',
+ style_name='pgp-default')
+ with transaction():
+ self.pgp_list = PGPMailingList.for_list(self.mlist)
+ self.pgp_list.key = load_key('ecc_p256.priv.asc')
+
+ self.sender_key = load_key('rsa_1024.priv.asc')
+ self.sender_new_key = load_key('ecc_p256.priv.asc')
+ self.sender = getUtility(IUserManager).create_address(
+ 'anne@example.org')
+
+ def test_get_approval(self):
+ with transaction() as t:
+ pgp_address = PGPAddress(self.sender)
+ pgp_address.key = self.sender_key.pubkey
+ pgp_address.key_confirmed = True
+ t.add(pgp_address)
+
+ workflow = PGPTestWorkflow(self.mlist, pgp_address,
+ self.sender_new_key.pubkey)
+ list(workflow)
+ items = get_queue_messages('virgin', expected_count=1)
+ message = items[0].msg
+
+ self.assertEqual(message['Subject'],
+ 'New key change request from {}'.format(
+ pgp_address.email))
+ wrapped = PGPWrapper(message)
+ self.assertTrue(wrapped.has_keys())
+ keys = list(wrapped.keys())
+ self.assertEqual(len(keys), 1)
+ key = keys.pop()
+ self.assertEqual(key.fingerprint, self.sender_new_key.fingerprint)
+
+ def test_receive_approval(self):
+ with transaction() as t:
+ pgp_address = PGPAddress(self.sender)
+ pgp_address.key = self.sender_key.pubkey
+ pgp_address.key_confirmed = True
+ t.add(pgp_address)
+
+ workflow = PGPTestWorkflow(self.mlist, pgp_address,
+ self.sender_new_key.pubkey)
+ list(workflow)
+ get_queue_messages('virgin', expected_count=1)
+ list(workflow)
+ self.assertEqual(workflow.token_owner, TokenOwner.no_one)