diff options
Diffstat (limited to 'src/mailman_pgp/workflows')
| -rw-r--r-- | src/mailman_pgp/workflows/base.py | 70 | ||||
| -rw-r--r-- | src/mailman_pgp/workflows/key_change.py | 99 | ||||
| -rw-r--r-- | src/mailman_pgp/workflows/key_confirm.py | 77 | ||||
| -rw-r--r-- | src/mailman_pgp/workflows/key_revoke.py | 70 | ||||
| -rw-r--r-- | src/mailman_pgp/workflows/key_set.py | 76 | ||||
| -rw-r--r-- | src/mailman_pgp/workflows/mod_approval.py | 153 | ||||
| -rw-r--r-- | src/mailman_pgp/workflows/pubkey.py | 118 | ||||
| -rw-r--r-- | src/mailman_pgp/workflows/subscription.py | 68 | ||||
| -rw-r--r-- | src/mailman_pgp/workflows/tests/test_base.py | 23 | ||||
| -rw-r--r-- | src/mailman_pgp/workflows/tests/test_key_change.py | 31 | ||||
| -rw-r--r-- | src/mailman_pgp/workflows/tests/test_mod_approval.py | 106 |
11 files changed, 684 insertions, 207 deletions
diff --git a/src/mailman_pgp/workflows/base.py b/src/mailman_pgp/workflows/base.py index 036dc18..9fc5d58 100644 --- a/src/mailman_pgp/workflows/base.py +++ b/src/mailman_pgp/workflows/base.py @@ -16,15 +16,75 @@ # this program. If not, see <http://www.gnu.org/licenses/>. """""" +from datetime import timedelta + +from mailman.interfaces.pending import IPendings +from mailman.interfaces.subscriptions import TokenOwner +from mailman.utilities.datetime import now +from mailman.utilities.modules import abstract_component +from mailman.workflows.common import SubscriptionBase +from public import public +from zope.component import getUtility from mailman_pgp.database import transaction from mailman_pgp.model.address import PGPAddress +from mailman_pgp.model.list import PGPMailingList +@public class PGPMixin: - def _step_pgp_prepare(self): - pgp_address = PGPAddress.for_address(self.address) - if pgp_address is None: + def __init__(self, mlist, pgp_address=None): + self.mlist = mlist + self.pgp_list = PGPMailingList.for_list(mlist) + self.pgp_address = pgp_address + if self.pgp_address is not None: + self.address = self.pgp_address.address + + @property + def address_key(self): + return self.pgp_address.email + + @address_key.setter + def address_key(self, value): + self.pgp_address = PGPAddress.for_email(value) + self.member = self.mlist.regular_members.get_member(value) + + def _step_create_address(self): + self.pgp_address = PGPAddress.for_address(self.address) + if self.pgp_address is None: with transaction() as t: - pgp_address = PGPAddress(self.address) - t.add(pgp_address) + self.pgp_address = PGPAddress(self.address) + t.add(self.pgp_address) + + def _step_restore_address(self): + self.pgp_address = PGPAddress.for_address(self.address) + + def _set_token(self, token_owner): + assert isinstance(token_owner, TokenOwner) + pendings = getUtility(IPendings) + if self.token is not None: + pendings.confirm(self.token) + self.token_owner = token_owner + if token_owner is TokenOwner.no_one: + self.token = None + return + + pendable = self.pendable_class()( + list_id=self.mlist.list_id, + email=self.address.email, + display_name=self.address.display_name, + when=now().replace(microsecond=0).isoformat(), + token_owner=token_owner.name, + ) + self.token = pendings.add(pendable, timedelta(days=3650)) + + +@public +@abstract_component +class PGPSubscriptionBase(SubscriptionBase, PGPMixin): + def __init__(self, mlist, subscriber=None, *, pgp_address=None): + SubscriptionBase.__init__(self, mlist, subscriber) + PGPMixin.__init__(self, mlist, pgp_address=pgp_address) + + def _step_restore_subscriber(self): + self._restore_subscriber() diff --git a/src/mailman_pgp/workflows/key_change.py b/src/mailman_pgp/workflows/key_change.py index 290e504..1d07903 100644 --- a/src/mailman_pgp/workflows/key_change.py +++ b/src/mailman_pgp/workflows/key_change.py @@ -28,10 +28,11 @@ from zope.interface import implementer from mailman_pgp.config import config from mailman_pgp.database import transaction -from mailman_pgp.model.address import PGPAddress -from mailman_pgp.model.list import PGPMailingList from mailman_pgp.pgp.wrapper import PGPWrapper from mailman_pgp.utils.email import copy_headers +from mailman_pgp.workflows.base import PGPMixin +from mailman_pgp.workflows.mod_approval import ( + ModeratorKeyChangeApprovalMixin) CHANGE_CONFIRM_REQUEST = """\ ---------- @@ -46,34 +47,18 @@ Token: {} """ -@public -@implementer(IWorkflow) -class KeyChangeWorkflow(Workflow): - name = 'pgp-key-change-workflow' - description = '' - initial_state = 'change_key' +class KeyChangeBase(Workflow, PGPMixin): save_attributes = ( 'address_key', - 'pubkey_key' + 'pubkey_key', ) def __init__(self, mlist, pgp_address=None, pubkey=None): - super().__init__() - self.mlist = mlist - self.pgp_list = PGPMailingList.for_list(mlist) - self.pgp_address = pgp_address + Workflow.__init__(self) + PGPMixin.__init__(self, mlist, pgp_address) self.pubkey = pubkey @property - def address_key(self): - return self.pgp_address.email - - @address_key.setter - def address_key(self, value): - self.pgp_address = PGPAddress.for_email(value) - self.member = self.mlist.regular_members.get_member(value) - - @property def pubkey_key(self): return str(self.pubkey) @@ -81,23 +66,27 @@ class KeyChangeWorkflow(Workflow): def pubkey_key(self, value): self.pubkey, _ = PGPKey.from_blob(value) - def _step_change_key(self): - if self.pgp_address is None or self.pubkey is None: - raise ValueError - - self.push('send_key_confirm_request') - - def _step_send_key_confirm_request(self): + def _pend(self, token_owner, lifetime=None): pendings = getUtility(IPendings) - pendable = KeyChangeWorkflow.pendable_class()( + pendable = self.pendable_class()( email=self.pgp_address.email, pubkey=str(self.pubkey), fingerprint=self.pubkey.fingerprint ) - lifetime = config.get_value('misc', 'change_request_lifetime') + self.token = pendings.add(pendable, lifetime=lifetime) - self.token_owner = TokenOwner.subscriber + self.token_owner = token_owner + + def _step_change_key(self): + if self.pgp_address is None or self.pubkey is None: + raise ValueError + + self.push('send_key_confirm_request') + def _step_send_key_confirm_request(self): + self._pend(TokenOwner.subscriber, + lifetime=config.get_value('misc', + 'change_request_lifetime')) self.push('receive_confirmation') self.save() request_address = self.mlist.request_address @@ -116,20 +105,52 @@ class KeyChangeWorkflow(Workflow): raise StopIteration def _step_receive_confirmation(self): + self._set_token(TokenOwner.no_one) + + def _step_do_change(self): with transaction(): self.pgp_address.key = self.pubkey self.pgp_address.key_confirmed = True - pendings = getUtility(IPendings) - if self.token is not None: - pendings.confirm(self.token) - self.token = None - self.token_owner = TokenOwner.no_one - @classmethod def pendable_class(cls): @implementer(IPendable) class Pendable(dict): - PEND_TYPE = KeyChangeWorkflow.name + PEND_TYPE = cls.name return Pendable + + +@public +@implementer(IWorkflow) +class KeyChangeWorkflow(KeyChangeBase): + name = 'pgp-key-change-workflow' + description = '' + initial_state = 'prepare' + + def _step_prepare(self): + self.push('do_change') + self.push('change_key') + + +@public +@implementer(IWorkflow) +class KeyChangeModWorkflow(KeyChangeBase, ModeratorKeyChangeApprovalMixin): + name = 'pgp-key-change-mod-workflow' + description = '' + initial_state = 'prepare' + save_attributes = ( + 'approved', + 'address_key', + 'pubkey_key' + ) + + def __init__(self, mlist, pgp_address=None, pubkey=None, + pre_approved=False): + KeyChangeBase.__init__(self, mlist, pgp_address, pubkey) + ModeratorKeyChangeApprovalMixin.__init__(self, pre_approved) + + def _step_prepare(self): + self.push('do_change') + self.push('mod_approval') + self.push('change_key') diff --git a/src/mailman_pgp/workflows/key_confirm.py b/src/mailman_pgp/workflows/key_confirm.py new file mode 100644 index 0000000..0a38551 --- /dev/null +++ b/src/mailman_pgp/workflows/key_confirm.py @@ -0,0 +1,77 @@ +# Copyright (C) 2017 Jan Jancar +# +# This file is a part of the Mailman PGP plugin. +# +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) +# any later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. +# +# You should have received a copy of the GNU General Public License along with +# this program. If not, see <http://www.gnu.org/licenses/>. + +"""""" +from mailman.email.message import UserNotification +from mailman.interfaces.subscriptions import TokenOwner +from public import public + +from mailman_pgp.database import transaction +from mailman_pgp.model.list import PGPMailingList +from mailman_pgp.pgp.wrapper import PGPWrapper +from mailman_pgp.utils.email import copy_headers + +CONFIRM_REQUEST = """\ +---------- +TODO: this is a pgp enabled list. +Reply to this message with this whole text +signed with your supplied key, either inline or PGP/MIME. + +Fingerprint: {} +Token: {} +---------- +""" + + +@public +class ConfirmPubkeyMixin: + def __init__(self, pre_confirmed=False): + self.pubkey_confirmed = pre_confirmed + + def _step_pubkey_confirmation(self): + assert self.pgp_address is not None + + if self.pubkey_confirmed: + with transaction(): + self.pgp_address.key_confirmed = True + else: + if not self.pgp_address.key_confirmed: + self.push('send_key_confirm_request') + + def _step_send_key_confirm_request(self): + self._set_token(TokenOwner.subscriber) + self.push('receive_key_confirmation') + self.save() + + request_address = self.mlist.request_address + email_address = self.address.email + msg = UserNotification(email_address, request_address, + 'key confirm {}'.format(self.token), + CONFIRM_REQUEST.format( + self.pgp_address.key_fingerprint, + self.token)) + pgp_list = PGPMailingList.for_list(self.mlist) + wrapped = PGPWrapper(msg) + encrypted = wrapped.sign_encrypt(pgp_list.key, self.pgp_address.key) + + msg.set_payload(encrypted.get_payload()) + copy_headers(encrypted, msg, True) + msg.send(self.mlist) + raise StopIteration + + def _step_receive_key_confirmation(self): + self._set_token(TokenOwner.no_one) diff --git a/src/mailman_pgp/workflows/key_revoke.py b/src/mailman_pgp/workflows/key_revoke.py new file mode 100644 index 0000000..7a7c071 --- /dev/null +++ b/src/mailman_pgp/workflows/key_revoke.py @@ -0,0 +1,70 @@ +# Copyright (C) 2017 Jan Jancar +# +# This file is a part of the Mailman PGP plugin. +# +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) +# any later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. +# +# You should have received a copy of the GNU General Public License along with +# this program. If not, see <http://www.gnu.org/licenses/>. + +"""""" +from mailman.interfaces.pending import IPendable +from mailman.interfaces.workflows import IWorkflow +from mailman.workflows.base import Workflow +from public import public +from zope.interface import implementer + +from mailman_pgp.workflows.base import PGPMixin +from mailman_pgp.workflows.key_confirm import ConfirmPubkeyMixin +from mailman_pgp.workflows.key_set import SetPubkeyMixin +from mailman_pgp.workflows.mod_approval import ModeratorKeyRevokeApprovalMixin + + +class KeyRevokeBase(Workflow, PGPMixin): + def __init__(self, mlist, pgp_address=None): + Workflow.__init__(self) + PGPMixin.__init__(self, mlist, pgp_address) + + @classmethod + def pendable_class(cls): + @implementer(IPendable) + class Pendable(dict): + PEND_TYPE = cls.name + + return Pendable + + +@public +@implementer(IWorkflow) +class KeyRevokeWorkflow(KeyRevokeBase, SetPubkeyMixin, ConfirmPubkeyMixin, + ModeratorKeyRevokeApprovalMixin): + name = 'pgp-key-revoke-workflow' + description = '' + initial_state = 'prepare' + save_attributes = ( + 'approved', + 'address_key', + 'pubkey_key', + 'pubkey_confirmed' + ) + + def __init__(self, mlist, pgp_address=None, pubkey=None, + pubkey_pre_confirmed=False, pre_approved=False): + KeyRevokeBase.__init__(self, mlist, pgp_address=pgp_address) + SetPubkeyMixin.__init__(self, pubkey=pubkey) + ConfirmPubkeyMixin.__init__(self, pre_confirmed=pubkey_pre_confirmed) + ModeratorKeyRevokeApprovalMixin.__init__(self, + pre_approved=pre_approved) + + def _step_prepare(self): + self.push('mod_approval') + self.push('pubkey_confirmation') + self.push('pubkey_checks') diff --git a/src/mailman_pgp/workflows/key_set.py b/src/mailman_pgp/workflows/key_set.py new file mode 100644 index 0000000..95e6e7c --- /dev/null +++ b/src/mailman_pgp/workflows/key_set.py @@ -0,0 +1,76 @@ +# Copyright (C) 2017 Jan Jancar +# +# This file is a part of the Mailman PGP plugin. +# +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) +# any later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. +# +# You should have received a copy of the GNU General Public License along with +# this program. If not, see <http://www.gnu.org/licenses/>. + +"""""" +from mailman.email.message import UserNotification +from mailman.interfaces.subscriptions import TokenOwner +from pgpy import PGPKey +from public import public + +from mailman_pgp.database import transaction + +KEY_REQUEST = """\ +---------- +TODO: this is a pgp enabled list. +We need your pubkey. +Reply to this message with it as a PGP/MIME(preferred) or inline. +----------""" + + +@public +class SetPubkeyMixin: + def __init__(self, pubkey=None): + self.pubkey = pubkey + + @property + def pubkey_key(self): + if self.pubkey is None: + return None + return str(self.pubkey) + + @pubkey_key.setter + def pubkey_key(self, value): + if value is not None: + self.pubkey, _ = PGPKey.from_blob(value) + else: + self.pubkey = None + + def _step_pubkey_checks(self): + assert self.pgp_address is not None + + if self.pubkey is None: + if self.pgp_address.key is None: + self.push('send_key_request') + else: + with transaction(): + self.pgp_address.key = self.pubkey + + def _step_send_key_request(self): + self._set_token(TokenOwner.subscriber) + self.push('receive_key') + self.save() + request_address = self.mlist.request_address + email_address = self.address.email + msg = UserNotification(email_address, request_address, + 'key set {}'.format(self.token), + KEY_REQUEST) + msg.send(self.mlist, add_precedence=False) + # Now we wait for the confirmation. + raise StopIteration + + def _step_receive_key(self): + self._set_token(TokenOwner.no_one) diff --git a/src/mailman_pgp/workflows/mod_approval.py b/src/mailman_pgp/workflows/mod_approval.py new file mode 100644 index 0000000..367f773 --- /dev/null +++ b/src/mailman_pgp/workflows/mod_approval.py @@ -0,0 +1,153 @@ +# Copyright (C) 2017 Jan Jancar +# +# This file is a part of the Mailman PGP plugin. +# +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) +# any later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. +# +# You should have received a copy of the GNU General Public License along with +# this program. If not, see <http://www.gnu.org/licenses/>. + +"""""" +import copy + +from mailman.email.message import UserNotification +from mailman.interfaces.pending import IPendings +from mailman.interfaces.subscriptions import TokenOwner +from public import public +from zope.component import getUtility + +from mailman_pgp.pgp.mime import MIMEWrapper +from mailman_pgp.utils.email import overwrite_message + +SUBSCRIPTION_MOD_REQUEST = """\ +---------- +TODO: this is a pgp enabled list. +A user with address {address} requested subscription. +The key is attached to this message. + +Fingerprint: {fingerprint} +---------- +""" + +KEY_CHANGE_MOD_REQUEST = """\ +---------- +TODO: this is a pgp enabled list. +A subscriber with address {address} requested a change of his key. +The new key is attached to this message. + +Old key fingerprint: {old_fpr} +New key fingerprint: {new_fpr} +---------- +""" + +KEY_REVOKE_MOD_REQUEST = """\ +---------- +TODO: this is a pgp enabled list. +A subscriber with address {address} revoked a part of his key, +which made it unusable and needs to be reset. The subscriber +supplied a new key. The new key is attached to this message. + +Old key fingerprint: {old_fpr} +New key fingerprint: {new_fpr} +---------- +""" + + +class ModeratorApprovalMixin: + def __init__(self, pre_approved=False): + self.approved = pre_approved + + def _step_mod_approval(self): + if not self.approved: + self.push('get_approval') + + def _step_get_approval(self): + self._pend(TokenOwner.moderator) + self.push('receive_mod_confirmation') + self.save() + + name = self._request_name + body = self._request_body + + if self.mlist.admin_immed_notify: + subject = 'New {} request from {}'.format(name, + self.pgp_address.email) + msg = UserNotification( + self.mlist.owner_address, self.mlist.owner_address, + subject, body, self.mlist.preferred_language) + out = copy.deepcopy(msg) + wrapped = MIMEWrapper(msg) + msg = wrapped.attach_keys(self.pubkey) + overwrite_message(msg, out) + out.send(self.mlist) + raise StopIteration + + def _step_receive_mod_confirmation(self): + pendings = getUtility(IPendings) + if self.token is not None: + pendings.confirm(self.token) + self.token = None + self.token_owner = TokenOwner.no_one + + +@public +class ModeratorSubApprovalMixin(ModeratorApprovalMixin): + def __init__(self, pre_approved=False): + super().__init__(pre_approved) + + @property + def _request_name(self): + return 'subscription' + + @property + def _request_body(self): + params = {'mlist': self.mlist.fqdn_listname, + 'address': self.pgp_address.email, + 'fingerprint': self.pubkey.fingerprint} + return SUBSCRIPTION_MOD_REQUEST.format(**params) + + +@public +class ModeratorKeyChangeApprovalMixin(ModeratorApprovalMixin): + def __init__(self, pre_approved=False): + super().__init__(pre_approved) + + @property + def _request_name(self): + return 'key change' + + @property + def _request_body(self): + params = {'mlist': self.mlist.fqdn_listname, + 'address': self.pgp_address.email, + 'fingerprint': self.pubkey.fingerprint, + 'old_fpr': self.pgp_address.key_fingerprint, + 'new_fpr': self.pubkey.fingerprint} + return KEY_CHANGE_MOD_REQUEST.format(**params) + + +@public +class ModeratorKeyRevokeApprovalMixin(ModeratorApprovalMixin): + def __init__(self, pre_approved=False): + super().__init__(pre_approved) + + @property + def _request_name(self): + return 'key reset' + + @property + def _request_body(self): + params = {'mlist': self.mlist.fqdn_listname, + 'address': self.pgp_address.email, + 'fingerprint': self.pubkey.fingerprint, + 'old_fpr': self.pgp_address.key_fingerprint, + 'new_fpr': self.pubkey.fingerprint} + return KEY_REVOKE_MOD_REQUEST.format(**params) diff --git a/src/mailman_pgp/workflows/pubkey.py b/src/mailman_pgp/workflows/pubkey.py deleted file mode 100644 index a13d491..0000000 --- a/src/mailman_pgp/workflows/pubkey.py +++ /dev/null @@ -1,118 +0,0 @@ -from mailman.email.message import UserNotification -from mailman.interfaces.subscriptions import TokenOwner -from pgpy import PGPKey - -from mailman_pgp.database import transaction -from mailman_pgp.model.address import PGPAddress -from mailman_pgp.model.list import PGPMailingList -from mailman_pgp.pgp.wrapper import PGPWrapper -from mailman_pgp.utils.email import copy_headers - -KEY_REQUEST = """\ ----------- -TODO: this is a pgp enabled list. -We need your pubkey. -Reply to this message with it as a PGP/MIME(preferred) or inline. -----------""" - -CONFIRM_REQUEST = """\ ----------- -TODO: this is a pgp enabled list. -Reply to this message with this whole text -signed with your supplied key, either inline or PGP/MIME. - -Fingerprint: {} -Token: {} ----------- -""" - - -class SetPubkeyMixin: - def __init__(self, pubkey=None): - self.pubkey = pubkey - - @property - def pubkey_key(self): - if self.pubkey is None: - return None - return str(self.pubkey) - - @pubkey_key.setter - def pubkey_key(self, value): - if value is not None: - self.pubkey, _ = PGPKey.from_blob(value) - else: - self.pubkey = None - - def _step_pubkey_checks(self): - pgp_address = PGPAddress.for_address(self.address) - assert pgp_address is not None - - if self.pubkey is None: - if pgp_address.key is None: - self.push('send_key_request') - else: - with transaction(): - pgp_address.key = self.pubkey - - def _step_send_key_request(self): - self._set_token(TokenOwner.subscriber) - self.push('receive_key') - self.save() - request_address = self.mlist.request_address - email_address = self.address.email - msg = UserNotification(email_address, request_address, - 'key set {}'.format(self.token), - KEY_REQUEST) - msg.send(self.mlist, add_precedence=False) - # Now we wait for the confirmation. - raise StopIteration - - def _step_receive_key(self): - self._restore_subscriber() - self._set_token(TokenOwner.no_one) - - -class ConfirmPubkeyMixin: - def __init__(self, pre_confirmed=False): - self.pubkey_confirmed = pre_confirmed - - def _step_pubkey_confirmation(self): - pgp_address = PGPAddress.for_address(self.address) - assert pgp_address is not None - - if self.pubkey_confirmed: - with transaction(): - pgp_address.key_confirmed = True - else: - if not pgp_address.key_confirmed: - self.push('send_key_confirm_request') - - def _step_send_key_confirm_request(self): - self._set_token(TokenOwner.subscriber) - self.push('receive_key_confirmation') - self.save() - - pgp_address = PGPAddress.for_address(self.address) - request_address = self.mlist.request_address - email_address = self.address.email - msg = UserNotification(email_address, request_address, - 'key confirm {}'.format(self.token), - CONFIRM_REQUEST.format( - pgp_address.key_fingerprint, - self.token)) - pgp_list = PGPMailingList.for_list(self.mlist) - wrapped = PGPWrapper(msg) - encrypted = wrapped.sign_encrypt(pgp_list.key, pgp_address.key) - - msg.set_payload(encrypted.get_payload()) - copy_headers(encrypted, msg, True) - msg.send(self.mlist) - raise StopIteration - - def _step_receive_key_confirmation(self): - self._restore_subscriber() - self._set_token(TokenOwner.no_one) - with transaction(): - pgp_address = PGPAddress.for_address(self.address) - pgp_address.key_confirmed = True diff --git a/src/mailman_pgp/workflows/subscription.py b/src/mailman_pgp/workflows/subscription.py index 809b7cb..2546f1e 100644 --- a/src/mailman_pgp/workflows/subscription.py +++ b/src/mailman_pgp/workflows/subscription.py @@ -19,18 +19,19 @@ from mailman.core.i18n import _ from mailman.interfaces.workflows import ISubscriptionWorkflow -from mailman.workflows.common import (ConfirmationMixin, ModerationMixin, - SubscriptionBase, VerificationMixin) +from mailman.workflows.common import (ConfirmationMixin, VerificationMixin) from public import public from zope.interface import implementer -from mailman_pgp.workflows.base import PGPMixin -from mailman_pgp.workflows.pubkey import ConfirmPubkeyMixin, SetPubkeyMixin +from mailman_pgp.workflows.base import PGPMixin, PGPSubscriptionBase +from mailman_pgp.workflows.key_confirm import ConfirmPubkeyMixin +from mailman_pgp.workflows.key_set import SetPubkeyMixin +from mailman_pgp.workflows.mod_approval import ModeratorSubApprovalMixin @public @implementer(ISubscriptionWorkflow) -class OpenSubscriptionPolicy(SubscriptionBase, VerificationMixin, +class OpenSubscriptionPolicy(PGPSubscriptionBase, VerificationMixin, SetPubkeyMixin, ConfirmPubkeyMixin, PGPMixin): """""" @@ -52,26 +53,28 @@ class OpenSubscriptionPolicy(SubscriptionBase, VerificationMixin, def __init__(self, mlist, subscriber=None, *, pre_verified=False, pubkey=None, pubkey_pre_confirmed=False): - SubscriptionBase.__init__(self, mlist, subscriber) + PGPSubscriptionBase.__init__(self, mlist, subscriber) VerificationMixin.__init__(self, pre_verified=pre_verified) SetPubkeyMixin.__init__(self, pubkey=pubkey) ConfirmPubkeyMixin.__init__(self, pre_confirmed=pubkey_pre_confirmed) - PGPMixin.__init__(self) + PGPMixin.__init__(self, mlist) def _step_prepare(self): self.push('do_subscription') + self.push('restore_subscriber') self.push('pubkey_confirmation') + self.push('restore_address') self.push('pubkey_checks') - self.push('pgp_prepare') + self.push('create_address') self.push('verification_checks') self.push('sanity_checks') @public @implementer(ISubscriptionWorkflow) -class ConfirmSubscriptionPolicy(SubscriptionBase, VerificationMixin, +class ConfirmSubscriptionPolicy(PGPSubscriptionBase, VerificationMixin, ConfirmationMixin, SetPubkeyMixin, - ConfirmPubkeyMixin, PGPMixin): + ConfirmPubkeyMixin): """""" name = 'pgp-policy-confirm' @@ -92,18 +95,19 @@ class ConfirmSubscriptionPolicy(SubscriptionBase, VerificationMixin, def __init__(self, mlist, subscriber=None, *, pre_verified=False, pre_confirmed=False, pubkey=None, pubkey_pre_confirmed=False): - SubscriptionBase.__init__(self, mlist, subscriber) + PGPSubscriptionBase.__init__(self, mlist, subscriber) VerificationMixin.__init__(self, pre_verified=pre_verified) ConfirmationMixin.__init__(self, pre_confirmed=pre_confirmed) SetPubkeyMixin.__init__(self, pubkey=pubkey) ConfirmPubkeyMixin.__init__(self, pre_confirmed=pubkey_pre_confirmed) - PGPMixin.__init__(self) def _step_prepare(self): self.push('do_subscription') + self.push('restore_subscriber') self.push('pubkey_confirmation') + self.push('restore_address') self.push('pubkey_checks') - self.push('pgp_prepare') + self.push('create_address') self.push('confirmation_checks') self.push('verification_checks') self.push('sanity_checks') @@ -111,9 +115,9 @@ class ConfirmSubscriptionPolicy(SubscriptionBase, VerificationMixin, @public @implementer(ISubscriptionWorkflow) -class ModerationSubscriptionPolicy(SubscriptionBase, VerificationMixin, - ModerationMixin, SetPubkeyMixin, - ConfirmPubkeyMixin, PGPMixin): +class ModerationSubscriptionPolicy(PGPSubscriptionBase, VerificationMixin, + ModeratorSubApprovalMixin, SetPubkeyMixin, + ConfirmPubkeyMixin): """""" name = 'pgp-policy-moderate' @@ -134,29 +138,31 @@ class ModerationSubscriptionPolicy(SubscriptionBase, VerificationMixin, def __init__(self, mlist, subscriber=None, *, pre_verified=False, pre_approved=False, pubkey=None, pubkey_pre_confirmed=False): - SubscriptionBase.__init__(self, mlist, subscriber) + PGPSubscriptionBase.__init__(self, mlist, subscriber) VerificationMixin.__init__(self, pre_verified=pre_verified) - ModerationMixin.__init__(self, pre_approved=pre_approved) + ModeratorSubApprovalMixin.__init__(self, pre_approved=pre_approved) SetPubkeyMixin.__init__(self, pubkey=pubkey) ConfirmPubkeyMixin.__init__(self, pre_confirmed=pubkey_pre_confirmed) - PGPMixin.__init__(self) def _step_prepare(self): self.push('do_subscription') - self.push('moderation_checks') + self.push('restore_subscriber') + self.push('mod_approval') self.push('pubkey_confirmation') + self.push('restore_address') self.push('pubkey_checks') - self.push('pgp_prepare') + self.push('create_address') self.push('verification_checks') self.push('sanity_checks') @public @implementer(ISubscriptionWorkflow) -class ConfirmModerationSubscriptionPolicy(SubscriptionBase, VerificationMixin, - ConfirmationMixin, ModerationMixin, - SetPubkeyMixin, ConfirmPubkeyMixin, - PGPMixin): +class ConfirmModerationSubscriptionPolicy(PGPSubscriptionBase, + VerificationMixin, + ConfirmationMixin, + ModeratorSubApprovalMixin, + SetPubkeyMixin, ConfirmPubkeyMixin): """""" name = 'pgp-policy-confirm-moderate' @@ -178,20 +184,22 @@ class ConfirmModerationSubscriptionPolicy(SubscriptionBase, VerificationMixin, def __init__(self, mlist, subscriber=None, *, pre_verified=False, pre_confirmed=False, pre_approved=False, pubkey=None, pubkey_pre_confirmed=False): - SubscriptionBase.__init__(self, mlist, subscriber) + PGPSubscriptionBase.__init__(self, mlist, subscriber) VerificationMixin.__init__(self, pre_verified=pre_verified) ConfirmationMixin.__init__(self, pre_confirmed=pre_confirmed) - ModerationMixin.__init__(self, pre_approved=pre_approved) + ModeratorSubApprovalMixin.__init__(self, pre_approved=pre_approved) SetPubkeyMixin.__init__(self, pubkey=pubkey) ConfirmPubkeyMixin.__init__(self, pre_confirmed=pubkey_pre_confirmed) - PGPMixin.__init__(self) def _step_prepare(self): self.push('do_subscription') - self.push('moderation_checks') + self.push('restore_subscriber') + self.push('mod_approval') + self.push('restore_address') self.push('pubkey_confirmation') + self.push('restore_address') self.push('pubkey_checks') - self.push('pgp_prepare') + self.push('create_address') self.push('confirmation_checks') self.push('verification_checks') self.push('sanity_checks') diff --git a/src/mailman_pgp/workflows/tests/test_base.py b/src/mailman_pgp/workflows/tests/test_base.py index 31b3d05..3904105 100644 --- a/src/mailman_pgp/workflows/tests/test_base.py +++ b/src/mailman_pgp/workflows/tests/test_base.py @@ -37,8 +37,8 @@ from mailman_pgp.pgp.wrapper import PGPWrapper from mailman_pgp.testing.layers import PGPConfigLayer from mailman_pgp.testing.pgp import load_key from mailman_pgp.workflows.base import (PGPMixin) -from mailman_pgp.workflows.pubkey import (ConfirmPubkeyMixin, KEY_REQUEST, - SetPubkeyMixin) +from mailman_pgp.workflows.key_confirm import ConfirmPubkeyMixin +from mailman_pgp.workflows.key_set import KEY_REQUEST, SetPubkeyMixin class PubkeyMixinTestSetup(): @@ -49,13 +49,14 @@ class PubkeyMixinTestSetup(): self.list_key = load_key('ecc_p256.priv.asc') - self.pgp_list = PGPMailingList.for_list(self.mlist) - self.pgp_list.key = self.list_key + with transaction(): + self.pgp_list = PGPMailingList.for_list(self.mlist) + self.pgp_list.key = self.list_key self.um = getUtility(IUserManager) self.sender_key = load_key('rsa_1024.priv.asc') - self.sender = self.um.create_address('rsa-1024b@example.org') + self.sender = self.um.create_address('anne@example.org') @implementer(IWorkflow) @@ -78,13 +79,13 @@ class PGPTestWorkflow(SubscriptionBase, PGPMixin, SetPubkeyMixin, SubscriptionBase.__init__(self, mlist, subscriber) SetPubkeyMixin.__init__(self, pubkey=pubkey) ConfirmPubkeyMixin.__init__(self, pre_confirmed=pubkey_pre_confirmed) - PGPMixin.__init__(self) + PGPMixin.__init__(self, mlist) def _step_prepare(self): self.push('do_subscription') self.push('pubkey_confirmation') self.push('pubkey_checks') - self.push('pgp_prepare') + self.push('create_address') self.push('sanity_checks') @@ -93,7 +94,7 @@ class TestPGPMixin(PubkeyMixinTestSetup, unittest.TestCase): def test_create_address(self): workflow = PGPTestWorkflow(self.mlist, self.sender) - workflow.run_thru('pgp_prepare') + workflow.run_thru('create_address') pgp_address = PGPAddress.for_address(self.sender) self.assertIsNotNone(pgp_address) @@ -102,7 +103,7 @@ class TestPGPMixin(PubkeyMixinTestSetup, unittest.TestCase): with transaction() as t: pgp_address = PGPAddress(self.sender) t.add(pgp_address) - workflow.run_thru('pgp_prepare') + workflow.run_thru('create_address') still = PGPAddress.for_address(self.sender) self.assertIsNotNone(still) @@ -203,7 +204,7 @@ class TestBothPubkeyMixins(PubkeyMixinTestSetup, unittest.TestCase): self.assertIsNotNone(workflow.token) pendable = getUtility(IPendings).confirm(workflow.token, expunge=False) self.assertEqual(pendable['list_id'], 'test.example.com') - self.assertEqual(pendable['email'], 'rsa-1024b@example.org') + self.assertEqual(pendable['email'], 'anne@example.org') self.assertEqual(pendable['display_name'], '') self.assertEqual(pendable['when'], '2005-08-01T07:49:23') self.assertEqual(pendable['token_owner'], 'subscriber') @@ -216,7 +217,7 @@ class TestBothPubkeyMixins(PubkeyMixinTestSetup, unittest.TestCase): self.assertIsNotNone(workflow.token) pendable = getUtility(IPendings).confirm(workflow.token, expunge=False) self.assertEqual(pendable['list_id'], 'test.example.com') - self.assertEqual(pendable['email'], 'rsa-1024b@example.org') + self.assertEqual(pendable['email'], 'anne@example.org') self.assertEqual(pendable['display_name'], '') self.assertEqual(pendable['when'], '2005-08-01T07:49:23') self.assertEqual(pendable['token_owner'], 'subscriber') diff --git a/src/mailman_pgp/workflows/tests/test_key_change.py b/src/mailman_pgp/workflows/tests/test_key_change.py index e469d51..5d4926a 100644 --- a/src/mailman_pgp/workflows/tests/test_key_change.py +++ b/src/mailman_pgp/workflows/tests/test_key_change.py @@ -25,13 +25,15 @@ from mailman.interfaces.usermanager import IUserManager from mailman.testing.helpers import get_queue_messages from zope.component import getUtility +from mailman_pgp.config import mm_config from mailman_pgp.database import mm_transaction, transaction from mailman_pgp.model.address import PGPAddress from mailman_pgp.model.list import PGPMailingList from mailman_pgp.pgp.wrapper import PGPWrapper from mailman_pgp.testing.layers import PGPConfigLayer from mailman_pgp.testing.pgp import load_key -from mailman_pgp.workflows.key_change import KeyChangeWorkflow +from mailman_pgp.workflows.key_change import (KeyChangeModWorkflow, + KeyChangeWorkflow) class TestKeyChangeWorkflow(unittest.TestCase): @@ -41,13 +43,18 @@ class TestKeyChangeWorkflow(unittest.TestCase): with mm_transaction(): self.mlist = create_list('test@example.com', style_name='pgp-default') - self.pgp_list = PGPMailingList.for_list(self.mlist) - self.pgp_list.key = load_key('ecc_p256.priv.asc') + with transaction(): + self.pgp_list = PGPMailingList.for_list(self.mlist) + self.pgp_list.key = load_key('ecc_p256.priv.asc') self.sender_key = load_key('rsa_1024.priv.asc') self.sender_new_key = load_key('ecc_p256.priv.asc') self.sender = getUtility(IUserManager).create_address( - 'rsa-1024b@example.org') + 'anne@example.org') + + def test_has_workflows(self): + self.assertTrue(KeyChangeWorkflow.name, mm_config.workflows) + self.assertTrue(KeyChangeModWorkflow.name, mm_config.workflows) def test_pgp_address_none(self): workflow = KeyChangeWorkflow(self.mlist) @@ -101,3 +108,19 @@ class TestKeyChangeWorkflow(unittest.TestCase): self.assertEqual(pgp_address.key_fingerprint, self.sender_new_key.fingerprint) self.assertTrue(pgp_address.key_confirmed) + + def test_confirm_mod(self): + with transaction() as t: + pgp_address = PGPAddress(self.sender) + pgp_address.key = self.sender_key.pubkey + pgp_address.key_confirmed = True + t.add(pgp_address) + + workflow = KeyChangeModWorkflow(self.mlist, pgp_address, + self.sender_new_key.pubkey) + list(workflow) + + token, token_owner, member = ISubscriptionManager(self.mlist).confirm( + workflow.token) + self.assertIsNotNone(token) + self.assertEqual(token_owner, TokenOwner.moderator) diff --git a/src/mailman_pgp/workflows/tests/test_mod_approval.py b/src/mailman_pgp/workflows/tests/test_mod_approval.py new file mode 100644 index 0000000..7d57a9b --- /dev/null +++ b/src/mailman_pgp/workflows/tests/test_mod_approval.py @@ -0,0 +1,106 @@ +# Copyright (C) 2017 Jan Jancar +# +# This file is a part of the Mailman PGP plugin. +# +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) +# any later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. +# +# You should have received a copy of the GNU General Public License along with +# this program. If not, see <http://www.gnu.org/licenses/>. + +"""""" +from unittest import TestCase + +from mailman.app.lifecycle import create_list +from mailman.interfaces.subscriptions import TokenOwner +from mailman.interfaces.usermanager import IUserManager +from mailman.interfaces.workflows import IWorkflow +from mailman.testing.helpers import get_queue_messages +from zope.component import getUtility +from zope.interface import implementer + +from mailman_pgp.database import mm_transaction, transaction +from mailman_pgp.model.address import PGPAddress +from mailman_pgp.model.list import PGPMailingList +from mailman_pgp.pgp.wrapper import PGPWrapper +from mailman_pgp.testing.layers import PGPConfigLayer +from mailman_pgp.testing.pgp import load_key +from mailman_pgp.workflows.key_change import KeyChangeBase +from mailman_pgp.workflows.mod_approval import ( + ModeratorKeyChangeApprovalMixin) + + +@implementer(IWorkflow) +class PGPTestWorkflow(KeyChangeBase, ModeratorKeyChangeApprovalMixin): + name = 'test-workflow' + description = '' + initial_state = 'mod_approval' + save_attributes = ( + 'approved', + ) + + def __init__(self, mlist, pgp_address=None, pubkey=None, + pre_approved=False): + KeyChangeBase.__init__(self, mlist, pgp_address, pubkey) + ModeratorKeyChangeApprovalMixin.__init__(self, pre_approved) + + +class TestModeratorApprovalMixin(TestCase): + layer = PGPConfigLayer + + def setUp(self): + with mm_transaction(): + self.mlist = create_list('test@example.com', + style_name='pgp-default') + with transaction(): + self.pgp_list = PGPMailingList.for_list(self.mlist) + self.pgp_list.key = load_key('ecc_p256.priv.asc') + + self.sender_key = load_key('rsa_1024.priv.asc') + self.sender_new_key = load_key('ecc_p256.priv.asc') + self.sender = getUtility(IUserManager).create_address( + 'anne@example.org') + + def test_get_approval(self): + with transaction() as t: + pgp_address = PGPAddress(self.sender) + pgp_address.key = self.sender_key.pubkey + pgp_address.key_confirmed = True + t.add(pgp_address) + + workflow = PGPTestWorkflow(self.mlist, pgp_address, + self.sender_new_key.pubkey) + list(workflow) + items = get_queue_messages('virgin', expected_count=1) + message = items[0].msg + + self.assertEqual(message['Subject'], + 'New key change request from {}'.format( + pgp_address.email)) + wrapped = PGPWrapper(message) + self.assertTrue(wrapped.has_keys()) + keys = list(wrapped.keys()) + self.assertEqual(len(keys), 1) + key = keys.pop() + self.assertEqual(key.fingerprint, self.sender_new_key.fingerprint) + + def test_receive_approval(self): + with transaction() as t: + pgp_address = PGPAddress(self.sender) + pgp_address.key = self.sender_key.pubkey + pgp_address.key_confirmed = True + t.add(pgp_address) + + workflow = PGPTestWorkflow(self.mlist, pgp_address, + self.sender_new_key.pubkey) + list(workflow) + get_queue_messages('virgin', expected_count=1) + list(workflow) + self.assertEqual(workflow.token_owner, TokenOwner.no_one) |
