diff options
Diffstat (limited to 'src/mailman/app')
| -rw-r--r-- | src/mailman/app/docs/moderator.rst | 10 | ||||
| -rw-r--r-- | src/mailman/app/events.py | 2 | ||||
| -rw-r--r-- | src/mailman/app/subscriptions.py | 552 | ||||
| -rw-r--r-- | src/mailman/app/tests/test_moderation.py | 2 | ||||
| -rw-r--r-- | src/mailman/app/tests/test_subscriptions.py | 720 | ||||
| -rw-r--r-- | src/mailman/app/tests/test_unsubscriptions.py | 522 | ||||
| -rw-r--r-- | src/mailman/app/tests/test_workflow.py | 183 | ||||
| -rw-r--r-- | src/mailman/app/tests/test_workflowmanager.py | 22 | ||||
| -rw-r--r-- | src/mailman/app/workflow.py | 151 |
9 files changed, 38 insertions, 2126 deletions
diff --git a/src/mailman/app/docs/moderator.rst b/src/mailman/app/docs/moderator.rst index ce25a4711..6fcaf0b08 100644 --- a/src/mailman/app/docs/moderator.rst +++ b/src/mailman/app/docs/moderator.rst @@ -224,7 +224,7 @@ Fred is a member of the mailing list... >>> from mailman.interfaces.subscriptions import ISubscriptionManager >>> registrar = ISubscriptionManager(mlist) >>> token, token_owner, member = registrar.register( - ... fred, pre_verified=True, pre_confirmed=True, pre_approved=True) + ... fred, pre_verified=True, pre_confirmed=True) >>> member <Member: Fred Person <fred@example.com> on ant@example.com as MemberRole.member> @@ -301,16 +301,16 @@ Usually, the list administrators want to be notified when there are membership change requests they need to moderate. These notifications are sent when the list is configured to send them. - >>> from mailman.interfaces.mailinglist import SubscriptionPolicy + >>> from mailman.workflows.subscription import ModerationSubscriptionPolicy >>> mlist.admin_immed_notify = True - >>> mlist.subscription_policy = SubscriptionPolicy.moderate + >>> mlist.subscription_policy = ModerationSubscriptionPolicy Gwen tries to subscribe to the mailing list. >>> gwen = getUtility(IUserManager).create_address( ... 'gwen@example.com', 'Gwen Person') >>> token, token_owner, member = registrar.register( - ... gwen, pre_verified=True, pre_confirmed=True) + ... gwen, pre_verified=True) Her subscription must be approved by the list administrator, so she is not yet a member of the mailing list. @@ -414,7 +414,7 @@ message. >>> herb = getUtility(IUserManager).create_address( ... 'herb@example.com', 'Herb Person') >>> token, token_owner, member = registrar.register( - ... herb, pre_verified=True, pre_confirmed=True, pre_approved=True) + ... herb, pre_verified=True, pre_approved=True) >>> messages = get_queue_messages('virgin') >>> len(messages) 1 diff --git a/src/mailman/app/events.py b/src/mailman/app/events.py index bf7f2ece8..a3fca30a8 100644 --- a/src/mailman/app/events.py +++ b/src/mailman/app/events.py @@ -38,7 +38,5 @@ def initialize(): passwords.handle_ConfigurationUpdatedEvent, style_manager.handle_ConfigurationUpdatedEvent, subscriptions.handle_ListDeletingEvent, - subscriptions.handle_SubscriptionConfirmationNeededEvent, - subscriptions.handle_UnsubscriptionConfirmationNeededEvent, switchboard.handle_ConfigurationUpdatedEvent, ]) diff --git a/src/mailman/app/subscriptions.py b/src/mailman/app/subscriptions.py index 0d8176ffb..ba62e9f52 100644 --- a/src/mailman/app/subscriptions.py +++ b/src/mailman/app/subscriptions.py @@ -17,515 +17,36 @@ """Handle subscriptions.""" -import uuid -import logging - -from datetime import timedelta -from email.utils import formataddr -from enum import Enum -from mailman.app.membership import delete_member -from mailman.app.workflow import Workflow -from mailman.core.i18n import _ +from mailman.config import config from mailman.database.transaction import flush -from mailman.email.message import UserNotification -from mailman.interfaces.address import IAddress -from mailman.interfaces.bans import IBanManager from mailman.interfaces.listmanager import ListDeletingEvent -from mailman.interfaces.mailinglist import SubscriptionPolicy -from mailman.interfaces.member import ( - AlreadySubscribedError, MemberRole, MembershipIsBannedError, - NotAMemberError) -from mailman.interfaces.pending import IPendable, IPendings +from mailman.interfaces.pending import IPendings from mailman.interfaces.subscriptions import ( - ISubscriptionManager, ISubscriptionService, - SubscriptionConfirmationNeededEvent, SubscriptionPendingError, TokenOwner, - UnsubscriptionConfirmationNeededEvent) -from mailman.interfaces.template import ITemplateLoader -from mailman.interfaces.user import IUser -from mailman.interfaces.usermanager import IUserManager -from mailman.interfaces.workflow import IWorkflowStateManager -from mailman.utilities.datetime import now -from mailman.utilities.string import expand, wrap + ISubscriptionManager, ISubscriptionService) +from mailman.interfaces.workflows import IWorkflowStateManager +from mailman.workflows.common import (PendableSubscription, + PendableUnsubscription) from public import public from zope.component import getUtility -from zope.event import notify from zope.interface import implementer -log = logging.getLogger('mailman.subscribe') - - -class WhichSubscriber(Enum): - address = 1 - user = 2 - - -@implementer(IPendable) -class PendableSubscription(dict): - PEND_TYPE = 'subscription' - - -@implementer(IPendable) -class PendableUnsubscription(dict): - PEND_TYPE = 'unsubscription' - - -class _SubscriptionWorkflowCommon(Workflow): - """Common support between subscription and unsubscription.""" - - PENDABLE_CLASS = None - - def __init__(self, mlist, subscriber): - super().__init__() - self.mlist = mlist - self.address = None - self.user = None - self.which = None - self.member = None - self._set_token(TokenOwner.no_one) - # The subscriber must be either an IUser or IAddress. - if IAddress.providedBy(subscriber): - self.address = subscriber - self.user = self.address.user - self.which = WhichSubscriber.address - elif IUser.providedBy(subscriber): - self.address = subscriber.preferred_address - self.user = subscriber - self.which = WhichSubscriber.user - self.subscriber = subscriber - - @property - def user_key(self): - # For save. - return self.user.user_id.hex - - @user_key.setter - def user_key(self, hex_key): - # For restore. - uid = uuid.UUID(hex_key) - self.user = getUtility(IUserManager).get_user_by_id(uid) - if self.user is None: - self.user = self.address.user - - @property - def address_key(self): - # For save. - return self.address.email - - @address_key.setter - def address_key(self, email): - # For restore. - self.address = getUtility(IUserManager).get_address(email) - assert self.address is not None - - @property - def subscriber_key(self): - return self.which.value - - @subscriber_key.setter - def subscriber_key(self, key): - self.which = WhichSubscriber(key) - - @property - def token_owner_key(self): - return self.token_owner.value - - @token_owner_key.setter - def token_owner_key(self, value): - self.token_owner = TokenOwner(value) - - def _set_token(self, token_owner): - assert isinstance(token_owner, TokenOwner) - pendings = getUtility(IPendings) - # Clear out the previous pending token if there is one. - if self.token is not None: - pendings.confirm(self.token) - # Create a new token to prevent replay attacks. It seems like this - # would produce the same token, but it won't because the pending adds a - # bit of randomization. - self.token_owner = token_owner - if token_owner is TokenOwner.no_one: - self.token = None - return - pendable = self.PENDABLE_CLASS( - list_id=self.mlist.list_id, - email=self.address.email, - display_name=self.address.display_name, - when=now().replace(microsecond=0).isoformat(), - token_owner=token_owner.name, - ) - self.token = pendings.add(pendable, timedelta(days=3650)) - - -@public -class SubscriptionWorkflow(_SubscriptionWorkflowCommon): - """Workflow of a subscription request.""" - - PENDABLE_CLASS = PendableSubscription - INITIAL_STATE = 'sanity_checks' - SAVE_ATTRIBUTES = ( - 'pre_approved', - 'pre_confirmed', - 'pre_verified', - 'address_key', - 'subscriber_key', - 'user_key', - 'token_owner_key', - ) - - def __init__(self, mlist, subscriber=None, *, - pre_verified=False, pre_confirmed=False, pre_approved=False): - super().__init__(mlist, subscriber) - self.pre_verified = pre_verified - self.pre_confirmed = pre_confirmed - self.pre_approved = pre_approved - - def _step_sanity_checks(self): - # Ensure that we have both an address and a user, even if the address - # is not verified. We can't set the preferred address until it is - # verified. - if self.user is None: - # The address has no linked user so create one, link it, and set - # the user's preferred address. - assert self.address is not None, 'No address or user' - self.user = getUtility(IUserManager).make_user(self.address.email) - if self.address is None: - assert self.user.preferred_address is None, ( - "Preferred address exists, but wasn't used in constructor") - addresses = list(self.user.addresses) - if len(addresses) == 0: - raise AssertionError('User has no addresses: {}'.format( - self.user)) - # This is rather arbitrary, but we have no choice. - self.address = addresses[0] - assert self.user is not None and self.address is not None, ( - 'Insane sanity check results') - # Is this subscriber already a member? - if (self.which is WhichSubscriber.user and - self.user.preferred_address is not None): - subscriber = self.user - else: - subscriber = self.address - if self.mlist.is_subscribed(subscriber): - # 2017-04-22 BAW: This branch actually *does* get covered, as I've - # verified by a full coverage run, but diffcov for some reason - # claims that the test added in the branch that added this code - # does not cover the change. That seems like a bug in diffcov. - raise AlreadySubscribedError( # pragma: nocover - self.mlist.fqdn_listname, - self.address.email, - MemberRole.member) - # Is this email address banned? - if IBanManager(self.mlist).is_banned(self.address.email): - raise MembershipIsBannedError(self.mlist, self.address.email) - # Check if there is already a subscription request for this email. - pendings = getUtility(IPendings).find( - mlist=self.mlist, - pend_type='subscription') - for token, pendable in pendings: - if pendable['email'] == self.address.email: - raise SubscriptionPendingError(self.mlist, self.address.email) - # Start out with the subscriber being the token owner. - self.push('verification_checks') - - def _step_verification_checks(self): - # Is the address already verified, or is the pre-verified flag set? - if self.address.verified_on is None: - if self.pre_verified: - self.address.verified_on = now() - else: - # The address being subscribed is not yet verified, so we need - # to send a validation email that will also confirm that the - # user wants to be subscribed to this mailing list. - self.push('send_confirmation') - return - self.push('confirmation_checks') - - def _step_confirmation_checks(self): - # If the list's subscription policy is open, then the user can be - # subscribed right here and now. - if self.mlist.subscription_policy is SubscriptionPolicy.open: - self.push('do_subscription') - return - # If we do not need the user's confirmation, then skip to the - # moderation checks. - if self.mlist.subscription_policy is SubscriptionPolicy.moderate: - self.push('moderation_checks') - return - # If the subscription has been pre-confirmed, then we can skip the - # confirmation check can be skipped. If moderator approval is - # required we need to check that, otherwise we can go straight to - # subscription. - if self.pre_confirmed: - next_step = ( - 'moderation_checks' - if self.mlist.subscription_policy is - SubscriptionPolicy.confirm_then_moderate # noqa: E131 - else 'do_subscription') - self.push(next_step) - return - # The user must confirm their subscription. - self.push('send_confirmation') - - def _step_moderation_checks(self): - # Does the moderator need to approve the subscription request? - assert self.mlist.subscription_policy in ( - SubscriptionPolicy.moderate, - SubscriptionPolicy.confirm_then_moderate, - ), self.mlist.subscription_policy - if self.pre_approved: - self.push('do_subscription') - else: - self.push('get_moderator_approval') - - def _step_get_moderator_approval(self): - # Here's the next step in the workflow, assuming the moderator - # approves of the subscription. If they don't, the workflow and - # subscription request will just be thrown away. - self._set_token(TokenOwner.moderator) - self.push('subscribe_from_restored') - self.save() - log.info('{}: held subscription request from {}'.format( - self.mlist.fqdn_listname, self.address.email)) - # Possibly send a notification to the list moderators. - if self.mlist.admin_immed_notify: - subject = _( - 'New subscription request to $self.mlist.display_name ' - 'from $self.address.email') - username = formataddr( - (self.subscriber.display_name, self.address.email)) - template = getUtility(ITemplateLoader).get( - 'list:admin:action:subscribe', self.mlist) - text = wrap(expand(template, self.mlist, dict( - member=username, - ))) - # This message should appear to come from the <list>-owner so as - # to avoid any useless bounce processing. - msg = UserNotification( - self.mlist.owner_address, self.mlist.owner_address, - subject, text, self.mlist.preferred_language) - msg.send(self.mlist) - # The workflow must stop running here. - raise StopIteration - - def _step_subscribe_from_restored(self): - # Prevent replay attacks. - self._set_token(TokenOwner.no_one) - # Restore a little extra state that can't be stored in the database - # (because the order of setattr() on restore is indeterminate), then - # subscribe the user. - if self.which is WhichSubscriber.address: - self.subscriber = self.address - else: - assert self.which is WhichSubscriber.user - self.subscriber = self.user - self.push('do_subscription') - - def _step_do_subscription(self): - # We can immediately subscribe the user to the mailing list. - self.member = self.mlist.subscribe(self.subscriber) - assert self.token is None and self.token_owner is TokenOwner.no_one, ( - 'Unexpected active token at end of subscription workflow') - - def _step_send_confirmation(self): - self._set_token(TokenOwner.subscriber) - self.push('do_confirm_verify') - self.save() - # Triggering this event causes the confirmation message to be sent. - notify(SubscriptionConfirmationNeededEvent( - self.mlist, self.token, self.address.email)) - # Now we wait for the confirmation. - raise StopIteration - - def _step_do_confirm_verify(self): - # Restore a little extra state that can't be stored in the database - # (because the order of setattr() on restore is indeterminate), then - # continue with the confirmation/verification step. - if self.which is WhichSubscriber.address: - self.subscriber = self.address - else: - assert self.which is WhichSubscriber.user - self.subscriber = self.user - # Reset the token so it can't be used in a replay attack. - self._set_token(TokenOwner.no_one) - # The user has confirmed their subscription request, and also verified - # their email address if necessary. This latter needs to be set on the - # IAddress, but there's nothing more to do about the confirmation step. - # We just continue along with the workflow. - if self.address.verified_on is None: - self.address.verified_on = now() - # The next step depends on the mailing list's subscription policy. - next_step = ('moderation_checks' - if self.mlist.subscription_policy in ( - SubscriptionPolicy.moderate, - SubscriptionPolicy.confirm_then_moderate, - ) - else 'do_subscription') - self.push(next_step) - - -@public -class UnSubscriptionWorkflow(_SubscriptionWorkflowCommon): - """Workflow of a unsubscription request.""" - - PENDABLE_CLASS = PendableUnsubscription - INITIAL_STATE = 'subscription_checks' - SAVE_ATTRIBUTES = ( - 'pre_approved', - 'pre_confirmed', - 'address_key', - 'user_key', - 'subscriber_key', - 'token_owner_key', - ) - - def __init__(self, mlist, subscriber=None, *, - pre_approved=False, pre_confirmed=False): - super().__init__(mlist, subscriber) - if IAddress.providedBy(subscriber) or IUser.providedBy(subscriber): - self.member = self.mlist.regular_members.get_member( - self.address.email) - self.pre_confirmed = pre_confirmed - self.pre_approved = pre_approved - - def _step_subscription_checks(self): - assert self.mlist.is_subscribed(self.subscriber) - self.push('confirmation_checks') - - def _step_confirmation_checks(self): - # If list's unsubscription policy is open, the user can unsubscribe - # right now. - if self.mlist.unsubscription_policy is SubscriptionPolicy.open: - self.push('do_unsubscription') - return - # If we don't need the user's confirmation, then skip to the moderation - # checks. - if self.mlist.unsubscription_policy is SubscriptionPolicy.moderate: - self.push('moderation_checks') - return - # If the request is pre-confirmed, then the user can unsubscribe right - # now. - if self.pre_confirmed: - self.push('do_unsubscription') - return - # The user must confirm their un-subsbcription. - self.push('send_confirmation') - - def _step_send_confirmation(self): - self._set_token(TokenOwner.subscriber) - self.push('do_confirm_verify') - self.save() - notify(UnsubscriptionConfirmationNeededEvent( - self.mlist, self.token, self.address.email)) - raise StopIteration - - def _step_moderation_checks(self): - # Does the moderator need to approve the unsubscription request? - assert self.mlist.unsubscription_policy in ( - SubscriptionPolicy.moderate, - SubscriptionPolicy.confirm_then_moderate, - ), self.mlist.unsubscription_policy - if self.pre_approved: - self.push('do_unsubscription') - else: - self.push('get_moderator_approval') - - def _step_get_moderator_approval(self): - self._set_token(TokenOwner.moderator) - self.push('unsubscribe_from_restored') - self.save() - log.info('{}: held unsubscription request from {}'.format( - self.mlist.fqdn_listname, self.address.email)) - if self.mlist.admin_immed_notify: - subject = _( - 'New unsubscription request to $self.mlist.display_name ' - 'from $self.address.email') - username = formataddr( - (self.subscriber.display_name, self.address.email)) - template = getUtility(ITemplateLoader).get( - 'list:admin:action:unsubscribe', self.mlist) - text = wrap(expand(template, self.mlist, dict( - member=username, - ))) - # This message should appear to come from the <list>-owner so as - # to avoid any useless bounce processing. - msg = UserNotification( - self.mlist.owner_address, self.mlist.owner_address, - subject, text, self.mlist.preferred_language) - msg.send(self.mlist) - # The workflow must stop running here - raise StopIteration - - def _step_do_confirm_verify(self): - # Restore a little extra state that can't be stored in the database - # (because the order of setattr() on restore is indeterminate), then - # continue with the confirmation/verification step. - if self.which is WhichSubscriber.address: - self.subscriber = self.address - else: - assert self.which is WhichSubscriber.user - self.subscriber = self.user - # Reset the token so it can't be used in a replay attack. - self._set_token(TokenOwner.no_one) - # Restore the member object. - self.member = self.mlist.regular_members.get_member(self.address.email) - # It's possible the member was already unsubscribed while we were - # waiting for the confirmation. - if self.member is None: - return - # The user has confirmed their unsubscription request - next_step = ('moderation_checks' - if self.mlist.unsubscription_policy in ( - SubscriptionPolicy.moderate, - SubscriptionPolicy.confirm_then_moderate, - ) - else 'do_unsubscription') - self.push(next_step) - - def _step_do_unsubscription(self): - try: - delete_member(self.mlist, self.address.email) - except NotAMemberError: - # The member has already been unsubscribed. - pass - self.member = None - assert self.token is None and self.token_owner is TokenOwner.no_one, ( - 'Unexpected active token at end of subscription workflow') - - def _step_unsubscribe_from_restored(self): - # Prevent replay attacks. - self._set_token(TokenOwner.no_one) - if self.which is WhichSubscriber.address: - self.subscriber = self.address - else: - assert self.which is WhichSubscriber.user - self.subscriber = self.user - self.push('do_unsubscription') - - @public @implementer(ISubscriptionManager) class SubscriptionManager: def __init__(self, mlist): self._mlist = mlist - def register(self, subscriber=None, *, - pre_verified=False, pre_confirmed=False, pre_approved=False): + def register(self, subscriber=None, **kwargs): """See `ISubscriptionManager`.""" - workflow = SubscriptionWorkflow( - self._mlist, subscriber, - pre_verified=pre_verified, - pre_confirmed=pre_confirmed, - pre_approved=pre_approved) + workflow = self._mlist.subscription_policy(self._mlist, subscriber, + **kwargs) list(workflow) return workflow.token, workflow.token_owner, workflow.member - def unregister(self, subscriber=None, *, - pre_confirmed=False, pre_approved=False): - workflow = UnSubscriptionWorkflow( - self._mlist, subscriber, - pre_confirmed=pre_confirmed, - pre_approved=pre_approved) + def unregister(self, subscriber=None, **kwargs): + workflow = self._mlist.unsubscription_policy(self._mlist, subscriber, + **kwargs) list(workflow) return workflow.token, workflow.token_owner, workflow.member @@ -536,11 +57,13 @@ class SubscriptionManager: if pendable is None: raise LookupError workflow_type = pendable.get('type') - assert workflow_type in (PendableSubscription.PEND_TYPE, - PendableUnsubscription.PEND_TYPE) - workflow = (SubscriptionWorkflow - if workflow_type == PendableSubscription.PEND_TYPE - else UnSubscriptionWorkflow)(self._mlist) + if workflow_type in {PendableSubscription.PEND_TYPE, + PendableUnsubscription.PEND_TYPE}: + workflow = (self._mlist.subscription_policy # pragma: nocover + if workflow_type == PendableSubscription.PEND_TYPE + else self._mlist.unsubscription_policy)(self._mlist) + else: + workflow = config.workflows[workflow_type](self._mlist) workflow.token = token workflow.restore() # In order to just run the whole workflow, all we need to do @@ -556,41 +79,6 @@ class SubscriptionManager: getUtility(IWorkflowStateManager).discard(token) -def _handle_confirmation_needed_events(event, template_name): - subject = 'confirm {}'.format(event.token) - confirm_address = event.mlist.confirm_address(event.token) - email_address = event.email - # Send a verification email to the address. - template = getUtility(ITemplateLoader).get(template_name, event.mlist) - text = expand(template, event.mlist, dict( - token=event.token, - subject=subject, - confirm_email=confirm_address, - user_email=email_address, - # For backward compatibility. - confirm_address=confirm_address, - email_address=email_address, - domain_name=event.mlist.domain.mail_host, - contact_address=event.mlist.owner_address, - )) - msg = UserNotification(email_address, confirm_address, subject, text) - msg.send(event.mlist, add_precedence=False) - - -@public -def handle_SubscriptionConfirmationNeededEvent(event): - if not isinstance(event, SubscriptionConfirmationNeededEvent): - return - _handle_confirmation_needed_events(event, 'list:user:action:subscribe') - - -@public -def handle_UnsubscriptionConfirmationNeededEvent(event): - if not isinstance(event, UnsubscriptionConfirmationNeededEvent): - return - _handle_confirmation_needed_events(event, 'list:user:action:unsubscribe') - - @public def handle_ListDeletingEvent(event): """Delete a mailing list's members when the list is being deleted.""" @@ -599,6 +87,6 @@ def handle_ListDeletingEvent(event): return # Find all the members still associated with the mailing list. members = getUtility(ISubscriptionService).find_members( - list_id=event.mailing_list.list_id) + list_id=event.mailing_list.list_id) for member in members: member.unsubscribe() diff --git a/src/mailman/app/tests/test_moderation.py b/src/mailman/app/tests/test_moderation.py index 5448a1d79..7d7524a10 100644 --- a/src/mailman/app/tests/test_moderation.py +++ b/src/mailman/app/tests/test_moderation.py @@ -162,7 +162,7 @@ class TestUnsubscription(unittest.TestCase): user_manager = getUtility(IUserManager) anne = user_manager.create_address('anne@example.org', 'Anne Person') token, token_owner, member = self._manager.register( - anne, pre_verified=True, pre_confirmed=True, pre_approved=True) + anne, pre_verified=True, pre_confirmed=True) self.assertIsNone(token) self.assertEqual(member.address.email, 'anne@example.org') bart = user_manager.create_user('bart@example.com', 'Bart User') diff --git a/src/mailman/app/tests/test_subscriptions.py b/src/mailman/app/tests/test_subscriptions.py deleted file mode 100644 index 0519ec385..000000000 --- a/src/mailman/app/tests/test_subscriptions.py +++ /dev/null @@ -1,720 +0,0 @@ -# Copyright (C) 2011-2017 by the Free Software Foundation, Inc. -# -# This file is part of GNU Mailman. -# -# GNU Mailman is free software: you can redistribute it and/or modify it under -# the terms of the GNU General Public License as published by the Free -# Software Foundation, either version 3 of the License, or (at your option) -# any later version. -# -# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT -# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or -# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for -# more details. -# -# You should have received a copy of the GNU General Public License along with -# GNU Mailman. If not, see <http://www.gnu.org/licenses/>. - -"""Tests for the subscription service.""" - -import unittest - -from contextlib import suppress -from mailman.app.lifecycle import create_list -from mailman.app.subscriptions import SubscriptionWorkflow -from mailman.interfaces.bans import IBanManager -from mailman.interfaces.mailinglist import SubscriptionPolicy -from mailman.interfaces.member import MemberRole, MembershipIsBannedError -from mailman.interfaces.pending import IPendings -from mailman.interfaces.subscriptions import TokenOwner -from mailman.interfaces.usermanager import IUserManager -from mailman.testing.helpers import ( - LogFileMark, get_queue_messages, set_preferred) -from mailman.testing.layers import ConfigLayer -from mailman.utilities.datetime import now -from unittest.mock import patch -from zope.component import getUtility - - -class TestSubscriptionWorkflow(unittest.TestCase): - layer = ConfigLayer - maxDiff = None - - def setUp(self): - self._mlist = create_list('test@example.com') - self._mlist.admin_immed_notify = False - self._anne = 'anne@example.com' - self._user_manager = getUtility(IUserManager) - self._expected_pendings_count = 0 - - def tearDown(self): - # There usually should be no pending after all is said and done, but - # some tests don't complete the workflow. - self.assertEqual(getUtility(IPendings).count, - self._expected_pendings_count) - - def test_start_state(self): - # The workflow starts with no tokens or member. - workflow = SubscriptionWorkflow(self._mlist) - self.assertIsNone(workflow.token) - self.assertEqual(workflow.token_owner, TokenOwner.no_one) - self.assertIsNone(workflow.member) - - def test_pended_data(self): - # There is a Pendable associated with the held request, and it has - # some data associated with it. - anne = self._user_manager.create_address(self._anne) - workflow = SubscriptionWorkflow(self._mlist, anne) - with suppress(StopIteration): - workflow.run_thru('send_confirmation') - self.assertIsNotNone(workflow.token) - pendable = getUtility(IPendings).confirm(workflow.token, expunge=False) - self.assertEqual(pendable['list_id'], 'test.example.com') - self.assertEqual(pendable['email'], 'anne@example.com') - self.assertEqual(pendable['display_name'], '') - self.assertEqual(pendable['when'], '2005-08-01T07:49:23') - self.assertEqual(pendable['token_owner'], 'subscriber') - # The token is still in the database. - self._expected_pendings_count = 1 - - def test_user_or_address_required(self): - # The `subscriber` attribute must be a user or address. - workflow = SubscriptionWorkflow(self._mlist) - self.assertRaises(AssertionError, list, workflow) - - def test_sanity_checks_address(self): - # Ensure that the sanity check phase, when given an IAddress, ends up - # with a linked user. - anne = self._user_manager.create_address(self._anne) - workflow = SubscriptionWorkflow(self._mlist, anne) - self.assertIsNotNone(workflow.address) - self.assertIsNone(workflow.user) - workflow.run_thru('sanity_checks') - self.assertIsNotNone(workflow.address) - self.assertIsNotNone(workflow.user) - self.assertEqual(list(workflow.user.addresses)[0].email, self._anne) - - def test_sanity_checks_user_with_preferred_address(self): - # Ensure that the sanity check phase, when given an IUser with a - # preferred address, ends up with an address. - anne = self._user_manager.make_user(self._anne) - address = set_preferred(anne) - workflow = SubscriptionWorkflow(self._mlist, anne) - # The constructor sets workflow.address because the user has a - # preferred address. - self.assertEqual(workflow.address, address) - self.assertEqual(workflow.user, anne) - workflow.run_thru('sanity_checks') - self.assertEqual(workflow.address, address) - self.assertEqual(workflow.user, anne) - - def test_sanity_checks_user_without_preferred_address(self): - # Ensure that the sanity check phase, when given a user without a - # preferred address, but with at least one linked address, gets an - # address. - anne = self._user_manager.make_user(self._anne) - workflow = SubscriptionWorkflow(self._mlist, anne) - self.assertIsNone(workflow.address) - self.assertEqual(workflow.user, anne) - workflow.run_thru('sanity_checks') - self.assertIsNotNone(workflow.address) - self.assertEqual(workflow.user, anne) - - def test_sanity_checks_user_with_multiple_linked_addresses(self): - # Ensure that the santiy check phase, when given a user without a - # preferred address, but with multiple linked addresses, gets of of - # those addresses (exactly which one is undefined). - anne = self._user_manager.make_user(self._anne) - anne.link(self._user_manager.create_address('anne@example.net')) - anne.link(self._user_manager.create_address('anne@example.org')) - workflow = SubscriptionWorkflow(self._mlist, anne) - self.assertIsNone(workflow.address) - self.assertEqual(workflow.user, anne) - workflow.run_thru('sanity_checks') - self.assertIn(workflow.address.email, ['anne@example.com', - 'anne@example.net', - 'anne@example.org']) - self.assertEqual(workflow.user, anne) - - def test_sanity_checks_user_without_addresses(self): - # It is an error to try to subscribe a user with no linked addresses. - user = self._user_manager.create_user() - workflow = SubscriptionWorkflow(self._mlist, user) - self.assertRaises(AssertionError, workflow.run_thru, 'sanity_checks') - - def test_sanity_checks_globally_banned_address(self): - # An exception is raised if the address is globally banned. - anne = self._user_manager.create_address(self._anne) - IBanManager(None).ban(self._anne) - workflow = SubscriptionWorkflow(self._mlist, anne) - self.assertRaises(MembershipIsBannedError, list, workflow) - - def test_sanity_checks_banned_address(self): - # An exception is raised if the address is banned by the mailing list. - anne = self._user_manager.create_address(self._anne) - IBanManager(self._mlist).ban(self._anne) - workflow = SubscriptionWorkflow(self._mlist, anne) - self.assertRaises(MembershipIsBannedError, list, workflow) - - def test_verification_checks_with_verified_address(self): - # When the address is already verified, we skip straight to the - # confirmation checks. - anne = self._user_manager.create_address(self._anne) - anne.verified_on = now() - workflow = SubscriptionWorkflow(self._mlist, anne) - workflow.run_thru('verification_checks') - with patch.object(workflow, '_step_confirmation_checks') as step: - next(workflow) - step.assert_called_once_with() - - def test_verification_checks_with_pre_verified_address(self): - # When the address is not yet verified, but the pre-verified flag is - # passed to the workflow, we skip to the confirmation checks. - anne = self._user_manager.create_address(self._anne) - workflow = SubscriptionWorkflow(self._mlist, anne, pre_verified=True) - workflow.run_thru('verification_checks') - with patch.object(workflow, '_step_confirmation_checks') as step: - next(workflow) - step.assert_called_once_with() - # And now the address is verified. - self.assertIsNotNone(anne.verified_on) - - def test_verification_checks_confirmation_needed(self): - # The address is neither verified, nor is the pre-verified flag set. - # A confirmation message must be sent to the user which will also - # verify their address. - anne = self._user_manager.create_address(self._anne) - workflow = SubscriptionWorkflow(self._mlist, anne) - workflow.run_thru('verification_checks') - with patch.object(workflow, '_step_send_confirmation') as step: - next(workflow) - step.assert_called_once_with() - # The address still hasn't been verified. - self.assertIsNone(anne.verified_on) - - def test_confirmation_checks_open_list(self): - # A subscription to an open list does not need to be confirmed or - # moderated. - self._mlist.subscription_policy = SubscriptionPolicy.open - anne = self._user_manager.create_address(self._anne) - workflow = SubscriptionWorkflow(self._mlist, anne, pre_verified=True) - workflow.run_thru('confirmation_checks') - with patch.object(workflow, '_step_do_subscription') as step: - next(workflow) - step.assert_called_once_with() - - def test_confirmation_checks_no_user_confirmation_needed(self): - # A subscription to a list which does not need user confirmation skips - # to the moderation checks. - self._mlist.subscription_policy = SubscriptionPolicy.moderate - anne = self._user_manager.create_address(self._anne) - workflow = SubscriptionWorkflow(self._mlist, anne, pre_verified=True) - workflow.run_thru('confirmation_checks') - with patch.object(workflow, '_step_moderation_checks') as step: - next(workflow) - step.assert_called_once_with() - - def test_confirmation_checks_confirm_pre_confirmed(self): - # The subscription policy requires user confirmation, but their - # subscription is pre-confirmed. Since moderation is not required, - # the user will be immediately subscribed. - self._mlist.subscription_policy = SubscriptionPolicy.confirm - anne = self._user_manager.create_address(self._anne) - workflow = SubscriptionWorkflow(self._mlist, anne, - pre_verified=True, - pre_confirmed=True) - workflow.run_thru('confirmation_checks') - with patch.object(workflow, '_step_do_subscription') as step: - next(workflow) - step.assert_called_once_with() - - def test_confirmation_checks_confirm_then_moderate_pre_confirmed(self): - # The subscription policy requires user confirmation, but their - # subscription is pre-confirmed. Since moderation is required, that - # check will be performed. - self._mlist.subscription_policy = ( - SubscriptionPolicy.confirm_then_moderate) - anne = self._user_manager.create_address(self._anne) - workflow = SubscriptionWorkflow(self._mlist, anne, - pre_verified=True, - pre_confirmed=True) - workflow.run_thru('confirmation_checks') - with patch.object(workflow, '_step_moderation_checks') as step: - next(workflow) - step.assert_called_once_with() - - def test_confirmation_checks_confirm_and_moderate_pre_confirmed(self): - # The subscription policy requires user confirmation and moderation, - # but their subscription is pre-confirmed. - self._mlist.subscription_policy = ( - SubscriptionPolicy.confirm_then_moderate) - anne = self._user_manager.create_address(self._anne) - workflow = SubscriptionWorkflow(self._mlist, anne, - pre_verified=True, - pre_confirmed=True) - workflow.run_thru('confirmation_checks') - with patch.object(workflow, '_step_moderation_checks') as step: - next(workflow) - step.assert_called_once_with() - - def test_confirmation_checks_confirmation_needed(self): - # The subscription policy requires confirmation and the subscription - # is not pre-confirmed. - self._mlist.subscription_policy = SubscriptionPolicy.confirm - anne = self._user_manager.create_address(self._anne) - workflow = SubscriptionWorkflow(self._mlist, anne, pre_verified=True) - workflow.run_thru('confirmation_checks') - with patch.object(workflow, '_step_send_confirmation') as step: - next(workflow) - step.assert_called_once_with() - - def test_confirmation_checks_moderate_confirmation_needed(self): - # The subscription policy requires confirmation and moderation, and the - # subscription is not pre-confirmed. - self._mlist.subscription_policy = ( - SubscriptionPolicy.confirm_then_moderate) - anne = self._user_manager.create_address(self._anne) - workflow = SubscriptionWorkflow(self._mlist, anne, pre_verified=True) - workflow.run_thru('confirmation_checks') - with patch.object(workflow, '_step_send_confirmation') as step: - next(workflow) - step.assert_called_once_with() - - def test_moderation_checks_pre_approved(self): - # The subscription is pre-approved by the moderator. - self._mlist.subscription_policy = SubscriptionPolicy.moderate - anne = self._user_manager.create_address(self._anne) - workflow = SubscriptionWorkflow(self._mlist, anne, - pre_verified=True, - pre_approved=True) - workflow.run_thru('moderation_checks') - with patch.object(workflow, '_step_do_subscription') as step: - next(workflow) - step.assert_called_once_with() - - def test_moderation_checks_approval_required(self): - # The moderator must approve the subscription. - self._mlist.subscription_policy = SubscriptionPolicy.moderate - anne = self._user_manager.create_address(self._anne) - workflow = SubscriptionWorkflow(self._mlist, anne, pre_verified=True) - workflow.run_thru('moderation_checks') - with patch.object(workflow, '_step_get_moderator_approval') as step: - next(workflow) - step.assert_called_once_with() - - def test_do_subscription(self): - # An open subscription policy plus a pre-verified address means the - # user gets subscribed to the mailing list without any further - # confirmations or approvals. - self._mlist.subscription_policy = SubscriptionPolicy.open - anne = self._user_manager.create_address(self._anne) - workflow = SubscriptionWorkflow(self._mlist, anne, pre_verified=True) - # Consume the entire state machine. - list(workflow) - # Anne is now a member of the mailing list. - member = self._mlist.regular_members.get_member(self._anne) - self.assertEqual(member.address, anne) - self.assertEqual(workflow.member, member) - # No further token is needed. - self.assertIsNone(workflow.token) - self.assertEqual(workflow.token_owner, TokenOwner.no_one) - - def test_do_subscription_pre_approved(self): - # An moderation-requiring subscription policy plus a pre-verified and - # pre-approved address means the user gets subscribed to the mailing - # list without any further confirmations or approvals. - self._mlist.subscription_policy = SubscriptionPolicy.moderate - anne = self._user_manager.create_address(self._anne) - workflow = SubscriptionWorkflow(self._mlist, anne, - pre_verified=True, - pre_approved=True) - # Consume the entire state machine. - list(workflow) - # Anne is now a member of the mailing list. - member = self._mlist.regular_members.get_member(self._anne) - self.assertEqual(member.address, anne) - self.assertEqual(workflow.member, member) - # No further token is needed. - self.assertIsNone(workflow.token) - self.assertEqual(workflow.token_owner, TokenOwner.no_one) - - def test_do_subscription_pre_approved_pre_confirmed(self): - # An moderation-requiring subscription policy plus a pre-verified and - # pre-approved address means the user gets subscribed to the mailing - # list without any further confirmations or approvals. - self._mlist.subscription_policy = ( - SubscriptionPolicy.confirm_then_moderate) - anne = self._user_manager.create_address(self._anne) - workflow = SubscriptionWorkflow(self._mlist, anne, - pre_verified=True, - pre_confirmed=True, - pre_approved=True) - # Consume the entire state machine. - list(workflow) - # Anne is now a member of the mailing list. - member = self._mlist.regular_members.get_member(self._anne) - self.assertEqual(member.address, anne) - self.assertEqual(workflow.member, member) - # No further token is needed. - self.assertIsNone(workflow.token) - self.assertEqual(workflow.token_owner, TokenOwner.no_one) - - def test_do_subscription_cleanups(self): - # Once the user is subscribed, the token, and its associated pending - # database record will be removed from the database. - self._mlist.subscription_policy = SubscriptionPolicy.open - anne = self._user_manager.create_address(self._anne) - workflow = SubscriptionWorkflow(self._mlist, anne, - pre_verified=True, - pre_confirmed=True, - pre_approved=True) - # Consume the entire state machine. - list(workflow) - # Anne is now a member of the mailing list. - member = self._mlist.regular_members.get_member(self._anne) - self.assertEqual(member.address, anne) - self.assertEqual(workflow.member, member) - # The workflow is done, so it has no token. - self.assertIsNone(workflow.token) - self.assertEqual(workflow.token_owner, TokenOwner.no_one) - - def test_moderator_approves(self): - # The workflow runs until moderator approval is required, at which - # point the workflow is saved. Once the moderator approves, the - # workflow resumes and the user is subscribed. - self._mlist.subscription_policy = SubscriptionPolicy.moderate - anne = self._user_manager.create_address(self._anne) - workflow = SubscriptionWorkflow(self._mlist, anne, - pre_verified=True, - pre_confirmed=True) - # Consume the entire state machine. - list(workflow) - # The user is not currently subscribed to the mailing list. - member = self._mlist.regular_members.get_member(self._anne) - self.assertIsNone(member) - self.assertIsNone(workflow.member) - # The token is owned by the moderator. - self.assertIsNotNone(workflow.token) - self.assertEqual(workflow.token_owner, TokenOwner.moderator) - # Create a new workflow with the previous workflow's save token, and - # restore its state. This models an approved subscription and should - # result in the user getting subscribed. - approved_workflow = SubscriptionWorkflow(self._mlist) - approved_workflow.token = workflow.token - approved_workflow.restore() - list(approved_workflow) - # Now the user is subscribed to the mailing list. - member = self._mlist.regular_members.get_member(self._anne) - self.assertEqual(member.address, anne) - self.assertEqual(approved_workflow.member, member) - # No further token is needed. - self.assertIsNone(approved_workflow.token) - self.assertEqual(approved_workflow.token_owner, TokenOwner.no_one) - - def test_get_moderator_approval_log_on_hold(self): - # When the subscription is held for moderator approval, a message is - # logged. - mark = LogFileMark('mailman.subscribe') - self._mlist.subscription_policy = SubscriptionPolicy.moderate - anne = self._user_manager.create_address(self._anne) - workflow = SubscriptionWorkflow(self._mlist, anne, - pre_verified=True, - pre_confirmed=True) - # Consume the entire state machine. - list(workflow) - self.assertIn( - 'test@example.com: held subscription request from anne@example.com', - mark.readline() - ) - # The state machine stopped at the moderator approval so there will be - # one token still in the database. - self._expected_pendings_count = 1 - - def test_get_moderator_approval_notifies_moderators(self): - # When the subscription is held for moderator approval, and the list - # is so configured, a notification is sent to the list moderators. - self._mlist.admin_immed_notify = True - self._mlist.subscription_policy = SubscriptionPolicy.moderate - anne = self._user_manager.create_address(self._anne) - bart = self._user_manager.create_user('bart@example.com', 'Bart User') - address = set_preferred(bart) - self._mlist.subscribe(address, MemberRole.moderator) - workflow = SubscriptionWorkflow(self._mlist, anne, - pre_verified=True, - pre_confirmed=True) - # Consume the entire state machine. - list(workflow) - # Find the moderator message. - items = get_queue_messages('virgin', expected_count=1) - for item in items: - if item.msg['to'] == 'test-owner@example.com': - break - else: - raise AssertionError('No moderator email found') - self.assertEqual( - item.msgdata['recipients'], {'test-owner@example.com'}) - message = items[0].msg - self.assertEqual(message['From'], 'test-owner@example.com') - self.assertEqual(message['To'], 'test-owner@example.com') - self.assertEqual( - message['Subject'], - 'New subscription request to Test from anne@example.com') - self.assertEqual(message.get_payload(), """\ -Your authorization is required for a mailing list subscription request -approval: - - For: anne@example.com - List: test@example.com -""") - # The state machine stopped at the moderator approval so there will be - # one token still in the database. - self._expected_pendings_count = 1 - - def test_get_moderator_approval_no_notifications(self): - # When the subscription is held for moderator approval, and the list - # is so configured, a notification is sent to the list moderators. - self._mlist.admin_immed_notify = False - self._mlist.subscription_policy = SubscriptionPolicy.moderate - anne = self._user_manager.create_address(self._anne) - workflow = SubscriptionWorkflow(self._mlist, anne, - pre_verified=True, - pre_confirmed=True) - # Consume the entire state machine. - list(workflow) - get_queue_messages('virgin', expected_count=0) - # The state machine stopped at the moderator approval so there will be - # one token still in the database. - self._expected_pendings_count = 1 - - def test_send_confirmation(self): - # A confirmation message gets sent when the address is not verified. - anne = self._user_manager.create_address(self._anne) - self.assertIsNone(anne.verified_on) - # Run the workflow to model the confirmation step. - workflow = SubscriptionWorkflow(self._mlist, anne) - list(workflow) - items = get_queue_messages('virgin', expected_count=1) - message = items[0].msg - token = workflow.token - self.assertEqual(message['Subject'], 'confirm {}'.format(token)) - self.assertEqual( - message['From'], 'test-confirm+{}@example.com'.format(token)) - # The confirmation message is not `Precedence: bulk`. - self.assertIsNone(message['precedence']) - # The state machine stopped at the moderator approval so there will be - # one token still in the database. - self._expected_pendings_count = 1 - - def test_send_confirmation_pre_confirmed(self): - # A confirmation message gets sent when the address is not verified - # but the subscription is pre-confirmed. - anne = self._user_manager.create_address(self._anne) - self.assertIsNone(anne.verified_on) - # Run the workflow to model the confirmation step. - workflow = SubscriptionWorkflow(self._mlist, anne, pre_confirmed=True) - list(workflow) - items = get_queue_messages('virgin', expected_count=1) - message = items[0].msg - token = workflow.token - self.assertEqual( - message['Subject'], 'confirm {}'.format(workflow.token)) - self.assertEqual( - message['From'], 'test-confirm+{}@example.com'.format(token)) - # The state machine stopped at the moderator approval so there will be - # one token still in the database. - self._expected_pendings_count = 1 - - def test_send_confirmation_pre_verified(self): - # A confirmation message gets sent even when the address is verified - # when the subscription must be confirmed. - self._mlist.subscription_policy = SubscriptionPolicy.confirm - anne = self._user_manager.create_address(self._anne) - self.assertIsNone(anne.verified_on) - # Run the workflow to model the confirmation step. - workflow = SubscriptionWorkflow(self._mlist, anne, pre_verified=True) - list(workflow) - items = get_queue_messages('virgin', expected_count=1) - message = items[0].msg - token = workflow.token - self.assertEqual( - message['Subject'], 'confirm {}'.format(workflow.token)) - self.assertEqual( - message['From'], 'test-confirm+{}@example.com'.format(token)) - # The state machine stopped at the moderator approval so there will be - # one token still in the database. - self._expected_pendings_count = 1 - - def test_do_confirm_verify_address(self): - # The address is not yet verified, nor are we pre-verifying. A - # confirmation message will be sent. When the user confirms their - # subscription request, the address will end up being verified. - anne = self._user_manager.create_address(self._anne) - self.assertIsNone(anne.verified_on) - # Run the workflow to model the confirmation step. - workflow = SubscriptionWorkflow(self._mlist, anne) - list(workflow) - # The address is still not verified. - self.assertIsNone(anne.verified_on) - confirm_workflow = SubscriptionWorkflow(self._mlist) - confirm_workflow.token = workflow.token - confirm_workflow.restore() - confirm_workflow.run_thru('do_confirm_verify') - # The address is now verified. - self.assertIsNotNone(anne.verified_on) - - def test_do_confirm_verify_user(self): - # A confirmation step is necessary when a user subscribes with their - # preferred address, and we are not pre-confirming. - anne = self._user_manager.create_user(self._anne) - set_preferred(anne) - # Run the workflow to model the confirmation step. There is no - # subscriber attribute yet. - workflow = SubscriptionWorkflow(self._mlist, anne) - list(workflow) - self.assertEqual(workflow.subscriber, anne) - # Do a confirmation workflow, which should now set the subscriber. - confirm_workflow = SubscriptionWorkflow(self._mlist) - confirm_workflow.token = workflow.token - confirm_workflow.restore() - confirm_workflow.run_thru('do_confirm_verify') - # The address is now verified. - self.assertEqual(confirm_workflow.subscriber, anne) - - def test_do_confirmation_subscribes_user(self): - # Subscriptions to the mailing list must be confirmed. Once that's - # done, the user's address (which is not initially verified) gets - # subscribed to the mailing list. - self._mlist.subscription_policy = SubscriptionPolicy.confirm - anne = self._user_manager.create_address(self._anne) - self.assertIsNone(anne.verified_on) - workflow = SubscriptionWorkflow(self._mlist, anne) - list(workflow) - # Anne is not yet a member. - member = self._mlist.regular_members.get_member(self._anne) - self.assertIsNone(member) - self.assertIsNone(workflow.member) - # The token is owned by the subscriber. - self.assertIsNotNone(workflow.token) - self.assertEqual(workflow.token_owner, TokenOwner.subscriber) - # Confirm. - confirm_workflow = SubscriptionWorkflow(self._mlist) - confirm_workflow.token = workflow.token - confirm_workflow.restore() - list(confirm_workflow) - self.assertIsNotNone(anne.verified_on) - # Anne is now a member. - member = self._mlist.regular_members.get_member(self._anne) - self.assertEqual(member.address, anne) - self.assertEqual(confirm_workflow.member, member) - # No further token is needed. - self.assertIsNone(confirm_workflow.token) - self.assertEqual(confirm_workflow.token_owner, TokenOwner.no_one) - - def test_prevent_confirmation_replay_attacks(self): - # Ensure that if the workflow requires two confirmations, e.g. first - # the user confirming their subscription, and then the moderator - # approving it, that different tokens are used in these two cases. - self._mlist.subscription_policy = ( - SubscriptionPolicy.confirm_then_moderate) - anne = self._user_manager.create_address(self._anne) - workflow = SubscriptionWorkflow(self._mlist, anne, pre_verified=True) - # Run the state machine up to the first confirmation, and cache the - # confirmation token. - list(workflow) - token = workflow.token - # Anne is not yet a member of the mailing list. - member = self._mlist.regular_members.get_member(self._anne) - self.assertIsNone(member) - self.assertIsNone(workflow.member) - # The token is owned by the subscriber. - self.assertIsNotNone(workflow.token) - self.assertEqual(workflow.token_owner, TokenOwner.subscriber) - # The old token will not work for moderator approval. - moderator_workflow = SubscriptionWorkflow(self._mlist) - moderator_workflow.token = token - moderator_workflow.restore() - list(moderator_workflow) - # The token is owned by the moderator. - self.assertIsNotNone(moderator_workflow.token) - self.assertEqual(moderator_workflow.token_owner, TokenOwner.moderator) - # While we wait for the moderator to approve the subscription, note - # that there's a new token for the next steps. - self.assertNotEqual(token, moderator_workflow.token) - # The old token won't work. - final_workflow = SubscriptionWorkflow(self._mlist) - final_workflow.token = token - self.assertRaises(LookupError, final_workflow.restore) - # Running this workflow will fail. - self.assertRaises(AssertionError, list, final_workflow) - # Anne is still not subscribed. - member = self._mlist.regular_members.get_member(self._anne) - self.assertIsNone(member) - self.assertIsNone(final_workflow.member) - # However, if we use the new token, her subscription request will be - # approved by the moderator. - final_workflow.token = moderator_workflow.token - final_workflow.restore() - list(final_workflow) - # And now Anne is a member. - member = self._mlist.regular_members.get_member(self._anne) - self.assertEqual(member.address.email, self._anne) - self.assertEqual(final_workflow.member, member) - # No further token is needed. - self.assertIsNone(final_workflow.token) - self.assertEqual(final_workflow.token_owner, TokenOwner.no_one) - - def test_confirmation_needed_and_pre_confirmed(self): - # The subscription policy is 'confirm' but the subscription is - # pre-confirmed so the moderation checks can be skipped. - self._mlist.subscription_policy = SubscriptionPolicy.confirm - anne = self._user_manager.create_address(self._anne) - workflow = SubscriptionWorkflow( - self._mlist, anne, - pre_verified=True, pre_confirmed=True, pre_approved=True) - list(workflow) - # Anne was subscribed. - self.assertIsNone(workflow.token) - self.assertEqual(workflow.token_owner, TokenOwner.no_one) - self.assertEqual(workflow.member.address, anne) - - def test_restore_user_absorbed(self): - # The subscribing user is absorbed (and thus deleted) before the - # moderator approves the subscription. - self._mlist.subscription_policy = SubscriptionPolicy.moderate - anne = self._user_manager.create_user(self._anne) - bill = self._user_manager.create_user('bill@example.com') - set_preferred(bill) - # anne subscribes. - workflow = SubscriptionWorkflow(self._mlist, anne, pre_verified=True) - list(workflow) - # bill absorbs anne. - bill.absorb(anne) - # anne's subscription request is approved. - approved_workflow = SubscriptionWorkflow(self._mlist) - approved_workflow.token = workflow.token - approved_workflow.restore() - self.assertEqual(approved_workflow.user, bill) - # Run the workflow through. - list(approved_workflow) - - def test_restore_address_absorbed(self): - # The subscribing user is absorbed (and thus deleted) before the - # moderator approves the subscription. - self._mlist.subscription_policy = SubscriptionPolicy.moderate - anne = self._user_manager.create_user(self._anne) - anne_address = anne.addresses[0] - bill = self._user_manager.create_user('bill@example.com') - # anne subscribes. - workflow = SubscriptionWorkflow( - self._mlist, anne_address, pre_verified=True) - list(workflow) - # bill absorbs anne. - bill.absorb(anne) - self.assertIn(anne_address, bill.addresses) - # anne's subscription request is approved. - approved_workflow = SubscriptionWorkflow(self._mlist) - approved_workflow.token = workflow.token - approved_workflow.restore() - self.assertEqual(approved_workflow.user, bill) - # Run the workflow through. - list(approved_workflow) diff --git a/src/mailman/app/tests/test_unsubscriptions.py b/src/mailman/app/tests/test_unsubscriptions.py deleted file mode 100644 index 4ca657190..000000000 --- a/src/mailman/app/tests/test_unsubscriptions.py +++ /dev/null @@ -1,522 +0,0 @@ -# Copyright (C) 2016-2017 by the Free Software Foundation, Inc. -# -# This file is part of GNU Mailman. -# -# GNU Mailman is free software: you can redistribute it and/or modify it under -# the terms of the GNU General Public License as published by the Free -# Software Foundation, either version 3 of the License, or (at your option) -# any later version. -# -# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT -# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or -# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for -# more details. -# -# You should have received a copy of the GNU General Public License along with -# GNU Mailman. If not, see <http://www.gnu.org/licenses/>. - -"""Test for unsubscription service.""" - -import unittest - -from contextlib import suppress -from mailman.app.lifecycle import create_list -from mailman.app.subscriptions import UnSubscriptionWorkflow -from mailman.interfaces.mailinglist import SubscriptionPolicy -from mailman.interfaces.pending import IPendings -from mailman.interfaces.subscriptions import TokenOwner -from mailman.interfaces.usermanager import IUserManager -from mailman.testing.helpers import LogFileMark, get_queue_messages -from mailman.testing.layers import ConfigLayer -from mailman.utilities.datetime import now -from unittest.mock import patch -from zope.component import getUtility - - -class TestUnSubscriptionWorkflow(unittest.TestCase): - layer = ConfigLayer - maxDiff = None - - def setUp(self): - self._mlist = create_list('test@example.com') - self._mlist.admin_immed_notify = False - self._mlist.unsubscription_policy = SubscriptionPolicy.open - self._mlist.send_welcome_message = False - self._anne = 'anne@example.com' - self._user_manager = getUtility(IUserManager) - self.anne = self._user_manager.create_user(self._anne) - self.anne.addresses[0].verified_on = now() - self.anne.preferred_address = self.anne.addresses[0] - self._mlist.subscribe(self.anne) - self._expected_pendings_count = 0 - - def tearDown(self): - # There usually should be no pending after all is said and done, but - # some tests don't complete the workflow. - self.assertEqual(getUtility(IPendings).count, - self._expected_pendings_count) - - def test_start_state(self): - # Test the workflow starts with no tokens or members. - workflow = UnSubscriptionWorkflow(self._mlist) - self.assertEqual(workflow.token_owner, TokenOwner.no_one) - self.assertIsNone(workflow.token) - self.assertIsNone(workflow.member) - - def test_pended_data(self): - # Test there is a Pendable object associated with a held - # unsubscription request and it has some valid data associated with - # it. - self._mlist.unsubscription_policy = SubscriptionPolicy.confirm - workflow = UnSubscriptionWorkflow(self._mlist, self.anne) - with suppress(StopIteration): - workflow.run_thru('send_confirmation') - self.assertIsNotNone(workflow.token) - pendable = getUtility(IPendings).confirm(workflow.token, expunge=False) - self.assertEqual(pendable['list_id'], 'test.example.com') - self.assertEqual(pendable['email'], 'anne@example.com') - self.assertEqual(pendable['display_name'], '') - self.assertEqual(pendable['when'], '2005-08-01T07:49:23') - self.assertEqual(pendable['token_owner'], 'subscriber') - # The token is still in the database. - self._expected_pendings_count = 1 - - def test_user_or_address_required(self): - # The `subscriber` attribute must be a user or address that is provided - # to the workflow. - workflow = UnSubscriptionWorkflow(self._mlist) - self.assertRaises(AssertionError, list, workflow) - - def test_user_is_subscribed_to_unsubscribe(self): - # A user must be subscribed to a list when trying to unsubscribe. - addr = self._user_manager.create_address('aperson@example.org') - addr.verfied_on = now() - workflow = UnSubscriptionWorkflow(self._mlist, addr) - self.assertRaises(AssertionError, - workflow.run_thru, 'subscription_checks') - - def test_confirmation_checks_open_list(self): - # An unsubscription from an open list does not need to be confirmed or - # moderated. - self._mlist.unsubscription_policy = SubscriptionPolicy.open - workflow = UnSubscriptionWorkflow(self._mlist, self.anne) - workflow.run_thru('confirmation_checks') - with patch.object(workflow, '_step_do_unsubscription') as step: - next(workflow) - step.assert_called_once_with() - - def test_confirmation_checks_no_user_confirmation_needed(self): - # An unsubscription from a list which does not need user confirmation - # skips to the moderation checks. - self._mlist.unsubscription_policy = SubscriptionPolicy.moderate - workflow = UnSubscriptionWorkflow(self._mlist, self.anne, - pre_confirmed=True) - workflow.run_thru('confirmation_checks') - with patch.object(workflow, '_step_moderation_checks') as step: - next(workflow) - step.assert_called_once_with() - - def test_confirmation_checks_confirm_pre_confirmed(self): - # The unsubscription policy requires user-confirmation, but their - # unsubscription is pre-confirmed. Since moderation is not reuqired, - # the user will be immediately unsubscribed. - self._mlist.unsubscription_policy = SubscriptionPolicy.confirm - workflow = UnSubscriptionWorkflow( - self._mlist, self.anne, pre_confirmed=True) - workflow.run_thru('confirmation_checks') - with patch.object(workflow, '_step_do_unsubscription') as step: - next(workflow) - step.assert_called_once_with() - - def test_confirmation_checks_confirm_then_moderate_pre_confirmed(self): - # The unsubscription policy requires user confirmation, but their - # unsubscription is pre-confirmed. Since moderation is required, that - # check will be performed. - self._mlist.unsubscription_policy = ( - SubscriptionPolicy.confirm_then_moderate) - workflow = UnSubscriptionWorkflow( - self._mlist, self.anne, pre_confirmed=True) - workflow.run_thru('confirmation_checks') - with patch.object(workflow, '_step_do_unsubscription') as step: - next(workflow) - step.assert_called_once_with() - - def test_send_confirmation_checks_confirm_list(self): - # The unsubscription policy requires user confirmation and the - # unsubscription is not pre-confirmed. - self._mlist.unsubscription_policy = SubscriptionPolicy.confirm - workflow = UnSubscriptionWorkflow(self._mlist, self.anne) - workflow.run_thru('confirmation_checks') - with patch.object(workflow, '_step_send_confirmation') as step: - next(workflow) - step.assert_called_once_with() - - def test_moderation_checks_moderated_list(self): - # The unsubscription policy requires moderation. - self._mlist.unsubscription_policy = SubscriptionPolicy.moderate - workflow = UnSubscriptionWorkflow(self._mlist, self.anne) - workflow.run_thru('confirmation_checks') - with patch.object(workflow, '_step_moderation_checks') as step: - next(workflow) - step.assert_called_once_with() - - def test_moderation_checks_approval_required(self): - # The moderator must approve the subscription request. - self._mlist.unsubscription_policy = SubscriptionPolicy.moderate - workflow = UnSubscriptionWorkflow(self._mlist, self.anne) - workflow.run_thru('moderation_checks') - with patch.object(workflow, '_step_get_moderator_approval') as step: - next(workflow) - step.assert_called_once_with() - - def test_do_unsusbcription(self): - # An open unsubscription policy means the user gets unsubscribed to - # the mailing list without any further confirmations or approvals. - self._mlist.unsubscription_policy = SubscriptionPolicy.open - workflow = UnSubscriptionWorkflow(self._mlist, self.anne) - list(workflow) - member = self._mlist.regular_members.get_member(self._anne) - self.assertIsNone(member) - - def test_do_unsubscription_pre_approved(self): - # A moderation-requiring subscription policy plus a pre-approved - # address means the user gets unsubscribed from the mailing list - # without any further confirmation or approvals. - self._mlist.unsubscription_policy = SubscriptionPolicy.moderate - workflow = UnSubscriptionWorkflow(self._mlist, self.anne, - pre_approved=True) - list(workflow) - # Anne is now unsubscribed form the mailing list. - member = self._mlist.regular_members.get_member(self._anne) - self.assertIsNone(member) - # No further token is needed. - self.assertIsNone(workflow.token) - self.assertEqual(workflow.token_owner, TokenOwner.no_one) - - def test_do_unsubscription_pre_approved_pre_confirmed(self): - # A moderation-requiring unsubscription policy plus a pre-appvoed - # address means the user gets unsubscribed to the mailing list without - # any further confirmations or approvals. - self._mlist.unsubscription_policy = ( - SubscriptionPolicy.confirm_then_moderate) - workflow = UnSubscriptionWorkflow(self._mlist, self.anne, - pre_approved=True, - pre_confirmed=True) - list(workflow) - member = self._mlist.regular_members.get_member(self._anne) - self.assertIsNone(member) - # No further token is needed. - self.assertIsNone(workflow.token) - self.assertEqual(workflow.token_owner, TokenOwner.no_one) - - def test_do_unsubscription_cleanups(self): - # Once the user is unsubscribed, the token and its associated pending - # database record will be removed from the database. - self._mlist.unsubscription_policy = SubscriptionPolicy.open - workflow = UnSubscriptionWorkflow(self._mlist, self.anne, - pre_approved=True, - pre_confirmed=True) - # Run the workflow. - list(workflow) - # Anne is now unsubscribed from the list. - member = self._mlist.regular_members.get_member(self._anne) - self.assertIsNone(member) - # Workflow is done, so it has no token. - self.assertIsNone(workflow.token) - self.assertEqual(workflow.token_owner, TokenOwner.no_one) - - def test_moderator_approves(self): - # The workflow runs until moderator approval is required, at which - # point the workflow is saved. Once the moderator approves, the - # workflow resumes and the user is unsubscribed. - self._mlist.unsubscription_policy = SubscriptionPolicy.moderate - workflow = UnSubscriptionWorkflow( - self._mlist, self.anne, pre_confirmed=True) - # Run the entire workflow. - list(workflow) - # The user is currently subscribed to the mailing list. - member = self._mlist.regular_members.get_member(self._anne) - self.assertIsNotNone(member) - self.assertIsNotNone(workflow.member) - # The token is owned by the moderator. - self.assertIsNotNone(workflow.token) - self.assertEqual(workflow.token_owner, TokenOwner.moderator) - # Create a new workflow with the previous workflow's save token, and - # restore its state. This models an approved un-sunscription request - # and should result in the user getting subscribed. - approved_workflow = UnSubscriptionWorkflow(self._mlist) - approved_workflow.token = workflow.token - approved_workflow.restore() - list(approved_workflow) - # Now the user is unsubscribed from the mailing list. - member = self._mlist.regular_members.get_member(self._anne) - self.assertIsNone(member) - self.assertEqual(approved_workflow.member, member) - # No further token is needed. - self.assertIsNone(approved_workflow.token) - self.assertEqual(approved_workflow.token_owner, TokenOwner.no_one) - - def test_get_moderator_approval_log_on_hold(self): - # When the unsubscription is held for moderator approval, a message is - # logged. - mark = LogFileMark('mailman.subscribe') - self._mlist.unsubscription_policy = SubscriptionPolicy.moderate - workflow = UnSubscriptionWorkflow( - self._mlist, self.anne, pre_confirmed=True) - # Run the entire workflow. - list(workflow) - self.assertIn( - 'test@example.com: held unsubscription request from anne@example.com', - mark.readline() - ) - # The state machine stopped at the moderator approval step so there - # will be one token still in the database. - self._expected_pendings_count = 1 - - def test_get_moderator_approval_notifies_moderators(self): - # When the unsubscription is held for moderator approval, and the list - # is so configured, a notification is sent to the list moderators. - self._mlist.admin_immed_notify = True - self._mlist.unsubscription_policy = SubscriptionPolicy.moderate - workflow = UnSubscriptionWorkflow( - self._mlist, self.anne, pre_confirmed=True) - # Consume the entire state machine. - list(workflow) - items = get_queue_messages('virgin', expected_count=1) - message = items[0].msg - self.assertEqual(message['From'], 'test-owner@example.com') - self.assertEqual(message['To'], 'test-owner@example.com') - self.assertEqual( - message['Subject'], - 'New unsubscription request to Test from anne@example.com') - self.assertEqual(message.get_payload(), """\ -Your authorization is required for a mailing list unsubscription -request approval: - - For: anne@example.com - List: test@example.com -""") - # The state machine stopped at the moderator approval so there will be - # one token still in the database. - self._expected_pendings_count = 1 - - def test_get_moderator_approval_no_notifications(self): - # When the unsubscription request is held for moderator approval, and - # the list is so configured, a notification is sent to the list - # moderators. - self._mlist.admin_immed_notify = False - self._mlist.unsubscription_policy = SubscriptionPolicy.moderate - workflow = UnSubscriptionWorkflow( - self._mlist, self.anne, pre_confirmed=True) - # Consume the entire state machine. - list(workflow) - get_queue_messages('virgin', expected_count=0) - # The state machine stopped at the moderator approval so there will be - # one token still in the database. - self._expected_pendings_count = 1 - - def test_send_confirmation(self): - # A confirmation message gets sent when the unsubscription must be - # confirmed. - self._mlist.unsubscription_policy = SubscriptionPolicy.confirm - # Run the workflow to model the confirmation step. - workflow = UnSubscriptionWorkflow(self._mlist, self.anne) - list(workflow) - items = get_queue_messages('virgin', expected_count=1) - message = items[0].msg - token = workflow.token - self.assertEqual( - message['Subject'], 'confirm {}'.format(workflow.token)) - self.assertEqual( - message['From'], 'test-confirm+{}@example.com'.format(token)) - # The state machine stopped at the member confirmation step so there - # will be one token still in the database. - self._expected_pendings_count = 1 - - def test_do_confirmation_unsubscribes_user(self): - # Unsubscriptions to the mailing list must be confirmed. Once that's - # done, the user's address is unsubscribed. - self._mlist.unsubscription_policy = SubscriptionPolicy.confirm - workflow = UnSubscriptionWorkflow(self._mlist, self.anne) - list(workflow) - # Anne is a member. - member = self._mlist.regular_members.get_member(self._anne) - self.assertIsNotNone(member) - self.assertEqual(member, workflow.member) - # The token is owned by the subscriber. - self.assertIsNotNone(workflow.token) - self.assertEqual(workflow.token_owner, TokenOwner.subscriber) - # Confirm. - confirm_workflow = UnSubscriptionWorkflow(self._mlist) - confirm_workflow.token = workflow.token - confirm_workflow.restore() - list(confirm_workflow) - # Anne is now unsubscribed. - member = self._mlist.regular_members.get_member(self._anne) - self.assertIsNone(member) - # No further token is needed. - self.assertIsNone(confirm_workflow.token) - self.assertEqual(confirm_workflow.token_owner, TokenOwner.no_one) - - def test_do_confirmation_unsubscribes_address(self): - # Unsubscriptions to the mailing list must be confirmed. Once that's - # done, the address is unsubscribed. - address = self.anne.register('anne.person@example.com') - self._mlist.subscribe(address) - self._mlist.unsubscription_policy = SubscriptionPolicy.confirm - workflow = UnSubscriptionWorkflow(self._mlist, address) - list(workflow) - # Bart is a member. - member = self._mlist.regular_members.get_member( - 'anne.person@example.com') - self.assertIsNotNone(member) - self.assertEqual(member, workflow.member) - # The token is owned by the subscriber. - self.assertIsNotNone(workflow.token) - self.assertEqual(workflow.token_owner, TokenOwner.subscriber) - # Confirm. - confirm_workflow = UnSubscriptionWorkflow(self._mlist) - confirm_workflow.token = workflow.token - confirm_workflow.restore() - list(confirm_workflow) - # Bart is now unsubscribed. - member = self._mlist.regular_members.get_member( - 'anne.person@example.com') - self.assertIsNone(member) - # No further token is needed. - self.assertIsNone(confirm_workflow.token) - self.assertEqual(confirm_workflow.token_owner, TokenOwner.no_one) - - def test_do_confirmation_nonmember(self): - # Attempt to confirm the unsubscription of a member who has already - # been unsubscribed. - self._mlist.unsubscription_policy = SubscriptionPolicy.confirm - workflow = UnSubscriptionWorkflow(self._mlist, self.anne) - list(workflow) - # Anne is a member. - member = self._mlist.regular_members.get_member(self._anne) - self.assertIsNotNone(member) - self.assertEqual(member, workflow.member) - # The token is owned by the subscriber. - self.assertIsNotNone(workflow.token) - self.assertEqual(workflow.token_owner, TokenOwner.subscriber) - # Unsubscribe Anne out of band. - member.unsubscribe() - # Confirm. - confirm_workflow = UnSubscriptionWorkflow(self._mlist) - confirm_workflow.token = workflow.token - confirm_workflow.restore() - list(confirm_workflow) - # No further token is needed. - self.assertIsNone(confirm_workflow.token) - self.assertEqual(confirm_workflow.token_owner, TokenOwner.no_one) - - def test_do_confirmation_nonmember_final_step(self): - # Attempt to confirm the unsubscription of a member who has already - # been unsubscribed. - self._mlist.unsubscription_policy = SubscriptionPolicy.confirm - workflow = UnSubscriptionWorkflow(self._mlist, self.anne) - list(workflow) - # Anne is a member. - member = self._mlist.regular_members.get_member(self._anne) - self.assertIsNotNone(member) - self.assertEqual(member, workflow.member) - # The token is owned by the subscriber. - self.assertIsNotNone(workflow.token) - self.assertEqual(workflow.token_owner, TokenOwner.subscriber) - # Confirm. - confirm_workflow = UnSubscriptionWorkflow(self._mlist) - confirm_workflow.token = workflow.token - confirm_workflow.restore() - confirm_workflow.run_until('do_unsubscription') - self.assertEqual(member, confirm_workflow.member) - # Unsubscribe Anne out of band. - member.unsubscribe() - list(confirm_workflow) - self.assertIsNone(confirm_workflow.member) - # No further token is needed. - self.assertIsNone(confirm_workflow.token) - self.assertEqual(confirm_workflow.token_owner, TokenOwner.no_one) - - def test_prevent_confirmation_replay_attacks(self): - # Ensure that if the workflow requires two confirmations, e.g. first - # the user confirming their subscription, and then the moderator - # approving it, that different tokens are used in these two cases. - self._mlist.unsubscription_policy = ( - SubscriptionPolicy.confirm_then_moderate) - workflow = UnSubscriptionWorkflow(self._mlist, self.anne) - # Run the state machine up to the first confirmation, and cache the - # confirmation token. - list(workflow) - token = workflow.token - # Anne is still a member of the mailing list. - member = self._mlist.regular_members.get_member(self._anne) - self.assertIsNotNone(member) - self.assertIsNotNone(workflow.member) - # The token is owned by the subscriber. - self.assertIsNotNone(workflow.token) - self.assertEqual(workflow.token_owner, TokenOwner.subscriber) - # The old token will not work for moderator approval. - moderator_workflow = UnSubscriptionWorkflow(self._mlist) - moderator_workflow.token = token - moderator_workflow.restore() - list(moderator_workflow) - # The token is owned by the moderator. - self.assertIsNotNone(moderator_workflow.token) - self.assertEqual(moderator_workflow.token_owner, TokenOwner.moderator) - # While we wait for the moderator to approve the subscription, note - # that there's a new token for the next steps. - self.assertNotEqual(token, moderator_workflow.token) - # The old token won't work. - final_workflow = UnSubscriptionWorkflow(self._mlist) - final_workflow.token = token - self.assertRaises(LookupError, final_workflow.restore) - # Running this workflow will fail. - self.assertRaises(AssertionError, list, final_workflow) - # Anne is still not unsubscribed. - member = self._mlist.regular_members.get_member(self._anne) - self.assertIsNotNone(member) - self.assertIsNone(final_workflow.member) - # However, if we use the new token, her unsubscription request will be - # approved by the moderator. - final_workflow.token = moderator_workflow.token - final_workflow.restore() - list(final_workflow) - # And now Anne is unsubscribed. - member = self._mlist.regular_members.get_member(self._anne) - self.assertIsNone(member) - # No further token is needed. - self.assertIsNone(final_workflow.token) - self.assertEqual(final_workflow.token_owner, TokenOwner.no_one) - - def test_confirmation_needed_and_pre_confirmed(self): - # The subscription policy is 'confirm' but the subscription is - # pre-confirmed so the moderation checks can be skipped. - self._mlist.unsubscription_policy = SubscriptionPolicy.confirm - workflow = UnSubscriptionWorkflow( - self._mlist, self.anne, pre_confirmed=True, pre_approved=True) - list(workflow) - # Anne was unsubscribed. - self.assertIsNone(workflow.token) - self.assertEqual(workflow.token_owner, TokenOwner.no_one) - self.assertIsNone(workflow.member) - - def test_confirmation_needed_moderator_address(self): - address = self.anne.register('anne.person@example.com') - self._mlist.subscribe(address) - self._mlist.unsubscription_policy = SubscriptionPolicy.moderate - workflow = UnSubscriptionWorkflow(self._mlist, address) - # Get moderator approval. - list(workflow) - approved_workflow = UnSubscriptionWorkflow(self._mlist) - approved_workflow.token = workflow.token - approved_workflow.restore() - list(approved_workflow) - self.assertEqual(approved_workflow.subscriber, address) - # Anne was unsubscribed. - self.assertIsNone(approved_workflow.token) - self.assertEqual(approved_workflow.token_owner, TokenOwner.no_one) - self.assertIsNone(approved_workflow.member) - member = self._mlist.regular_members.get_member( - 'anne.person@example.com') - self.assertIsNone(member) diff --git a/src/mailman/app/tests/test_workflow.py b/src/mailman/app/tests/test_workflow.py deleted file mode 100644 index e82001540..000000000 --- a/src/mailman/app/tests/test_workflow.py +++ /dev/null @@ -1,183 +0,0 @@ -# Copyright (C) 2015-2017 by the Free Software Foundation, Inc. -# -# This file is part of GNU Mailman. -# -# GNU Mailman is free software: you can redistribute it and/or modify it under -# the terms of the GNU General Public License as published by the Free -# Software Foundation, either version 3 of the License, or (at your option) -# any later version. -# -# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT -# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or -# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for -# more details. -# -# You should have received a copy of the GNU General Public License along with -# GNU Mailman. If not, see <http://www.gnu.org/licenses/>. - -"""App-level workflow tests.""" - -import json -import unittest - -from mailman.app.workflow import Workflow -from mailman.interfaces.workflow import IWorkflowStateManager -from mailman.testing.layers import ConfigLayer -from zope.component import getUtility - - -class MyWorkflow(Workflow): - INITIAL_STATE = 'first' - SAVE_ATTRIBUTES = ('ant', 'bee', 'cat') - - def __init__(self): - super().__init__() - self.token = 'test-workflow' - self.ant = 1 - self.bee = 2 - self.cat = 3 - self.dog = 4 - - def _step_first(self): - self.push('second') - return 'one' - - def _step_second(self): - self.push('third') - return 'two' - - def _step_third(self): - return 'three' - - -class DependentWorkflow(MyWorkflow): - SAVE_ATTRIBUTES = ('ant', 'bee', 'cat', 'elf') - - def __init__(self): - super().__init__() - self._elf = 5 - - @property - def elf(self): - return self._elf - - @elf.setter - def elf(self, value): - # This attribute depends on other attributes. - assert self.ant is not None - assert self.bee is not None - assert self.cat is not None - self._elf = value - - -class TestWorkflow(unittest.TestCase): - layer = ConfigLayer - - def setUp(self): - self._workflow = iter(MyWorkflow()) - - def test_basic_workflow(self): - # The work flows from one state to the next. - results = list(self._workflow) - self.assertEqual(results, ['one', 'two', 'three']) - - def test_partial_workflow(self): - # You don't have to flow through every step. - results = next(self._workflow) - self.assertEqual(results, 'one') - - def test_exhaust_workflow(self): - # Manually flow through a few steps, then consume the whole thing. - results = [next(self._workflow)] - results.extend(self._workflow) - self.assertEqual(results, ['one', 'two', 'three']) - - def test_save_and_restore_workflow(self): - # Without running any steps, save and restore the workflow. Then - # consume the restored workflow. - self._workflow.save() - new_workflow = MyWorkflow() - new_workflow.restore() - results = list(new_workflow) - self.assertEqual(results, ['one', 'two', 'three']) - - def test_save_and_restore_partial_workflow(self): - # After running a few steps, save and restore the workflow. Then - # consume the restored workflow. - next(self._workflow) - self._workflow.save() - new_workflow = MyWorkflow() - new_workflow.restore() - results = list(new_workflow) - self.assertEqual(results, ['two', 'three']) - - def test_save_and_restore_exhausted_workflow(self): - # After consuming the entire workflow, save and restore it. - list(self._workflow) - self._workflow.save() - new_workflow = MyWorkflow() - new_workflow.restore() - results = list(new_workflow) - self.assertEqual(len(results), 0) - - def test_save_and_restore_attributes(self): - # Saved attributes are restored. - self._workflow.ant = 9 - self._workflow.bee = 8 - self._workflow.cat = 7 - # Don't save .dog. - self._workflow.save() - new_workflow = MyWorkflow() - new_workflow.restore() - self.assertEqual(new_workflow.ant, 9) - self.assertEqual(new_workflow.bee, 8) - self.assertEqual(new_workflow.cat, 7) - self.assertEqual(new_workflow.dog, 4) - - def test_save_and_restore_dependant_attributes(self): - # Attributes must be restored in the order they are declared in - # SAVE_ATTRIBUTES. - workflow = iter(DependentWorkflow()) - workflow.elf = 6 - workflow.save() - new_workflow = DependentWorkflow() - # The elf attribute must be restored last, set triggering values for - # attributes it depends on. - new_workflow.ant = new_workflow.bee = new_workflow.cat = None - new_workflow.restore() - self.assertEqual(new_workflow.elf, 6) - - def test_save_and_restore_obsolete_attributes(self): - # Obsolete saved attributes are ignored. - state_manager = getUtility(IWorkflowStateManager) - # Save the state of an old version of the workflow that would not have - # the cat attribute. - state_manager.save( - self._workflow.token, 'first', - json.dumps({'ant': 1, 'bee': 2})) - # Restore in the current version that needs the cat attribute. - new_workflow = MyWorkflow() - try: - new_workflow.restore() - except KeyError: - self.fail('Restore does not handle obsolete attributes') - # Restoring must not raise an exception, the default value is kept. - self.assertEqual(new_workflow.cat, 3) - - def test_run_thru(self): - # Run all steps through the given one. - results = self._workflow.run_thru('second') - self.assertEqual(results, ['one', 'two']) - - def test_run_thru_completes(self): - results = self._workflow.run_thru('all of them') - self.assertEqual(results, ['one', 'two', 'three']) - - def test_run_until(self): - # Run until (but not including) the given step. - results = self._workflow.run_until('second') - self.assertEqual(results, ['one']) - - def test_run_until_completes(self): - results = self._workflow.run_until('all of them') - self.assertEqual(results, ['one', 'two', 'three']) diff --git a/src/mailman/app/tests/test_workflowmanager.py b/src/mailman/app/tests/test_workflowmanager.py index 38ed2e468..e21e428f6 100644 --- a/src/mailman/app/tests/test_workflowmanager.py +++ b/src/mailman/app/tests/test_workflowmanager.py @@ -20,7 +20,6 @@ import unittest from mailman.app.lifecycle import create_list -from mailman.interfaces.mailinglist import SubscriptionPolicy from mailman.interfaces.member import MemberRole from mailman.interfaces.pending import IPendings from mailman.interfaces.subscriptions import ISubscriptionManager, TokenOwner @@ -28,6 +27,9 @@ from mailman.interfaces.usermanager import IUserManager from mailman.testing.helpers import get_queue_messages from mailman.testing.layers import ConfigLayer from mailman.utilities.datetime import now +from mailman.workflows.subscription import ( + ConfirmModerationSubscriptionPolicy, ConfirmSubscriptionPolicy, + ModerationSubscriptionPolicy, OpenSubscriptionPolicy) from zope.component import getUtility @@ -60,7 +62,7 @@ class TestRegistrar(unittest.TestCase): # Registering a subscription request where no confirmation or # moderation steps are needed, leaves us with no token or owner, since # there's nothing more to do. - self._mlist.subscription_policy = SubscriptionPolicy.open + self._mlist.subscription_policy = OpenSubscriptionPolicy self._anne.verified_on = now() token, token_owner, rmember = self._registrar.register(self._anne) self.assertIsNone(token) @@ -82,7 +84,7 @@ class TestRegistrar(unittest.TestCase): # (because she does not have a verified address), but not the moderator # to approve. Running the workflow gives us a token. Confirming the # token subscribes the user. - self._mlist.subscription_policy = SubscriptionPolicy.open + self._mlist.subscription_policy = OpenSubscriptionPolicy token, token_owner, rmember = self._registrar.register(self._anne) self.assertIsNotNone(token) self.assertEqual(token_owner, TokenOwner.subscriber) @@ -102,7 +104,7 @@ class TestRegistrar(unittest.TestCase): # (because of list policy), but not the moderator to approve. Running # the workflow gives us a token. Confirming the token subscribes the # user. - self._mlist.subscription_policy = SubscriptionPolicy.confirm + self._mlist.subscription_policy = ConfirmSubscriptionPolicy self._anne.verified_on = now() token, token_owner, rmember = self._registrar.register(self._anne) self.assertIsNotNone(token) @@ -122,7 +124,7 @@ class TestRegistrar(unittest.TestCase): # We have a subscription request which requires the moderator to # approve. Running the workflow gives us a token. Confirming the # token subscribes the user. - self._mlist.subscription_policy = SubscriptionPolicy.moderate + self._mlist.subscription_policy = ModerationSubscriptionPolicy self._anne.verified_on = now() token, token_owner, rmember = self._registrar.register(self._anne) self.assertIsNotNone(token) @@ -145,7 +147,7 @@ class TestRegistrar(unittest.TestCase): # token runs the workflow a little farther, but still gives us a # token. Confirming again subscribes the user. self._mlist.subscription_policy = ( - SubscriptionPolicy.confirm_then_moderate) + ConfirmModerationSubscriptionPolicy) self._anne.verified_on = now() # Runs until subscription confirmation. token, token_owner, rmember = self._registrar.register(self._anne) @@ -180,7 +182,7 @@ class TestRegistrar(unittest.TestCase): # sees when they approve the subscription. This prevents the user # from using a replay attack to subvert moderator approval. self._mlist.subscription_policy = ( - SubscriptionPolicy.confirm_then_moderate) + ConfirmModerationSubscriptionPolicy) self._anne.verified_on = now() # Runs until subscription confirmation. token, token_owner, rmember = self._registrar.register(self._anne) @@ -216,7 +218,7 @@ class TestRegistrar(unittest.TestCase): def test_discard_waiting_for_confirmation(self): # While waiting for a user to confirm their subscription, we discard # the workflow. - self._mlist.subscription_policy = SubscriptionPolicy.confirm + self._mlist.subscription_policy = ConfirmSubscriptionPolicy self._anne.verified_on = now() # Runs until subscription confirmation. token, token_owner, rmember = self._registrar.register(self._anne) @@ -233,7 +235,7 @@ class TestRegistrar(unittest.TestCase): def test_admin_notify_mchanges(self): # When a user gets subscribed via the subscription policy workflow, # the list administrators get an email notification. - self._mlist.subscription_policy = SubscriptionPolicy.open + self._mlist.subscription_policy = OpenSubscriptionPolicy self._mlist.admin_notify_mchanges = True self._mlist.send_welcome_message = False token, token_owner, member = self._registrar.register( @@ -253,7 +255,7 @@ anne@example.com has been successfully subscribed to Ant. # Even when a user gets subscribed via the subscription policy # workflow, the list administrators won't get an email notification if # they don't want one. - self._mlist.subscription_policy = SubscriptionPolicy.open + self._mlist.subscription_policy = OpenSubscriptionPolicy self._mlist.admin_notify_mchanges = False self._mlist.send_welcome_message = False # Bart is an administrator of the mailing list. diff --git a/src/mailman/app/workflow.py b/src/mailman/app/workflow.py deleted file mode 100644 index 92b78c184..000000000 --- a/src/mailman/app/workflow.py +++ /dev/null @@ -1,151 +0,0 @@ -# Copyright (C) 2015-2017 by the Free Software Foundation, Inc. -# -# This file is part of GNU Mailman. -# -# GNU Mailman is free software: you can redistribute it and/or modify it under -# the terms of the GNU General Public License as published by the Free -# Software Foundation, either version 3 of the License, or (at your option) -# any later version. -# -# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT -# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or -# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for -# more details. -# -# You should have received a copy of the GNU General Public License along with -# GNU Mailman. If not, see <http://www.gnu.org/licenses/>. - -"""Generic workflow.""" - -import sys -import json -import logging - -from collections import deque -from mailman.interfaces.workflow import IWorkflowStateManager -from public import public -from zope.component import getUtility - - -COMMASPACE = ', ' -log = logging.getLogger('mailman.error') - - -@public -class Workflow: - """Generic workflow.""" - - SAVE_ATTRIBUTES = () - INITIAL_STATE = None - - def __init__(self): - self.token = None - self._next = deque() - self.push(self.INITIAL_STATE) - self.debug = False - self._count = 0 - - @property - def name(self): - return self.__class__.__name__ - - def __iter__(self): - return self - - def push(self, step): - self._next.append(step) - - def _pop(self): - name = self._next.popleft() - step = getattr(self, '_step_{}'.format(name)) - self._count += 1 - if self.debug: # pragma: nocover - print('[{:02d}] -> {}'.format(self._count, name), file=sys.stderr) - return name, step - - def __next__(self): - try: - name, step = self._pop() - return step() - except IndexError: - raise StopIteration - except: - log.exception('deque: {}'.format(COMMASPACE.join(self._next))) - raise - - def run_thru(self, stop_after): - """Run the state machine through and including the given step. - - :param stop_after: Name of method, sans prefix to run the - state machine through. In other words, the state machine runs - until the named method completes. - """ - results = [] - while True: - try: - name, step = self._pop() - except (StopIteration, IndexError): - # We're done. - break - results.append(step()) - if name == stop_after: - break - return results - - def run_until(self, stop_before): - """Trun the state machine until (not including) the given step. - - :param stop_before: Name of method, sans prefix that the - state machine is run until the method is reached. Unlike - `run_thru()` the named method is not run. - """ - results = [] - while True: - try: - name, step = self._pop() - except (StopIteration, IndexError): - # We're done. - break - if name == stop_before: - # Stop executing, but not before we push the last state back - # onto the deque. Otherwise, resuming the state machine would - # skip this step. - self._next.appendleft(name) - break - results.append(step()) - return results - - def save(self): - assert self.token, 'Workflow token must be set' - state_manager = getUtility(IWorkflowStateManager) - data = {attr: getattr(self, attr) for attr in self.SAVE_ATTRIBUTES} - # Note: only the next step is saved, not the whole stack. This is not - # an issue in practice, since there's never more than a single step in - # the queue anyway. If we want to support more than a single step in - # the queue *and* want to support state saving/restoring, change this - # method and the restore() method. - if len(self._next) == 0: - step = None - elif len(self._next) == 1: - step = self._next[0] - else: - raise AssertionError( - "Can't save a workflow state with more than one step " - "in the queue") - state_manager.save(self.token, step, json.dumps(data)) - - def restore(self): - state_manager = getUtility(IWorkflowStateManager) - state = state_manager.restore(self.token) - if state is None: - # The token doesn't exist in the database. - raise LookupError(self.token) - self._next.clear() - if state.step: - self._next.append(state.step) - data = json.loads(state.data) - for attr in self.SAVE_ATTRIBUTES: - try: - setattr(self, attr, data[attr]) - except KeyError: - pass |
