summaryrefslogtreecommitdiff
path: root/src/mailman/app
diff options
context:
space:
mode:
Diffstat (limited to 'src/mailman/app')
-rw-r--r--src/mailman/app/docs/moderator.rst10
-rw-r--r--src/mailman/app/events.py2
-rw-r--r--src/mailman/app/subscriptions.py552
-rw-r--r--src/mailman/app/tests/test_moderation.py2
-rw-r--r--src/mailman/app/tests/test_subscriptions.py720
-rw-r--r--src/mailman/app/tests/test_unsubscriptions.py522
-rw-r--r--src/mailman/app/tests/test_workflow.py183
-rw-r--r--src/mailman/app/tests/test_workflowmanager.py22
-rw-r--r--src/mailman/app/workflow.py151
9 files changed, 38 insertions, 2126 deletions
diff --git a/src/mailman/app/docs/moderator.rst b/src/mailman/app/docs/moderator.rst
index ce25a4711..6fcaf0b08 100644
--- a/src/mailman/app/docs/moderator.rst
+++ b/src/mailman/app/docs/moderator.rst
@@ -224,7 +224,7 @@ Fred is a member of the mailing list...
>>> from mailman.interfaces.subscriptions import ISubscriptionManager
>>> registrar = ISubscriptionManager(mlist)
>>> token, token_owner, member = registrar.register(
- ... fred, pre_verified=True, pre_confirmed=True, pre_approved=True)
+ ... fred, pre_verified=True, pre_confirmed=True)
>>> member
<Member: Fred Person <fred@example.com> on ant@example.com
as MemberRole.member>
@@ -301,16 +301,16 @@ Usually, the list administrators want to be notified when there are membership
change requests they need to moderate. These notifications are sent when the
list is configured to send them.
- >>> from mailman.interfaces.mailinglist import SubscriptionPolicy
+ >>> from mailman.workflows.subscription import ModerationSubscriptionPolicy
>>> mlist.admin_immed_notify = True
- >>> mlist.subscription_policy = SubscriptionPolicy.moderate
+ >>> mlist.subscription_policy = ModerationSubscriptionPolicy
Gwen tries to subscribe to the mailing list.
>>> gwen = getUtility(IUserManager).create_address(
... 'gwen@example.com', 'Gwen Person')
>>> token, token_owner, member = registrar.register(
- ... gwen, pre_verified=True, pre_confirmed=True)
+ ... gwen, pre_verified=True)
Her subscription must be approved by the list administrator, so she is not yet
a member of the mailing list.
@@ -414,7 +414,7 @@ message.
>>> herb = getUtility(IUserManager).create_address(
... 'herb@example.com', 'Herb Person')
>>> token, token_owner, member = registrar.register(
- ... herb, pre_verified=True, pre_confirmed=True, pre_approved=True)
+ ... herb, pre_verified=True, pre_approved=True)
>>> messages = get_queue_messages('virgin')
>>> len(messages)
1
diff --git a/src/mailman/app/events.py b/src/mailman/app/events.py
index bf7f2ece8..a3fca30a8 100644
--- a/src/mailman/app/events.py
+++ b/src/mailman/app/events.py
@@ -38,7 +38,5 @@ def initialize():
passwords.handle_ConfigurationUpdatedEvent,
style_manager.handle_ConfigurationUpdatedEvent,
subscriptions.handle_ListDeletingEvent,
- subscriptions.handle_SubscriptionConfirmationNeededEvent,
- subscriptions.handle_UnsubscriptionConfirmationNeededEvent,
switchboard.handle_ConfigurationUpdatedEvent,
])
diff --git a/src/mailman/app/subscriptions.py b/src/mailman/app/subscriptions.py
index 0d8176ffb..ba62e9f52 100644
--- a/src/mailman/app/subscriptions.py
+++ b/src/mailman/app/subscriptions.py
@@ -17,515 +17,36 @@
"""Handle subscriptions."""
-import uuid
-import logging
-
-from datetime import timedelta
-from email.utils import formataddr
-from enum import Enum
-from mailman.app.membership import delete_member
-from mailman.app.workflow import Workflow
-from mailman.core.i18n import _
+from mailman.config import config
from mailman.database.transaction import flush
-from mailman.email.message import UserNotification
-from mailman.interfaces.address import IAddress
-from mailman.interfaces.bans import IBanManager
from mailman.interfaces.listmanager import ListDeletingEvent
-from mailman.interfaces.mailinglist import SubscriptionPolicy
-from mailman.interfaces.member import (
- AlreadySubscribedError, MemberRole, MembershipIsBannedError,
- NotAMemberError)
-from mailman.interfaces.pending import IPendable, IPendings
+from mailman.interfaces.pending import IPendings
from mailman.interfaces.subscriptions import (
- ISubscriptionManager, ISubscriptionService,
- SubscriptionConfirmationNeededEvent, SubscriptionPendingError, TokenOwner,
- UnsubscriptionConfirmationNeededEvent)
-from mailman.interfaces.template import ITemplateLoader
-from mailman.interfaces.user import IUser
-from mailman.interfaces.usermanager import IUserManager
-from mailman.interfaces.workflow import IWorkflowStateManager
-from mailman.utilities.datetime import now
-from mailman.utilities.string import expand, wrap
+ ISubscriptionManager, ISubscriptionService)
+from mailman.interfaces.workflows import IWorkflowStateManager
+from mailman.workflows.common import (PendableSubscription,
+ PendableUnsubscription)
from public import public
from zope.component import getUtility
-from zope.event import notify
from zope.interface import implementer
-log = logging.getLogger('mailman.subscribe')
-
-
-class WhichSubscriber(Enum):
- address = 1
- user = 2
-
-
-@implementer(IPendable)
-class PendableSubscription(dict):
- PEND_TYPE = 'subscription'
-
-
-@implementer(IPendable)
-class PendableUnsubscription(dict):
- PEND_TYPE = 'unsubscription'
-
-
-class _SubscriptionWorkflowCommon(Workflow):
- """Common support between subscription and unsubscription."""
-
- PENDABLE_CLASS = None
-
- def __init__(self, mlist, subscriber):
- super().__init__()
- self.mlist = mlist
- self.address = None
- self.user = None
- self.which = None
- self.member = None
- self._set_token(TokenOwner.no_one)
- # The subscriber must be either an IUser or IAddress.
- if IAddress.providedBy(subscriber):
- self.address = subscriber
- self.user = self.address.user
- self.which = WhichSubscriber.address
- elif IUser.providedBy(subscriber):
- self.address = subscriber.preferred_address
- self.user = subscriber
- self.which = WhichSubscriber.user
- self.subscriber = subscriber
-
- @property
- def user_key(self):
- # For save.
- return self.user.user_id.hex
-
- @user_key.setter
- def user_key(self, hex_key):
- # For restore.
- uid = uuid.UUID(hex_key)
- self.user = getUtility(IUserManager).get_user_by_id(uid)
- if self.user is None:
- self.user = self.address.user
-
- @property
- def address_key(self):
- # For save.
- return self.address.email
-
- @address_key.setter
- def address_key(self, email):
- # For restore.
- self.address = getUtility(IUserManager).get_address(email)
- assert self.address is not None
-
- @property
- def subscriber_key(self):
- return self.which.value
-
- @subscriber_key.setter
- def subscriber_key(self, key):
- self.which = WhichSubscriber(key)
-
- @property
- def token_owner_key(self):
- return self.token_owner.value
-
- @token_owner_key.setter
- def token_owner_key(self, value):
- self.token_owner = TokenOwner(value)
-
- def _set_token(self, token_owner):
- assert isinstance(token_owner, TokenOwner)
- pendings = getUtility(IPendings)
- # Clear out the previous pending token if there is one.
- if self.token is not None:
- pendings.confirm(self.token)
- # Create a new token to prevent replay attacks. It seems like this
- # would produce the same token, but it won't because the pending adds a
- # bit of randomization.
- self.token_owner = token_owner
- if token_owner is TokenOwner.no_one:
- self.token = None
- return
- pendable = self.PENDABLE_CLASS(
- list_id=self.mlist.list_id,
- email=self.address.email,
- display_name=self.address.display_name,
- when=now().replace(microsecond=0).isoformat(),
- token_owner=token_owner.name,
- )
- self.token = pendings.add(pendable, timedelta(days=3650))
-
-
-@public
-class SubscriptionWorkflow(_SubscriptionWorkflowCommon):
- """Workflow of a subscription request."""
-
- PENDABLE_CLASS = PendableSubscription
- INITIAL_STATE = 'sanity_checks'
- SAVE_ATTRIBUTES = (
- 'pre_approved',
- 'pre_confirmed',
- 'pre_verified',
- 'address_key',
- 'subscriber_key',
- 'user_key',
- 'token_owner_key',
- )
-
- def __init__(self, mlist, subscriber=None, *,
- pre_verified=False, pre_confirmed=False, pre_approved=False):
- super().__init__(mlist, subscriber)
- self.pre_verified = pre_verified
- self.pre_confirmed = pre_confirmed
- self.pre_approved = pre_approved
-
- def _step_sanity_checks(self):
- # Ensure that we have both an address and a user, even if the address
- # is not verified. We can't set the preferred address until it is
- # verified.
- if self.user is None:
- # The address has no linked user so create one, link it, and set
- # the user's preferred address.
- assert self.address is not None, 'No address or user'
- self.user = getUtility(IUserManager).make_user(self.address.email)
- if self.address is None:
- assert self.user.preferred_address is None, (
- "Preferred address exists, but wasn't used in constructor")
- addresses = list(self.user.addresses)
- if len(addresses) == 0:
- raise AssertionError('User has no addresses: {}'.format(
- self.user))
- # This is rather arbitrary, but we have no choice.
- self.address = addresses[0]
- assert self.user is not None and self.address is not None, (
- 'Insane sanity check results')
- # Is this subscriber already a member?
- if (self.which is WhichSubscriber.user and
- self.user.preferred_address is not None):
- subscriber = self.user
- else:
- subscriber = self.address
- if self.mlist.is_subscribed(subscriber):
- # 2017-04-22 BAW: This branch actually *does* get covered, as I've
- # verified by a full coverage run, but diffcov for some reason
- # claims that the test added in the branch that added this code
- # does not cover the change. That seems like a bug in diffcov.
- raise AlreadySubscribedError( # pragma: nocover
- self.mlist.fqdn_listname,
- self.address.email,
- MemberRole.member)
- # Is this email address banned?
- if IBanManager(self.mlist).is_banned(self.address.email):
- raise MembershipIsBannedError(self.mlist, self.address.email)
- # Check if there is already a subscription request for this email.
- pendings = getUtility(IPendings).find(
- mlist=self.mlist,
- pend_type='subscription')
- for token, pendable in pendings:
- if pendable['email'] == self.address.email:
- raise SubscriptionPendingError(self.mlist, self.address.email)
- # Start out with the subscriber being the token owner.
- self.push('verification_checks')
-
- def _step_verification_checks(self):
- # Is the address already verified, or is the pre-verified flag set?
- if self.address.verified_on is None:
- if self.pre_verified:
- self.address.verified_on = now()
- else:
- # The address being subscribed is not yet verified, so we need
- # to send a validation email that will also confirm that the
- # user wants to be subscribed to this mailing list.
- self.push('send_confirmation')
- return
- self.push('confirmation_checks')
-
- def _step_confirmation_checks(self):
- # If the list's subscription policy is open, then the user can be
- # subscribed right here and now.
- if self.mlist.subscription_policy is SubscriptionPolicy.open:
- self.push('do_subscription')
- return
- # If we do not need the user's confirmation, then skip to the
- # moderation checks.
- if self.mlist.subscription_policy is SubscriptionPolicy.moderate:
- self.push('moderation_checks')
- return
- # If the subscription has been pre-confirmed, then we can skip the
- # confirmation check can be skipped. If moderator approval is
- # required we need to check that, otherwise we can go straight to
- # subscription.
- if self.pre_confirmed:
- next_step = (
- 'moderation_checks'
- if self.mlist.subscription_policy is
- SubscriptionPolicy.confirm_then_moderate # noqa: E131
- else 'do_subscription')
- self.push(next_step)
- return
- # The user must confirm their subscription.
- self.push('send_confirmation')
-
- def _step_moderation_checks(self):
- # Does the moderator need to approve the subscription request?
- assert self.mlist.subscription_policy in (
- SubscriptionPolicy.moderate,
- SubscriptionPolicy.confirm_then_moderate,
- ), self.mlist.subscription_policy
- if self.pre_approved:
- self.push('do_subscription')
- else:
- self.push('get_moderator_approval')
-
- def _step_get_moderator_approval(self):
- # Here's the next step in the workflow, assuming the moderator
- # approves of the subscription. If they don't, the workflow and
- # subscription request will just be thrown away.
- self._set_token(TokenOwner.moderator)
- self.push('subscribe_from_restored')
- self.save()
- log.info('{}: held subscription request from {}'.format(
- self.mlist.fqdn_listname, self.address.email))
- # Possibly send a notification to the list moderators.
- if self.mlist.admin_immed_notify:
- subject = _(
- 'New subscription request to $self.mlist.display_name '
- 'from $self.address.email')
- username = formataddr(
- (self.subscriber.display_name, self.address.email))
- template = getUtility(ITemplateLoader).get(
- 'list:admin:action:subscribe', self.mlist)
- text = wrap(expand(template, self.mlist, dict(
- member=username,
- )))
- # This message should appear to come from the <list>-owner so as
- # to avoid any useless bounce processing.
- msg = UserNotification(
- self.mlist.owner_address, self.mlist.owner_address,
- subject, text, self.mlist.preferred_language)
- msg.send(self.mlist)
- # The workflow must stop running here.
- raise StopIteration
-
- def _step_subscribe_from_restored(self):
- # Prevent replay attacks.
- self._set_token(TokenOwner.no_one)
- # Restore a little extra state that can't be stored in the database
- # (because the order of setattr() on restore is indeterminate), then
- # subscribe the user.
- if self.which is WhichSubscriber.address:
- self.subscriber = self.address
- else:
- assert self.which is WhichSubscriber.user
- self.subscriber = self.user
- self.push('do_subscription')
-
- def _step_do_subscription(self):
- # We can immediately subscribe the user to the mailing list.
- self.member = self.mlist.subscribe(self.subscriber)
- assert self.token is None and self.token_owner is TokenOwner.no_one, (
- 'Unexpected active token at end of subscription workflow')
-
- def _step_send_confirmation(self):
- self._set_token(TokenOwner.subscriber)
- self.push('do_confirm_verify')
- self.save()
- # Triggering this event causes the confirmation message to be sent.
- notify(SubscriptionConfirmationNeededEvent(
- self.mlist, self.token, self.address.email))
- # Now we wait for the confirmation.
- raise StopIteration
-
- def _step_do_confirm_verify(self):
- # Restore a little extra state that can't be stored in the database
- # (because the order of setattr() on restore is indeterminate), then
- # continue with the confirmation/verification step.
- if self.which is WhichSubscriber.address:
- self.subscriber = self.address
- else:
- assert self.which is WhichSubscriber.user
- self.subscriber = self.user
- # Reset the token so it can't be used in a replay attack.
- self._set_token(TokenOwner.no_one)
- # The user has confirmed their subscription request, and also verified
- # their email address if necessary. This latter needs to be set on the
- # IAddress, but there's nothing more to do about the confirmation step.
- # We just continue along with the workflow.
- if self.address.verified_on is None:
- self.address.verified_on = now()
- # The next step depends on the mailing list's subscription policy.
- next_step = ('moderation_checks'
- if self.mlist.subscription_policy in (
- SubscriptionPolicy.moderate,
- SubscriptionPolicy.confirm_then_moderate,
- )
- else 'do_subscription')
- self.push(next_step)
-
-
-@public
-class UnSubscriptionWorkflow(_SubscriptionWorkflowCommon):
- """Workflow of a unsubscription request."""
-
- PENDABLE_CLASS = PendableUnsubscription
- INITIAL_STATE = 'subscription_checks'
- SAVE_ATTRIBUTES = (
- 'pre_approved',
- 'pre_confirmed',
- 'address_key',
- 'user_key',
- 'subscriber_key',
- 'token_owner_key',
- )
-
- def __init__(self, mlist, subscriber=None, *,
- pre_approved=False, pre_confirmed=False):
- super().__init__(mlist, subscriber)
- if IAddress.providedBy(subscriber) or IUser.providedBy(subscriber):
- self.member = self.mlist.regular_members.get_member(
- self.address.email)
- self.pre_confirmed = pre_confirmed
- self.pre_approved = pre_approved
-
- def _step_subscription_checks(self):
- assert self.mlist.is_subscribed(self.subscriber)
- self.push('confirmation_checks')
-
- def _step_confirmation_checks(self):
- # If list's unsubscription policy is open, the user can unsubscribe
- # right now.
- if self.mlist.unsubscription_policy is SubscriptionPolicy.open:
- self.push('do_unsubscription')
- return
- # If we don't need the user's confirmation, then skip to the moderation
- # checks.
- if self.mlist.unsubscription_policy is SubscriptionPolicy.moderate:
- self.push('moderation_checks')
- return
- # If the request is pre-confirmed, then the user can unsubscribe right
- # now.
- if self.pre_confirmed:
- self.push('do_unsubscription')
- return
- # The user must confirm their un-subsbcription.
- self.push('send_confirmation')
-
- def _step_send_confirmation(self):
- self._set_token(TokenOwner.subscriber)
- self.push('do_confirm_verify')
- self.save()
- notify(UnsubscriptionConfirmationNeededEvent(
- self.mlist, self.token, self.address.email))
- raise StopIteration
-
- def _step_moderation_checks(self):
- # Does the moderator need to approve the unsubscription request?
- assert self.mlist.unsubscription_policy in (
- SubscriptionPolicy.moderate,
- SubscriptionPolicy.confirm_then_moderate,
- ), self.mlist.unsubscription_policy
- if self.pre_approved:
- self.push('do_unsubscription')
- else:
- self.push('get_moderator_approval')
-
- def _step_get_moderator_approval(self):
- self._set_token(TokenOwner.moderator)
- self.push('unsubscribe_from_restored')
- self.save()
- log.info('{}: held unsubscription request from {}'.format(
- self.mlist.fqdn_listname, self.address.email))
- if self.mlist.admin_immed_notify:
- subject = _(
- 'New unsubscription request to $self.mlist.display_name '
- 'from $self.address.email')
- username = formataddr(
- (self.subscriber.display_name, self.address.email))
- template = getUtility(ITemplateLoader).get(
- 'list:admin:action:unsubscribe', self.mlist)
- text = wrap(expand(template, self.mlist, dict(
- member=username,
- )))
- # This message should appear to come from the <list>-owner so as
- # to avoid any useless bounce processing.
- msg = UserNotification(
- self.mlist.owner_address, self.mlist.owner_address,
- subject, text, self.mlist.preferred_language)
- msg.send(self.mlist)
- # The workflow must stop running here
- raise StopIteration
-
- def _step_do_confirm_verify(self):
- # Restore a little extra state that can't be stored in the database
- # (because the order of setattr() on restore is indeterminate), then
- # continue with the confirmation/verification step.
- if self.which is WhichSubscriber.address:
- self.subscriber = self.address
- else:
- assert self.which is WhichSubscriber.user
- self.subscriber = self.user
- # Reset the token so it can't be used in a replay attack.
- self._set_token(TokenOwner.no_one)
- # Restore the member object.
- self.member = self.mlist.regular_members.get_member(self.address.email)
- # It's possible the member was already unsubscribed while we were
- # waiting for the confirmation.
- if self.member is None:
- return
- # The user has confirmed their unsubscription request
- next_step = ('moderation_checks'
- if self.mlist.unsubscription_policy in (
- SubscriptionPolicy.moderate,
- SubscriptionPolicy.confirm_then_moderate,
- )
- else 'do_unsubscription')
- self.push(next_step)
-
- def _step_do_unsubscription(self):
- try:
- delete_member(self.mlist, self.address.email)
- except NotAMemberError:
- # The member has already been unsubscribed.
- pass
- self.member = None
- assert self.token is None and self.token_owner is TokenOwner.no_one, (
- 'Unexpected active token at end of subscription workflow')
-
- def _step_unsubscribe_from_restored(self):
- # Prevent replay attacks.
- self._set_token(TokenOwner.no_one)
- if self.which is WhichSubscriber.address:
- self.subscriber = self.address
- else:
- assert self.which is WhichSubscriber.user
- self.subscriber = self.user
- self.push('do_unsubscription')
-
-
@public
@implementer(ISubscriptionManager)
class SubscriptionManager:
def __init__(self, mlist):
self._mlist = mlist
- def register(self, subscriber=None, *,
- pre_verified=False, pre_confirmed=False, pre_approved=False):
+ def register(self, subscriber=None, **kwargs):
"""See `ISubscriptionManager`."""
- workflow = SubscriptionWorkflow(
- self._mlist, subscriber,
- pre_verified=pre_verified,
- pre_confirmed=pre_confirmed,
- pre_approved=pre_approved)
+ workflow = self._mlist.subscription_policy(self._mlist, subscriber,
+ **kwargs)
list(workflow)
return workflow.token, workflow.token_owner, workflow.member
- def unregister(self, subscriber=None, *,
- pre_confirmed=False, pre_approved=False):
- workflow = UnSubscriptionWorkflow(
- self._mlist, subscriber,
- pre_confirmed=pre_confirmed,
- pre_approved=pre_approved)
+ def unregister(self, subscriber=None, **kwargs):
+ workflow = self._mlist.unsubscription_policy(self._mlist, subscriber,
+ **kwargs)
list(workflow)
return workflow.token, workflow.token_owner, workflow.member
@@ -536,11 +57,13 @@ class SubscriptionManager:
if pendable is None:
raise LookupError
workflow_type = pendable.get('type')
- assert workflow_type in (PendableSubscription.PEND_TYPE,
- PendableUnsubscription.PEND_TYPE)
- workflow = (SubscriptionWorkflow
- if workflow_type == PendableSubscription.PEND_TYPE
- else UnSubscriptionWorkflow)(self._mlist)
+ if workflow_type in {PendableSubscription.PEND_TYPE,
+ PendableUnsubscription.PEND_TYPE}:
+ workflow = (self._mlist.subscription_policy # pragma: nocover
+ if workflow_type == PendableSubscription.PEND_TYPE
+ else self._mlist.unsubscription_policy)(self._mlist)
+ else:
+ workflow = config.workflows[workflow_type](self._mlist)
workflow.token = token
workflow.restore()
# In order to just run the whole workflow, all we need to do
@@ -556,41 +79,6 @@ class SubscriptionManager:
getUtility(IWorkflowStateManager).discard(token)
-def _handle_confirmation_needed_events(event, template_name):
- subject = 'confirm {}'.format(event.token)
- confirm_address = event.mlist.confirm_address(event.token)
- email_address = event.email
- # Send a verification email to the address.
- template = getUtility(ITemplateLoader).get(template_name, event.mlist)
- text = expand(template, event.mlist, dict(
- token=event.token,
- subject=subject,
- confirm_email=confirm_address,
- user_email=email_address,
- # For backward compatibility.
- confirm_address=confirm_address,
- email_address=email_address,
- domain_name=event.mlist.domain.mail_host,
- contact_address=event.mlist.owner_address,
- ))
- msg = UserNotification(email_address, confirm_address, subject, text)
- msg.send(event.mlist, add_precedence=False)
-
-
-@public
-def handle_SubscriptionConfirmationNeededEvent(event):
- if not isinstance(event, SubscriptionConfirmationNeededEvent):
- return
- _handle_confirmation_needed_events(event, 'list:user:action:subscribe')
-
-
-@public
-def handle_UnsubscriptionConfirmationNeededEvent(event):
- if not isinstance(event, UnsubscriptionConfirmationNeededEvent):
- return
- _handle_confirmation_needed_events(event, 'list:user:action:unsubscribe')
-
-
@public
def handle_ListDeletingEvent(event):
"""Delete a mailing list's members when the list is being deleted."""
@@ -599,6 +87,6 @@ def handle_ListDeletingEvent(event):
return
# Find all the members still associated with the mailing list.
members = getUtility(ISubscriptionService).find_members(
- list_id=event.mailing_list.list_id)
+ list_id=event.mailing_list.list_id)
for member in members:
member.unsubscribe()
diff --git a/src/mailman/app/tests/test_moderation.py b/src/mailman/app/tests/test_moderation.py
index 5448a1d79..7d7524a10 100644
--- a/src/mailman/app/tests/test_moderation.py
+++ b/src/mailman/app/tests/test_moderation.py
@@ -162,7 +162,7 @@ class TestUnsubscription(unittest.TestCase):
user_manager = getUtility(IUserManager)
anne = user_manager.create_address('anne@example.org', 'Anne Person')
token, token_owner, member = self._manager.register(
- anne, pre_verified=True, pre_confirmed=True, pre_approved=True)
+ anne, pre_verified=True, pre_confirmed=True)
self.assertIsNone(token)
self.assertEqual(member.address.email, 'anne@example.org')
bart = user_manager.create_user('bart@example.com', 'Bart User')
diff --git a/src/mailman/app/tests/test_subscriptions.py b/src/mailman/app/tests/test_subscriptions.py
deleted file mode 100644
index 0519ec385..000000000
--- a/src/mailman/app/tests/test_subscriptions.py
+++ /dev/null
@@ -1,720 +0,0 @@
-# Copyright (C) 2011-2017 by the Free Software Foundation, Inc.
-#
-# This file is part of GNU Mailman.
-#
-# GNU Mailman is free software: you can redistribute it and/or modify it under
-# the terms of the GNU General Public License as published by the Free
-# Software Foundation, either version 3 of the License, or (at your option)
-# any later version.
-#
-# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT
-# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
-# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
-# more details.
-#
-# You should have received a copy of the GNU General Public License along with
-# GNU Mailman. If not, see <http://www.gnu.org/licenses/>.
-
-"""Tests for the subscription service."""
-
-import unittest
-
-from contextlib import suppress
-from mailman.app.lifecycle import create_list
-from mailman.app.subscriptions import SubscriptionWorkflow
-from mailman.interfaces.bans import IBanManager
-from mailman.interfaces.mailinglist import SubscriptionPolicy
-from mailman.interfaces.member import MemberRole, MembershipIsBannedError
-from mailman.interfaces.pending import IPendings
-from mailman.interfaces.subscriptions import TokenOwner
-from mailman.interfaces.usermanager import IUserManager
-from mailman.testing.helpers import (
- LogFileMark, get_queue_messages, set_preferred)
-from mailman.testing.layers import ConfigLayer
-from mailman.utilities.datetime import now
-from unittest.mock import patch
-from zope.component import getUtility
-
-
-class TestSubscriptionWorkflow(unittest.TestCase):
- layer = ConfigLayer
- maxDiff = None
-
- def setUp(self):
- self._mlist = create_list('test@example.com')
- self._mlist.admin_immed_notify = False
- self._anne = 'anne@example.com'
- self._user_manager = getUtility(IUserManager)
- self._expected_pendings_count = 0
-
- def tearDown(self):
- # There usually should be no pending after all is said and done, but
- # some tests don't complete the workflow.
- self.assertEqual(getUtility(IPendings).count,
- self._expected_pendings_count)
-
- def test_start_state(self):
- # The workflow starts with no tokens or member.
- workflow = SubscriptionWorkflow(self._mlist)
- self.assertIsNone(workflow.token)
- self.assertEqual(workflow.token_owner, TokenOwner.no_one)
- self.assertIsNone(workflow.member)
-
- def test_pended_data(self):
- # There is a Pendable associated with the held request, and it has
- # some data associated with it.
- anne = self._user_manager.create_address(self._anne)
- workflow = SubscriptionWorkflow(self._mlist, anne)
- with suppress(StopIteration):
- workflow.run_thru('send_confirmation')
- self.assertIsNotNone(workflow.token)
- pendable = getUtility(IPendings).confirm(workflow.token, expunge=False)
- self.assertEqual(pendable['list_id'], 'test.example.com')
- self.assertEqual(pendable['email'], 'anne@example.com')
- self.assertEqual(pendable['display_name'], '')
- self.assertEqual(pendable['when'], '2005-08-01T07:49:23')
- self.assertEqual(pendable['token_owner'], 'subscriber')
- # The token is still in the database.
- self._expected_pendings_count = 1
-
- def test_user_or_address_required(self):
- # The `subscriber` attribute must be a user or address.
- workflow = SubscriptionWorkflow(self._mlist)
- self.assertRaises(AssertionError, list, workflow)
-
- def test_sanity_checks_address(self):
- # Ensure that the sanity check phase, when given an IAddress, ends up
- # with a linked user.
- anne = self._user_manager.create_address(self._anne)
- workflow = SubscriptionWorkflow(self._mlist, anne)
- self.assertIsNotNone(workflow.address)
- self.assertIsNone(workflow.user)
- workflow.run_thru('sanity_checks')
- self.assertIsNotNone(workflow.address)
- self.assertIsNotNone(workflow.user)
- self.assertEqual(list(workflow.user.addresses)[0].email, self._anne)
-
- def test_sanity_checks_user_with_preferred_address(self):
- # Ensure that the sanity check phase, when given an IUser with a
- # preferred address, ends up with an address.
- anne = self._user_manager.make_user(self._anne)
- address = set_preferred(anne)
- workflow = SubscriptionWorkflow(self._mlist, anne)
- # The constructor sets workflow.address because the user has a
- # preferred address.
- self.assertEqual(workflow.address, address)
- self.assertEqual(workflow.user, anne)
- workflow.run_thru('sanity_checks')
- self.assertEqual(workflow.address, address)
- self.assertEqual(workflow.user, anne)
-
- def test_sanity_checks_user_without_preferred_address(self):
- # Ensure that the sanity check phase, when given a user without a
- # preferred address, but with at least one linked address, gets an
- # address.
- anne = self._user_manager.make_user(self._anne)
- workflow = SubscriptionWorkflow(self._mlist, anne)
- self.assertIsNone(workflow.address)
- self.assertEqual(workflow.user, anne)
- workflow.run_thru('sanity_checks')
- self.assertIsNotNone(workflow.address)
- self.assertEqual(workflow.user, anne)
-
- def test_sanity_checks_user_with_multiple_linked_addresses(self):
- # Ensure that the santiy check phase, when given a user without a
- # preferred address, but with multiple linked addresses, gets of of
- # those addresses (exactly which one is undefined).
- anne = self._user_manager.make_user(self._anne)
- anne.link(self._user_manager.create_address('anne@example.net'))
- anne.link(self._user_manager.create_address('anne@example.org'))
- workflow = SubscriptionWorkflow(self._mlist, anne)
- self.assertIsNone(workflow.address)
- self.assertEqual(workflow.user, anne)
- workflow.run_thru('sanity_checks')
- self.assertIn(workflow.address.email, ['anne@example.com',
- 'anne@example.net',
- 'anne@example.org'])
- self.assertEqual(workflow.user, anne)
-
- def test_sanity_checks_user_without_addresses(self):
- # It is an error to try to subscribe a user with no linked addresses.
- user = self._user_manager.create_user()
- workflow = SubscriptionWorkflow(self._mlist, user)
- self.assertRaises(AssertionError, workflow.run_thru, 'sanity_checks')
-
- def test_sanity_checks_globally_banned_address(self):
- # An exception is raised if the address is globally banned.
- anne = self._user_manager.create_address(self._anne)
- IBanManager(None).ban(self._anne)
- workflow = SubscriptionWorkflow(self._mlist, anne)
- self.assertRaises(MembershipIsBannedError, list, workflow)
-
- def test_sanity_checks_banned_address(self):
- # An exception is raised if the address is banned by the mailing list.
- anne = self._user_manager.create_address(self._anne)
- IBanManager(self._mlist).ban(self._anne)
- workflow = SubscriptionWorkflow(self._mlist, anne)
- self.assertRaises(MembershipIsBannedError, list, workflow)
-
- def test_verification_checks_with_verified_address(self):
- # When the address is already verified, we skip straight to the
- # confirmation checks.
- anne = self._user_manager.create_address(self._anne)
- anne.verified_on = now()
- workflow = SubscriptionWorkflow(self._mlist, anne)
- workflow.run_thru('verification_checks')
- with patch.object(workflow, '_step_confirmation_checks') as step:
- next(workflow)
- step.assert_called_once_with()
-
- def test_verification_checks_with_pre_verified_address(self):
- # When the address is not yet verified, but the pre-verified flag is
- # passed to the workflow, we skip to the confirmation checks.
- anne = self._user_manager.create_address(self._anne)
- workflow = SubscriptionWorkflow(self._mlist, anne, pre_verified=True)
- workflow.run_thru('verification_checks')
- with patch.object(workflow, '_step_confirmation_checks') as step:
- next(workflow)
- step.assert_called_once_with()
- # And now the address is verified.
- self.assertIsNotNone(anne.verified_on)
-
- def test_verification_checks_confirmation_needed(self):
- # The address is neither verified, nor is the pre-verified flag set.
- # A confirmation message must be sent to the user which will also
- # verify their address.
- anne = self._user_manager.create_address(self._anne)
- workflow = SubscriptionWorkflow(self._mlist, anne)
- workflow.run_thru('verification_checks')
- with patch.object(workflow, '_step_send_confirmation') as step:
- next(workflow)
- step.assert_called_once_with()
- # The address still hasn't been verified.
- self.assertIsNone(anne.verified_on)
-
- def test_confirmation_checks_open_list(self):
- # A subscription to an open list does not need to be confirmed or
- # moderated.
- self._mlist.subscription_policy = SubscriptionPolicy.open
- anne = self._user_manager.create_address(self._anne)
- workflow = SubscriptionWorkflow(self._mlist, anne, pre_verified=True)
- workflow.run_thru('confirmation_checks')
- with patch.object(workflow, '_step_do_subscription') as step:
- next(workflow)
- step.assert_called_once_with()
-
- def test_confirmation_checks_no_user_confirmation_needed(self):
- # A subscription to a list which does not need user confirmation skips
- # to the moderation checks.
- self._mlist.subscription_policy = SubscriptionPolicy.moderate
- anne = self._user_manager.create_address(self._anne)
- workflow = SubscriptionWorkflow(self._mlist, anne, pre_verified=True)
- workflow.run_thru('confirmation_checks')
- with patch.object(workflow, '_step_moderation_checks') as step:
- next(workflow)
- step.assert_called_once_with()
-
- def test_confirmation_checks_confirm_pre_confirmed(self):
- # The subscription policy requires user confirmation, but their
- # subscription is pre-confirmed. Since moderation is not required,
- # the user will be immediately subscribed.
- self._mlist.subscription_policy = SubscriptionPolicy.confirm
- anne = self._user_manager.create_address(self._anne)
- workflow = SubscriptionWorkflow(self._mlist, anne,
- pre_verified=True,
- pre_confirmed=True)
- workflow.run_thru('confirmation_checks')
- with patch.object(workflow, '_step_do_subscription') as step:
- next(workflow)
- step.assert_called_once_with()
-
- def test_confirmation_checks_confirm_then_moderate_pre_confirmed(self):
- # The subscription policy requires user confirmation, but their
- # subscription is pre-confirmed. Since moderation is required, that
- # check will be performed.
- self._mlist.subscription_policy = (
- SubscriptionPolicy.confirm_then_moderate)
- anne = self._user_manager.create_address(self._anne)
- workflow = SubscriptionWorkflow(self._mlist, anne,
- pre_verified=True,
- pre_confirmed=True)
- workflow.run_thru('confirmation_checks')
- with patch.object(workflow, '_step_moderation_checks') as step:
- next(workflow)
- step.assert_called_once_with()
-
- def test_confirmation_checks_confirm_and_moderate_pre_confirmed(self):
- # The subscription policy requires user confirmation and moderation,
- # but their subscription is pre-confirmed.
- self._mlist.subscription_policy = (
- SubscriptionPolicy.confirm_then_moderate)
- anne = self._user_manager.create_address(self._anne)
- workflow = SubscriptionWorkflow(self._mlist, anne,
- pre_verified=True,
- pre_confirmed=True)
- workflow.run_thru('confirmation_checks')
- with patch.object(workflow, '_step_moderation_checks') as step:
- next(workflow)
- step.assert_called_once_with()
-
- def test_confirmation_checks_confirmation_needed(self):
- # The subscription policy requires confirmation and the subscription
- # is not pre-confirmed.
- self._mlist.subscription_policy = SubscriptionPolicy.confirm
- anne = self._user_manager.create_address(self._anne)
- workflow = SubscriptionWorkflow(self._mlist, anne, pre_verified=True)
- workflow.run_thru('confirmation_checks')
- with patch.object(workflow, '_step_send_confirmation') as step:
- next(workflow)
- step.assert_called_once_with()
-
- def test_confirmation_checks_moderate_confirmation_needed(self):
- # The subscription policy requires confirmation and moderation, and the
- # subscription is not pre-confirmed.
- self._mlist.subscription_policy = (
- SubscriptionPolicy.confirm_then_moderate)
- anne = self._user_manager.create_address(self._anne)
- workflow = SubscriptionWorkflow(self._mlist, anne, pre_verified=True)
- workflow.run_thru('confirmation_checks')
- with patch.object(workflow, '_step_send_confirmation') as step:
- next(workflow)
- step.assert_called_once_with()
-
- def test_moderation_checks_pre_approved(self):
- # The subscription is pre-approved by the moderator.
- self._mlist.subscription_policy = SubscriptionPolicy.moderate
- anne = self._user_manager.create_address(self._anne)
- workflow = SubscriptionWorkflow(self._mlist, anne,
- pre_verified=True,
- pre_approved=True)
- workflow.run_thru('moderation_checks')
- with patch.object(workflow, '_step_do_subscription') as step:
- next(workflow)
- step.assert_called_once_with()
-
- def test_moderation_checks_approval_required(self):
- # The moderator must approve the subscription.
- self._mlist.subscription_policy = SubscriptionPolicy.moderate
- anne = self._user_manager.create_address(self._anne)
- workflow = SubscriptionWorkflow(self._mlist, anne, pre_verified=True)
- workflow.run_thru('moderation_checks')
- with patch.object(workflow, '_step_get_moderator_approval') as step:
- next(workflow)
- step.assert_called_once_with()
-
- def test_do_subscription(self):
- # An open subscription policy plus a pre-verified address means the
- # user gets subscribed to the mailing list without any further
- # confirmations or approvals.
- self._mlist.subscription_policy = SubscriptionPolicy.open
- anne = self._user_manager.create_address(self._anne)
- workflow = SubscriptionWorkflow(self._mlist, anne, pre_verified=True)
- # Consume the entire state machine.
- list(workflow)
- # Anne is now a member of the mailing list.
- member = self._mlist.regular_members.get_member(self._anne)
- self.assertEqual(member.address, anne)
- self.assertEqual(workflow.member, member)
- # No further token is needed.
- self.assertIsNone(workflow.token)
- self.assertEqual(workflow.token_owner, TokenOwner.no_one)
-
- def test_do_subscription_pre_approved(self):
- # An moderation-requiring subscription policy plus a pre-verified and
- # pre-approved address means the user gets subscribed to the mailing
- # list without any further confirmations or approvals.
- self._mlist.subscription_policy = SubscriptionPolicy.moderate
- anne = self._user_manager.create_address(self._anne)
- workflow = SubscriptionWorkflow(self._mlist, anne,
- pre_verified=True,
- pre_approved=True)
- # Consume the entire state machine.
- list(workflow)
- # Anne is now a member of the mailing list.
- member = self._mlist.regular_members.get_member(self._anne)
- self.assertEqual(member.address, anne)
- self.assertEqual(workflow.member, member)
- # No further token is needed.
- self.assertIsNone(workflow.token)
- self.assertEqual(workflow.token_owner, TokenOwner.no_one)
-
- def test_do_subscription_pre_approved_pre_confirmed(self):
- # An moderation-requiring subscription policy plus a pre-verified and
- # pre-approved address means the user gets subscribed to the mailing
- # list without any further confirmations or approvals.
- self._mlist.subscription_policy = (
- SubscriptionPolicy.confirm_then_moderate)
- anne = self._user_manager.create_address(self._anne)
- workflow = SubscriptionWorkflow(self._mlist, anne,
- pre_verified=True,
- pre_confirmed=True,
- pre_approved=True)
- # Consume the entire state machine.
- list(workflow)
- # Anne is now a member of the mailing list.
- member = self._mlist.regular_members.get_member(self._anne)
- self.assertEqual(member.address, anne)
- self.assertEqual(workflow.member, member)
- # No further token is needed.
- self.assertIsNone(workflow.token)
- self.assertEqual(workflow.token_owner, TokenOwner.no_one)
-
- def test_do_subscription_cleanups(self):
- # Once the user is subscribed, the token, and its associated pending
- # database record will be removed from the database.
- self._mlist.subscription_policy = SubscriptionPolicy.open
- anne = self._user_manager.create_address(self._anne)
- workflow = SubscriptionWorkflow(self._mlist, anne,
- pre_verified=True,
- pre_confirmed=True,
- pre_approved=True)
- # Consume the entire state machine.
- list(workflow)
- # Anne is now a member of the mailing list.
- member = self._mlist.regular_members.get_member(self._anne)
- self.assertEqual(member.address, anne)
- self.assertEqual(workflow.member, member)
- # The workflow is done, so it has no token.
- self.assertIsNone(workflow.token)
- self.assertEqual(workflow.token_owner, TokenOwner.no_one)
-
- def test_moderator_approves(self):
- # The workflow runs until moderator approval is required, at which
- # point the workflow is saved. Once the moderator approves, the
- # workflow resumes and the user is subscribed.
- self._mlist.subscription_policy = SubscriptionPolicy.moderate
- anne = self._user_manager.create_address(self._anne)
- workflow = SubscriptionWorkflow(self._mlist, anne,
- pre_verified=True,
- pre_confirmed=True)
- # Consume the entire state machine.
- list(workflow)
- # The user is not currently subscribed to the mailing list.
- member = self._mlist.regular_members.get_member(self._anne)
- self.assertIsNone(member)
- self.assertIsNone(workflow.member)
- # The token is owned by the moderator.
- self.assertIsNotNone(workflow.token)
- self.assertEqual(workflow.token_owner, TokenOwner.moderator)
- # Create a new workflow with the previous workflow's save token, and
- # restore its state. This models an approved subscription and should
- # result in the user getting subscribed.
- approved_workflow = SubscriptionWorkflow(self._mlist)
- approved_workflow.token = workflow.token
- approved_workflow.restore()
- list(approved_workflow)
- # Now the user is subscribed to the mailing list.
- member = self._mlist.regular_members.get_member(self._anne)
- self.assertEqual(member.address, anne)
- self.assertEqual(approved_workflow.member, member)
- # No further token is needed.
- self.assertIsNone(approved_workflow.token)
- self.assertEqual(approved_workflow.token_owner, TokenOwner.no_one)
-
- def test_get_moderator_approval_log_on_hold(self):
- # When the subscription is held for moderator approval, a message is
- # logged.
- mark = LogFileMark('mailman.subscribe')
- self._mlist.subscription_policy = SubscriptionPolicy.moderate
- anne = self._user_manager.create_address(self._anne)
- workflow = SubscriptionWorkflow(self._mlist, anne,
- pre_verified=True,
- pre_confirmed=True)
- # Consume the entire state machine.
- list(workflow)
- self.assertIn(
- 'test@example.com: held subscription request from anne@example.com',
- mark.readline()
- )
- # The state machine stopped at the moderator approval so there will be
- # one token still in the database.
- self._expected_pendings_count = 1
-
- def test_get_moderator_approval_notifies_moderators(self):
- # When the subscription is held for moderator approval, and the list
- # is so configured, a notification is sent to the list moderators.
- self._mlist.admin_immed_notify = True
- self._mlist.subscription_policy = SubscriptionPolicy.moderate
- anne = self._user_manager.create_address(self._anne)
- bart = self._user_manager.create_user('bart@example.com', 'Bart User')
- address = set_preferred(bart)
- self._mlist.subscribe(address, MemberRole.moderator)
- workflow = SubscriptionWorkflow(self._mlist, anne,
- pre_verified=True,
- pre_confirmed=True)
- # Consume the entire state machine.
- list(workflow)
- # Find the moderator message.
- items = get_queue_messages('virgin', expected_count=1)
- for item in items:
- if item.msg['to'] == 'test-owner@example.com':
- break
- else:
- raise AssertionError('No moderator email found')
- self.assertEqual(
- item.msgdata['recipients'], {'test-owner@example.com'})
- message = items[0].msg
- self.assertEqual(message['From'], 'test-owner@example.com')
- self.assertEqual(message['To'], 'test-owner@example.com')
- self.assertEqual(
- message['Subject'],
- 'New subscription request to Test from anne@example.com')
- self.assertEqual(message.get_payload(), """\
-Your authorization is required for a mailing list subscription request
-approval:
-
- For: anne@example.com
- List: test@example.com
-""")
- # The state machine stopped at the moderator approval so there will be
- # one token still in the database.
- self._expected_pendings_count = 1
-
- def test_get_moderator_approval_no_notifications(self):
- # When the subscription is held for moderator approval, and the list
- # is so configured, a notification is sent to the list moderators.
- self._mlist.admin_immed_notify = False
- self._mlist.subscription_policy = SubscriptionPolicy.moderate
- anne = self._user_manager.create_address(self._anne)
- workflow = SubscriptionWorkflow(self._mlist, anne,
- pre_verified=True,
- pre_confirmed=True)
- # Consume the entire state machine.
- list(workflow)
- get_queue_messages('virgin', expected_count=0)
- # The state machine stopped at the moderator approval so there will be
- # one token still in the database.
- self._expected_pendings_count = 1
-
- def test_send_confirmation(self):
- # A confirmation message gets sent when the address is not verified.
- anne = self._user_manager.create_address(self._anne)
- self.assertIsNone(anne.verified_on)
- # Run the workflow to model the confirmation step.
- workflow = SubscriptionWorkflow(self._mlist, anne)
- list(workflow)
- items = get_queue_messages('virgin', expected_count=1)
- message = items[0].msg
- token = workflow.token
- self.assertEqual(message['Subject'], 'confirm {}'.format(token))
- self.assertEqual(
- message['From'], 'test-confirm+{}@example.com'.format(token))
- # The confirmation message is not `Precedence: bulk`.
- self.assertIsNone(message['precedence'])
- # The state machine stopped at the moderator approval so there will be
- # one token still in the database.
- self._expected_pendings_count = 1
-
- def test_send_confirmation_pre_confirmed(self):
- # A confirmation message gets sent when the address is not verified
- # but the subscription is pre-confirmed.
- anne = self._user_manager.create_address(self._anne)
- self.assertIsNone(anne.verified_on)
- # Run the workflow to model the confirmation step.
- workflow = SubscriptionWorkflow(self._mlist, anne, pre_confirmed=True)
- list(workflow)
- items = get_queue_messages('virgin', expected_count=1)
- message = items[0].msg
- token = workflow.token
- self.assertEqual(
- message['Subject'], 'confirm {}'.format(workflow.token))
- self.assertEqual(
- message['From'], 'test-confirm+{}@example.com'.format(token))
- # The state machine stopped at the moderator approval so there will be
- # one token still in the database.
- self._expected_pendings_count = 1
-
- def test_send_confirmation_pre_verified(self):
- # A confirmation message gets sent even when the address is verified
- # when the subscription must be confirmed.
- self._mlist.subscription_policy = SubscriptionPolicy.confirm
- anne = self._user_manager.create_address(self._anne)
- self.assertIsNone(anne.verified_on)
- # Run the workflow to model the confirmation step.
- workflow = SubscriptionWorkflow(self._mlist, anne, pre_verified=True)
- list(workflow)
- items = get_queue_messages('virgin', expected_count=1)
- message = items[0].msg
- token = workflow.token
- self.assertEqual(
- message['Subject'], 'confirm {}'.format(workflow.token))
- self.assertEqual(
- message['From'], 'test-confirm+{}@example.com'.format(token))
- # The state machine stopped at the moderator approval so there will be
- # one token still in the database.
- self._expected_pendings_count = 1
-
- def test_do_confirm_verify_address(self):
- # The address is not yet verified, nor are we pre-verifying. A
- # confirmation message will be sent. When the user confirms their
- # subscription request, the address will end up being verified.
- anne = self._user_manager.create_address(self._anne)
- self.assertIsNone(anne.verified_on)
- # Run the workflow to model the confirmation step.
- workflow = SubscriptionWorkflow(self._mlist, anne)
- list(workflow)
- # The address is still not verified.
- self.assertIsNone(anne.verified_on)
- confirm_workflow = SubscriptionWorkflow(self._mlist)
- confirm_workflow.token = workflow.token
- confirm_workflow.restore()
- confirm_workflow.run_thru('do_confirm_verify')
- # The address is now verified.
- self.assertIsNotNone(anne.verified_on)
-
- def test_do_confirm_verify_user(self):
- # A confirmation step is necessary when a user subscribes with their
- # preferred address, and we are not pre-confirming.
- anne = self._user_manager.create_user(self._anne)
- set_preferred(anne)
- # Run the workflow to model the confirmation step. There is no
- # subscriber attribute yet.
- workflow = SubscriptionWorkflow(self._mlist, anne)
- list(workflow)
- self.assertEqual(workflow.subscriber, anne)
- # Do a confirmation workflow, which should now set the subscriber.
- confirm_workflow = SubscriptionWorkflow(self._mlist)
- confirm_workflow.token = workflow.token
- confirm_workflow.restore()
- confirm_workflow.run_thru('do_confirm_verify')
- # The address is now verified.
- self.assertEqual(confirm_workflow.subscriber, anne)
-
- def test_do_confirmation_subscribes_user(self):
- # Subscriptions to the mailing list must be confirmed. Once that's
- # done, the user's address (which is not initially verified) gets
- # subscribed to the mailing list.
- self._mlist.subscription_policy = SubscriptionPolicy.confirm
- anne = self._user_manager.create_address(self._anne)
- self.assertIsNone(anne.verified_on)
- workflow = SubscriptionWorkflow(self._mlist, anne)
- list(workflow)
- # Anne is not yet a member.
- member = self._mlist.regular_members.get_member(self._anne)
- self.assertIsNone(member)
- self.assertIsNone(workflow.member)
- # The token is owned by the subscriber.
- self.assertIsNotNone(workflow.token)
- self.assertEqual(workflow.token_owner, TokenOwner.subscriber)
- # Confirm.
- confirm_workflow = SubscriptionWorkflow(self._mlist)
- confirm_workflow.token = workflow.token
- confirm_workflow.restore()
- list(confirm_workflow)
- self.assertIsNotNone(anne.verified_on)
- # Anne is now a member.
- member = self._mlist.regular_members.get_member(self._anne)
- self.assertEqual(member.address, anne)
- self.assertEqual(confirm_workflow.member, member)
- # No further token is needed.
- self.assertIsNone(confirm_workflow.token)
- self.assertEqual(confirm_workflow.token_owner, TokenOwner.no_one)
-
- def test_prevent_confirmation_replay_attacks(self):
- # Ensure that if the workflow requires two confirmations, e.g. first
- # the user confirming their subscription, and then the moderator
- # approving it, that different tokens are used in these two cases.
- self._mlist.subscription_policy = (
- SubscriptionPolicy.confirm_then_moderate)
- anne = self._user_manager.create_address(self._anne)
- workflow = SubscriptionWorkflow(self._mlist, anne, pre_verified=True)
- # Run the state machine up to the first confirmation, and cache the
- # confirmation token.
- list(workflow)
- token = workflow.token
- # Anne is not yet a member of the mailing list.
- member = self._mlist.regular_members.get_member(self._anne)
- self.assertIsNone(member)
- self.assertIsNone(workflow.member)
- # The token is owned by the subscriber.
- self.assertIsNotNone(workflow.token)
- self.assertEqual(workflow.token_owner, TokenOwner.subscriber)
- # The old token will not work for moderator approval.
- moderator_workflow = SubscriptionWorkflow(self._mlist)
- moderator_workflow.token = token
- moderator_workflow.restore()
- list(moderator_workflow)
- # The token is owned by the moderator.
- self.assertIsNotNone(moderator_workflow.token)
- self.assertEqual(moderator_workflow.token_owner, TokenOwner.moderator)
- # While we wait for the moderator to approve the subscription, note
- # that there's a new token for the next steps.
- self.assertNotEqual(token, moderator_workflow.token)
- # The old token won't work.
- final_workflow = SubscriptionWorkflow(self._mlist)
- final_workflow.token = token
- self.assertRaises(LookupError, final_workflow.restore)
- # Running this workflow will fail.
- self.assertRaises(AssertionError, list, final_workflow)
- # Anne is still not subscribed.
- member = self._mlist.regular_members.get_member(self._anne)
- self.assertIsNone(member)
- self.assertIsNone(final_workflow.member)
- # However, if we use the new token, her subscription request will be
- # approved by the moderator.
- final_workflow.token = moderator_workflow.token
- final_workflow.restore()
- list(final_workflow)
- # And now Anne is a member.
- member = self._mlist.regular_members.get_member(self._anne)
- self.assertEqual(member.address.email, self._anne)
- self.assertEqual(final_workflow.member, member)
- # No further token is needed.
- self.assertIsNone(final_workflow.token)
- self.assertEqual(final_workflow.token_owner, TokenOwner.no_one)
-
- def test_confirmation_needed_and_pre_confirmed(self):
- # The subscription policy is 'confirm' but the subscription is
- # pre-confirmed so the moderation checks can be skipped.
- self._mlist.subscription_policy = SubscriptionPolicy.confirm
- anne = self._user_manager.create_address(self._anne)
- workflow = SubscriptionWorkflow(
- self._mlist, anne,
- pre_verified=True, pre_confirmed=True, pre_approved=True)
- list(workflow)
- # Anne was subscribed.
- self.assertIsNone(workflow.token)
- self.assertEqual(workflow.token_owner, TokenOwner.no_one)
- self.assertEqual(workflow.member.address, anne)
-
- def test_restore_user_absorbed(self):
- # The subscribing user is absorbed (and thus deleted) before the
- # moderator approves the subscription.
- self._mlist.subscription_policy = SubscriptionPolicy.moderate
- anne = self._user_manager.create_user(self._anne)
- bill = self._user_manager.create_user('bill@example.com')
- set_preferred(bill)
- # anne subscribes.
- workflow = SubscriptionWorkflow(self._mlist, anne, pre_verified=True)
- list(workflow)
- # bill absorbs anne.
- bill.absorb(anne)
- # anne's subscription request is approved.
- approved_workflow = SubscriptionWorkflow(self._mlist)
- approved_workflow.token = workflow.token
- approved_workflow.restore()
- self.assertEqual(approved_workflow.user, bill)
- # Run the workflow through.
- list(approved_workflow)
-
- def test_restore_address_absorbed(self):
- # The subscribing user is absorbed (and thus deleted) before the
- # moderator approves the subscription.
- self._mlist.subscription_policy = SubscriptionPolicy.moderate
- anne = self._user_manager.create_user(self._anne)
- anne_address = anne.addresses[0]
- bill = self._user_manager.create_user('bill@example.com')
- # anne subscribes.
- workflow = SubscriptionWorkflow(
- self._mlist, anne_address, pre_verified=True)
- list(workflow)
- # bill absorbs anne.
- bill.absorb(anne)
- self.assertIn(anne_address, bill.addresses)
- # anne's subscription request is approved.
- approved_workflow = SubscriptionWorkflow(self._mlist)
- approved_workflow.token = workflow.token
- approved_workflow.restore()
- self.assertEqual(approved_workflow.user, bill)
- # Run the workflow through.
- list(approved_workflow)
diff --git a/src/mailman/app/tests/test_unsubscriptions.py b/src/mailman/app/tests/test_unsubscriptions.py
deleted file mode 100644
index 4ca657190..000000000
--- a/src/mailman/app/tests/test_unsubscriptions.py
+++ /dev/null
@@ -1,522 +0,0 @@
-# Copyright (C) 2016-2017 by the Free Software Foundation, Inc.
-#
-# This file is part of GNU Mailman.
-#
-# GNU Mailman is free software: you can redistribute it and/or modify it under
-# the terms of the GNU General Public License as published by the Free
-# Software Foundation, either version 3 of the License, or (at your option)
-# any later version.
-#
-# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT
-# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
-# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
-# more details.
-#
-# You should have received a copy of the GNU General Public License along with
-# GNU Mailman. If not, see <http://www.gnu.org/licenses/>.
-
-"""Test for unsubscription service."""
-
-import unittest
-
-from contextlib import suppress
-from mailman.app.lifecycle import create_list
-from mailman.app.subscriptions import UnSubscriptionWorkflow
-from mailman.interfaces.mailinglist import SubscriptionPolicy
-from mailman.interfaces.pending import IPendings
-from mailman.interfaces.subscriptions import TokenOwner
-from mailman.interfaces.usermanager import IUserManager
-from mailman.testing.helpers import LogFileMark, get_queue_messages
-from mailman.testing.layers import ConfigLayer
-from mailman.utilities.datetime import now
-from unittest.mock import patch
-from zope.component import getUtility
-
-
-class TestUnSubscriptionWorkflow(unittest.TestCase):
- layer = ConfigLayer
- maxDiff = None
-
- def setUp(self):
- self._mlist = create_list('test@example.com')
- self._mlist.admin_immed_notify = False
- self._mlist.unsubscription_policy = SubscriptionPolicy.open
- self._mlist.send_welcome_message = False
- self._anne = 'anne@example.com'
- self._user_manager = getUtility(IUserManager)
- self.anne = self._user_manager.create_user(self._anne)
- self.anne.addresses[0].verified_on = now()
- self.anne.preferred_address = self.anne.addresses[0]
- self._mlist.subscribe(self.anne)
- self._expected_pendings_count = 0
-
- def tearDown(self):
- # There usually should be no pending after all is said and done, but
- # some tests don't complete the workflow.
- self.assertEqual(getUtility(IPendings).count,
- self._expected_pendings_count)
-
- def test_start_state(self):
- # Test the workflow starts with no tokens or members.
- workflow = UnSubscriptionWorkflow(self._mlist)
- self.assertEqual(workflow.token_owner, TokenOwner.no_one)
- self.assertIsNone(workflow.token)
- self.assertIsNone(workflow.member)
-
- def test_pended_data(self):
- # Test there is a Pendable object associated with a held
- # unsubscription request and it has some valid data associated with
- # it.
- self._mlist.unsubscription_policy = SubscriptionPolicy.confirm
- workflow = UnSubscriptionWorkflow(self._mlist, self.anne)
- with suppress(StopIteration):
- workflow.run_thru('send_confirmation')
- self.assertIsNotNone(workflow.token)
- pendable = getUtility(IPendings).confirm(workflow.token, expunge=False)
- self.assertEqual(pendable['list_id'], 'test.example.com')
- self.assertEqual(pendable['email'], 'anne@example.com')
- self.assertEqual(pendable['display_name'], '')
- self.assertEqual(pendable['when'], '2005-08-01T07:49:23')
- self.assertEqual(pendable['token_owner'], 'subscriber')
- # The token is still in the database.
- self._expected_pendings_count = 1
-
- def test_user_or_address_required(self):
- # The `subscriber` attribute must be a user or address that is provided
- # to the workflow.
- workflow = UnSubscriptionWorkflow(self._mlist)
- self.assertRaises(AssertionError, list, workflow)
-
- def test_user_is_subscribed_to_unsubscribe(self):
- # A user must be subscribed to a list when trying to unsubscribe.
- addr = self._user_manager.create_address('aperson@example.org')
- addr.verfied_on = now()
- workflow = UnSubscriptionWorkflow(self._mlist, addr)
- self.assertRaises(AssertionError,
- workflow.run_thru, 'subscription_checks')
-
- def test_confirmation_checks_open_list(self):
- # An unsubscription from an open list does not need to be confirmed or
- # moderated.
- self._mlist.unsubscription_policy = SubscriptionPolicy.open
- workflow = UnSubscriptionWorkflow(self._mlist, self.anne)
- workflow.run_thru('confirmation_checks')
- with patch.object(workflow, '_step_do_unsubscription') as step:
- next(workflow)
- step.assert_called_once_with()
-
- def test_confirmation_checks_no_user_confirmation_needed(self):
- # An unsubscription from a list which does not need user confirmation
- # skips to the moderation checks.
- self._mlist.unsubscription_policy = SubscriptionPolicy.moderate
- workflow = UnSubscriptionWorkflow(self._mlist, self.anne,
- pre_confirmed=True)
- workflow.run_thru('confirmation_checks')
- with patch.object(workflow, '_step_moderation_checks') as step:
- next(workflow)
- step.assert_called_once_with()
-
- def test_confirmation_checks_confirm_pre_confirmed(self):
- # The unsubscription policy requires user-confirmation, but their
- # unsubscription is pre-confirmed. Since moderation is not reuqired,
- # the user will be immediately unsubscribed.
- self._mlist.unsubscription_policy = SubscriptionPolicy.confirm
- workflow = UnSubscriptionWorkflow(
- self._mlist, self.anne, pre_confirmed=True)
- workflow.run_thru('confirmation_checks')
- with patch.object(workflow, '_step_do_unsubscription') as step:
- next(workflow)
- step.assert_called_once_with()
-
- def test_confirmation_checks_confirm_then_moderate_pre_confirmed(self):
- # The unsubscription policy requires user confirmation, but their
- # unsubscription is pre-confirmed. Since moderation is required, that
- # check will be performed.
- self._mlist.unsubscription_policy = (
- SubscriptionPolicy.confirm_then_moderate)
- workflow = UnSubscriptionWorkflow(
- self._mlist, self.anne, pre_confirmed=True)
- workflow.run_thru('confirmation_checks')
- with patch.object(workflow, '_step_do_unsubscription') as step:
- next(workflow)
- step.assert_called_once_with()
-
- def test_send_confirmation_checks_confirm_list(self):
- # The unsubscription policy requires user confirmation and the
- # unsubscription is not pre-confirmed.
- self._mlist.unsubscription_policy = SubscriptionPolicy.confirm
- workflow = UnSubscriptionWorkflow(self._mlist, self.anne)
- workflow.run_thru('confirmation_checks')
- with patch.object(workflow, '_step_send_confirmation') as step:
- next(workflow)
- step.assert_called_once_with()
-
- def test_moderation_checks_moderated_list(self):
- # The unsubscription policy requires moderation.
- self._mlist.unsubscription_policy = SubscriptionPolicy.moderate
- workflow = UnSubscriptionWorkflow(self._mlist, self.anne)
- workflow.run_thru('confirmation_checks')
- with patch.object(workflow, '_step_moderation_checks') as step:
- next(workflow)
- step.assert_called_once_with()
-
- def test_moderation_checks_approval_required(self):
- # The moderator must approve the subscription request.
- self._mlist.unsubscription_policy = SubscriptionPolicy.moderate
- workflow = UnSubscriptionWorkflow(self._mlist, self.anne)
- workflow.run_thru('moderation_checks')
- with patch.object(workflow, '_step_get_moderator_approval') as step:
- next(workflow)
- step.assert_called_once_with()
-
- def test_do_unsusbcription(self):
- # An open unsubscription policy means the user gets unsubscribed to
- # the mailing list without any further confirmations or approvals.
- self._mlist.unsubscription_policy = SubscriptionPolicy.open
- workflow = UnSubscriptionWorkflow(self._mlist, self.anne)
- list(workflow)
- member = self._mlist.regular_members.get_member(self._anne)
- self.assertIsNone(member)
-
- def test_do_unsubscription_pre_approved(self):
- # A moderation-requiring subscription policy plus a pre-approved
- # address means the user gets unsubscribed from the mailing list
- # without any further confirmation or approvals.
- self._mlist.unsubscription_policy = SubscriptionPolicy.moderate
- workflow = UnSubscriptionWorkflow(self._mlist, self.anne,
- pre_approved=True)
- list(workflow)
- # Anne is now unsubscribed form the mailing list.
- member = self._mlist.regular_members.get_member(self._anne)
- self.assertIsNone(member)
- # No further token is needed.
- self.assertIsNone(workflow.token)
- self.assertEqual(workflow.token_owner, TokenOwner.no_one)
-
- def test_do_unsubscription_pre_approved_pre_confirmed(self):
- # A moderation-requiring unsubscription policy plus a pre-appvoed
- # address means the user gets unsubscribed to the mailing list without
- # any further confirmations or approvals.
- self._mlist.unsubscription_policy = (
- SubscriptionPolicy.confirm_then_moderate)
- workflow = UnSubscriptionWorkflow(self._mlist, self.anne,
- pre_approved=True,
- pre_confirmed=True)
- list(workflow)
- member = self._mlist.regular_members.get_member(self._anne)
- self.assertIsNone(member)
- # No further token is needed.
- self.assertIsNone(workflow.token)
- self.assertEqual(workflow.token_owner, TokenOwner.no_one)
-
- def test_do_unsubscription_cleanups(self):
- # Once the user is unsubscribed, the token and its associated pending
- # database record will be removed from the database.
- self._mlist.unsubscription_policy = SubscriptionPolicy.open
- workflow = UnSubscriptionWorkflow(self._mlist, self.anne,
- pre_approved=True,
- pre_confirmed=True)
- # Run the workflow.
- list(workflow)
- # Anne is now unsubscribed from the list.
- member = self._mlist.regular_members.get_member(self._anne)
- self.assertIsNone(member)
- # Workflow is done, so it has no token.
- self.assertIsNone(workflow.token)
- self.assertEqual(workflow.token_owner, TokenOwner.no_one)
-
- def test_moderator_approves(self):
- # The workflow runs until moderator approval is required, at which
- # point the workflow is saved. Once the moderator approves, the
- # workflow resumes and the user is unsubscribed.
- self._mlist.unsubscription_policy = SubscriptionPolicy.moderate
- workflow = UnSubscriptionWorkflow(
- self._mlist, self.anne, pre_confirmed=True)
- # Run the entire workflow.
- list(workflow)
- # The user is currently subscribed to the mailing list.
- member = self._mlist.regular_members.get_member(self._anne)
- self.assertIsNotNone(member)
- self.assertIsNotNone(workflow.member)
- # The token is owned by the moderator.
- self.assertIsNotNone(workflow.token)
- self.assertEqual(workflow.token_owner, TokenOwner.moderator)
- # Create a new workflow with the previous workflow's save token, and
- # restore its state. This models an approved un-sunscription request
- # and should result in the user getting subscribed.
- approved_workflow = UnSubscriptionWorkflow(self._mlist)
- approved_workflow.token = workflow.token
- approved_workflow.restore()
- list(approved_workflow)
- # Now the user is unsubscribed from the mailing list.
- member = self._mlist.regular_members.get_member(self._anne)
- self.assertIsNone(member)
- self.assertEqual(approved_workflow.member, member)
- # No further token is needed.
- self.assertIsNone(approved_workflow.token)
- self.assertEqual(approved_workflow.token_owner, TokenOwner.no_one)
-
- def test_get_moderator_approval_log_on_hold(self):
- # When the unsubscription is held for moderator approval, a message is
- # logged.
- mark = LogFileMark('mailman.subscribe')
- self._mlist.unsubscription_policy = SubscriptionPolicy.moderate
- workflow = UnSubscriptionWorkflow(
- self._mlist, self.anne, pre_confirmed=True)
- # Run the entire workflow.
- list(workflow)
- self.assertIn(
- 'test@example.com: held unsubscription request from anne@example.com',
- mark.readline()
- )
- # The state machine stopped at the moderator approval step so there
- # will be one token still in the database.
- self._expected_pendings_count = 1
-
- def test_get_moderator_approval_notifies_moderators(self):
- # When the unsubscription is held for moderator approval, and the list
- # is so configured, a notification is sent to the list moderators.
- self._mlist.admin_immed_notify = True
- self._mlist.unsubscription_policy = SubscriptionPolicy.moderate
- workflow = UnSubscriptionWorkflow(
- self._mlist, self.anne, pre_confirmed=True)
- # Consume the entire state machine.
- list(workflow)
- items = get_queue_messages('virgin', expected_count=1)
- message = items[0].msg
- self.assertEqual(message['From'], 'test-owner@example.com')
- self.assertEqual(message['To'], 'test-owner@example.com')
- self.assertEqual(
- message['Subject'],
- 'New unsubscription request to Test from anne@example.com')
- self.assertEqual(message.get_payload(), """\
-Your authorization is required for a mailing list unsubscription
-request approval:
-
- For: anne@example.com
- List: test@example.com
-""")
- # The state machine stopped at the moderator approval so there will be
- # one token still in the database.
- self._expected_pendings_count = 1
-
- def test_get_moderator_approval_no_notifications(self):
- # When the unsubscription request is held for moderator approval, and
- # the list is so configured, a notification is sent to the list
- # moderators.
- self._mlist.admin_immed_notify = False
- self._mlist.unsubscription_policy = SubscriptionPolicy.moderate
- workflow = UnSubscriptionWorkflow(
- self._mlist, self.anne, pre_confirmed=True)
- # Consume the entire state machine.
- list(workflow)
- get_queue_messages('virgin', expected_count=0)
- # The state machine stopped at the moderator approval so there will be
- # one token still in the database.
- self._expected_pendings_count = 1
-
- def test_send_confirmation(self):
- # A confirmation message gets sent when the unsubscription must be
- # confirmed.
- self._mlist.unsubscription_policy = SubscriptionPolicy.confirm
- # Run the workflow to model the confirmation step.
- workflow = UnSubscriptionWorkflow(self._mlist, self.anne)
- list(workflow)
- items = get_queue_messages('virgin', expected_count=1)
- message = items[0].msg
- token = workflow.token
- self.assertEqual(
- message['Subject'], 'confirm {}'.format(workflow.token))
- self.assertEqual(
- message['From'], 'test-confirm+{}@example.com'.format(token))
- # The state machine stopped at the member confirmation step so there
- # will be one token still in the database.
- self._expected_pendings_count = 1
-
- def test_do_confirmation_unsubscribes_user(self):
- # Unsubscriptions to the mailing list must be confirmed. Once that's
- # done, the user's address is unsubscribed.
- self._mlist.unsubscription_policy = SubscriptionPolicy.confirm
- workflow = UnSubscriptionWorkflow(self._mlist, self.anne)
- list(workflow)
- # Anne is a member.
- member = self._mlist.regular_members.get_member(self._anne)
- self.assertIsNotNone(member)
- self.assertEqual(member, workflow.member)
- # The token is owned by the subscriber.
- self.assertIsNotNone(workflow.token)
- self.assertEqual(workflow.token_owner, TokenOwner.subscriber)
- # Confirm.
- confirm_workflow = UnSubscriptionWorkflow(self._mlist)
- confirm_workflow.token = workflow.token
- confirm_workflow.restore()
- list(confirm_workflow)
- # Anne is now unsubscribed.
- member = self._mlist.regular_members.get_member(self._anne)
- self.assertIsNone(member)
- # No further token is needed.
- self.assertIsNone(confirm_workflow.token)
- self.assertEqual(confirm_workflow.token_owner, TokenOwner.no_one)
-
- def test_do_confirmation_unsubscribes_address(self):
- # Unsubscriptions to the mailing list must be confirmed. Once that's
- # done, the address is unsubscribed.
- address = self.anne.register('anne.person@example.com')
- self._mlist.subscribe(address)
- self._mlist.unsubscription_policy = SubscriptionPolicy.confirm
- workflow = UnSubscriptionWorkflow(self._mlist, address)
- list(workflow)
- # Bart is a member.
- member = self._mlist.regular_members.get_member(
- 'anne.person@example.com')
- self.assertIsNotNone(member)
- self.assertEqual(member, workflow.member)
- # The token is owned by the subscriber.
- self.assertIsNotNone(workflow.token)
- self.assertEqual(workflow.token_owner, TokenOwner.subscriber)
- # Confirm.
- confirm_workflow = UnSubscriptionWorkflow(self._mlist)
- confirm_workflow.token = workflow.token
- confirm_workflow.restore()
- list(confirm_workflow)
- # Bart is now unsubscribed.
- member = self._mlist.regular_members.get_member(
- 'anne.person@example.com')
- self.assertIsNone(member)
- # No further token is needed.
- self.assertIsNone(confirm_workflow.token)
- self.assertEqual(confirm_workflow.token_owner, TokenOwner.no_one)
-
- def test_do_confirmation_nonmember(self):
- # Attempt to confirm the unsubscription of a member who has already
- # been unsubscribed.
- self._mlist.unsubscription_policy = SubscriptionPolicy.confirm
- workflow = UnSubscriptionWorkflow(self._mlist, self.anne)
- list(workflow)
- # Anne is a member.
- member = self._mlist.regular_members.get_member(self._anne)
- self.assertIsNotNone(member)
- self.assertEqual(member, workflow.member)
- # The token is owned by the subscriber.
- self.assertIsNotNone(workflow.token)
- self.assertEqual(workflow.token_owner, TokenOwner.subscriber)
- # Unsubscribe Anne out of band.
- member.unsubscribe()
- # Confirm.
- confirm_workflow = UnSubscriptionWorkflow(self._mlist)
- confirm_workflow.token = workflow.token
- confirm_workflow.restore()
- list(confirm_workflow)
- # No further token is needed.
- self.assertIsNone(confirm_workflow.token)
- self.assertEqual(confirm_workflow.token_owner, TokenOwner.no_one)
-
- def test_do_confirmation_nonmember_final_step(self):
- # Attempt to confirm the unsubscription of a member who has already
- # been unsubscribed.
- self._mlist.unsubscription_policy = SubscriptionPolicy.confirm
- workflow = UnSubscriptionWorkflow(self._mlist, self.anne)
- list(workflow)
- # Anne is a member.
- member = self._mlist.regular_members.get_member(self._anne)
- self.assertIsNotNone(member)
- self.assertEqual(member, workflow.member)
- # The token is owned by the subscriber.
- self.assertIsNotNone(workflow.token)
- self.assertEqual(workflow.token_owner, TokenOwner.subscriber)
- # Confirm.
- confirm_workflow = UnSubscriptionWorkflow(self._mlist)
- confirm_workflow.token = workflow.token
- confirm_workflow.restore()
- confirm_workflow.run_until('do_unsubscription')
- self.assertEqual(member, confirm_workflow.member)
- # Unsubscribe Anne out of band.
- member.unsubscribe()
- list(confirm_workflow)
- self.assertIsNone(confirm_workflow.member)
- # No further token is needed.
- self.assertIsNone(confirm_workflow.token)
- self.assertEqual(confirm_workflow.token_owner, TokenOwner.no_one)
-
- def test_prevent_confirmation_replay_attacks(self):
- # Ensure that if the workflow requires two confirmations, e.g. first
- # the user confirming their subscription, and then the moderator
- # approving it, that different tokens are used in these two cases.
- self._mlist.unsubscription_policy = (
- SubscriptionPolicy.confirm_then_moderate)
- workflow = UnSubscriptionWorkflow(self._mlist, self.anne)
- # Run the state machine up to the first confirmation, and cache the
- # confirmation token.
- list(workflow)
- token = workflow.token
- # Anne is still a member of the mailing list.
- member = self._mlist.regular_members.get_member(self._anne)
- self.assertIsNotNone(member)
- self.assertIsNotNone(workflow.member)
- # The token is owned by the subscriber.
- self.assertIsNotNone(workflow.token)
- self.assertEqual(workflow.token_owner, TokenOwner.subscriber)
- # The old token will not work for moderator approval.
- moderator_workflow = UnSubscriptionWorkflow(self._mlist)
- moderator_workflow.token = token
- moderator_workflow.restore()
- list(moderator_workflow)
- # The token is owned by the moderator.
- self.assertIsNotNone(moderator_workflow.token)
- self.assertEqual(moderator_workflow.token_owner, TokenOwner.moderator)
- # While we wait for the moderator to approve the subscription, note
- # that there's a new token for the next steps.
- self.assertNotEqual(token, moderator_workflow.token)
- # The old token won't work.
- final_workflow = UnSubscriptionWorkflow(self._mlist)
- final_workflow.token = token
- self.assertRaises(LookupError, final_workflow.restore)
- # Running this workflow will fail.
- self.assertRaises(AssertionError, list, final_workflow)
- # Anne is still not unsubscribed.
- member = self._mlist.regular_members.get_member(self._anne)
- self.assertIsNotNone(member)
- self.assertIsNone(final_workflow.member)
- # However, if we use the new token, her unsubscription request will be
- # approved by the moderator.
- final_workflow.token = moderator_workflow.token
- final_workflow.restore()
- list(final_workflow)
- # And now Anne is unsubscribed.
- member = self._mlist.regular_members.get_member(self._anne)
- self.assertIsNone(member)
- # No further token is needed.
- self.assertIsNone(final_workflow.token)
- self.assertEqual(final_workflow.token_owner, TokenOwner.no_one)
-
- def test_confirmation_needed_and_pre_confirmed(self):
- # The subscription policy is 'confirm' but the subscription is
- # pre-confirmed so the moderation checks can be skipped.
- self._mlist.unsubscription_policy = SubscriptionPolicy.confirm
- workflow = UnSubscriptionWorkflow(
- self._mlist, self.anne, pre_confirmed=True, pre_approved=True)
- list(workflow)
- # Anne was unsubscribed.
- self.assertIsNone(workflow.token)
- self.assertEqual(workflow.token_owner, TokenOwner.no_one)
- self.assertIsNone(workflow.member)
-
- def test_confirmation_needed_moderator_address(self):
- address = self.anne.register('anne.person@example.com')
- self._mlist.subscribe(address)
- self._mlist.unsubscription_policy = SubscriptionPolicy.moderate
- workflow = UnSubscriptionWorkflow(self._mlist, address)
- # Get moderator approval.
- list(workflow)
- approved_workflow = UnSubscriptionWorkflow(self._mlist)
- approved_workflow.token = workflow.token
- approved_workflow.restore()
- list(approved_workflow)
- self.assertEqual(approved_workflow.subscriber, address)
- # Anne was unsubscribed.
- self.assertIsNone(approved_workflow.token)
- self.assertEqual(approved_workflow.token_owner, TokenOwner.no_one)
- self.assertIsNone(approved_workflow.member)
- member = self._mlist.regular_members.get_member(
- 'anne.person@example.com')
- self.assertIsNone(member)
diff --git a/src/mailman/app/tests/test_workflow.py b/src/mailman/app/tests/test_workflow.py
deleted file mode 100644
index e82001540..000000000
--- a/src/mailman/app/tests/test_workflow.py
+++ /dev/null
@@ -1,183 +0,0 @@
-# Copyright (C) 2015-2017 by the Free Software Foundation, Inc.
-#
-# This file is part of GNU Mailman.
-#
-# GNU Mailman is free software: you can redistribute it and/or modify it under
-# the terms of the GNU General Public License as published by the Free
-# Software Foundation, either version 3 of the License, or (at your option)
-# any later version.
-#
-# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT
-# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
-# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
-# more details.
-#
-# You should have received a copy of the GNU General Public License along with
-# GNU Mailman. If not, see <http://www.gnu.org/licenses/>.
-
-"""App-level workflow tests."""
-
-import json
-import unittest
-
-from mailman.app.workflow import Workflow
-from mailman.interfaces.workflow import IWorkflowStateManager
-from mailman.testing.layers import ConfigLayer
-from zope.component import getUtility
-
-
-class MyWorkflow(Workflow):
- INITIAL_STATE = 'first'
- SAVE_ATTRIBUTES = ('ant', 'bee', 'cat')
-
- def __init__(self):
- super().__init__()
- self.token = 'test-workflow'
- self.ant = 1
- self.bee = 2
- self.cat = 3
- self.dog = 4
-
- def _step_first(self):
- self.push('second')
- return 'one'
-
- def _step_second(self):
- self.push('third')
- return 'two'
-
- def _step_third(self):
- return 'three'
-
-
-class DependentWorkflow(MyWorkflow):
- SAVE_ATTRIBUTES = ('ant', 'bee', 'cat', 'elf')
-
- def __init__(self):
- super().__init__()
- self._elf = 5
-
- @property
- def elf(self):
- return self._elf
-
- @elf.setter
- def elf(self, value):
- # This attribute depends on other attributes.
- assert self.ant is not None
- assert self.bee is not None
- assert self.cat is not None
- self._elf = value
-
-
-class TestWorkflow(unittest.TestCase):
- layer = ConfigLayer
-
- def setUp(self):
- self._workflow = iter(MyWorkflow())
-
- def test_basic_workflow(self):
- # The work flows from one state to the next.
- results = list(self._workflow)
- self.assertEqual(results, ['one', 'two', 'three'])
-
- def test_partial_workflow(self):
- # You don't have to flow through every step.
- results = next(self._workflow)
- self.assertEqual(results, 'one')
-
- def test_exhaust_workflow(self):
- # Manually flow through a few steps, then consume the whole thing.
- results = [next(self._workflow)]
- results.extend(self._workflow)
- self.assertEqual(results, ['one', 'two', 'three'])
-
- def test_save_and_restore_workflow(self):
- # Without running any steps, save and restore the workflow. Then
- # consume the restored workflow.
- self._workflow.save()
- new_workflow = MyWorkflow()
- new_workflow.restore()
- results = list(new_workflow)
- self.assertEqual(results, ['one', 'two', 'three'])
-
- def test_save_and_restore_partial_workflow(self):
- # After running a few steps, save and restore the workflow. Then
- # consume the restored workflow.
- next(self._workflow)
- self._workflow.save()
- new_workflow = MyWorkflow()
- new_workflow.restore()
- results = list(new_workflow)
- self.assertEqual(results, ['two', 'three'])
-
- def test_save_and_restore_exhausted_workflow(self):
- # After consuming the entire workflow, save and restore it.
- list(self._workflow)
- self._workflow.save()
- new_workflow = MyWorkflow()
- new_workflow.restore()
- results = list(new_workflow)
- self.assertEqual(len(results), 0)
-
- def test_save_and_restore_attributes(self):
- # Saved attributes are restored.
- self._workflow.ant = 9
- self._workflow.bee = 8
- self._workflow.cat = 7
- # Don't save .dog.
- self._workflow.save()
- new_workflow = MyWorkflow()
- new_workflow.restore()
- self.assertEqual(new_workflow.ant, 9)
- self.assertEqual(new_workflow.bee, 8)
- self.assertEqual(new_workflow.cat, 7)
- self.assertEqual(new_workflow.dog, 4)
-
- def test_save_and_restore_dependant_attributes(self):
- # Attributes must be restored in the order they are declared in
- # SAVE_ATTRIBUTES.
- workflow = iter(DependentWorkflow())
- workflow.elf = 6
- workflow.save()
- new_workflow = DependentWorkflow()
- # The elf attribute must be restored last, set triggering values for
- # attributes it depends on.
- new_workflow.ant = new_workflow.bee = new_workflow.cat = None
- new_workflow.restore()
- self.assertEqual(new_workflow.elf, 6)
-
- def test_save_and_restore_obsolete_attributes(self):
- # Obsolete saved attributes are ignored.
- state_manager = getUtility(IWorkflowStateManager)
- # Save the state of an old version of the workflow that would not have
- # the cat attribute.
- state_manager.save(
- self._workflow.token, 'first',
- json.dumps({'ant': 1, 'bee': 2}))
- # Restore in the current version that needs the cat attribute.
- new_workflow = MyWorkflow()
- try:
- new_workflow.restore()
- except KeyError:
- self.fail('Restore does not handle obsolete attributes')
- # Restoring must not raise an exception, the default value is kept.
- self.assertEqual(new_workflow.cat, 3)
-
- def test_run_thru(self):
- # Run all steps through the given one.
- results = self._workflow.run_thru('second')
- self.assertEqual(results, ['one', 'two'])
-
- def test_run_thru_completes(self):
- results = self._workflow.run_thru('all of them')
- self.assertEqual(results, ['one', 'two', 'three'])
-
- def test_run_until(self):
- # Run until (but not including) the given step.
- results = self._workflow.run_until('second')
- self.assertEqual(results, ['one'])
-
- def test_run_until_completes(self):
- results = self._workflow.run_until('all of them')
- self.assertEqual(results, ['one', 'two', 'three'])
diff --git a/src/mailman/app/tests/test_workflowmanager.py b/src/mailman/app/tests/test_workflowmanager.py
index 38ed2e468..e21e428f6 100644
--- a/src/mailman/app/tests/test_workflowmanager.py
+++ b/src/mailman/app/tests/test_workflowmanager.py
@@ -20,7 +20,6 @@
import unittest
from mailman.app.lifecycle import create_list
-from mailman.interfaces.mailinglist import SubscriptionPolicy
from mailman.interfaces.member import MemberRole
from mailman.interfaces.pending import IPendings
from mailman.interfaces.subscriptions import ISubscriptionManager, TokenOwner
@@ -28,6 +27,9 @@ from mailman.interfaces.usermanager import IUserManager
from mailman.testing.helpers import get_queue_messages
from mailman.testing.layers import ConfigLayer
from mailman.utilities.datetime import now
+from mailman.workflows.subscription import (
+ ConfirmModerationSubscriptionPolicy, ConfirmSubscriptionPolicy,
+ ModerationSubscriptionPolicy, OpenSubscriptionPolicy)
from zope.component import getUtility
@@ -60,7 +62,7 @@ class TestRegistrar(unittest.TestCase):
# Registering a subscription request where no confirmation or
# moderation steps are needed, leaves us with no token or owner, since
# there's nothing more to do.
- self._mlist.subscription_policy = SubscriptionPolicy.open
+ self._mlist.subscription_policy = OpenSubscriptionPolicy
self._anne.verified_on = now()
token, token_owner, rmember = self._registrar.register(self._anne)
self.assertIsNone(token)
@@ -82,7 +84,7 @@ class TestRegistrar(unittest.TestCase):
# (because she does not have a verified address), but not the moderator
# to approve. Running the workflow gives us a token. Confirming the
# token subscribes the user.
- self._mlist.subscription_policy = SubscriptionPolicy.open
+ self._mlist.subscription_policy = OpenSubscriptionPolicy
token, token_owner, rmember = self._registrar.register(self._anne)
self.assertIsNotNone(token)
self.assertEqual(token_owner, TokenOwner.subscriber)
@@ -102,7 +104,7 @@ class TestRegistrar(unittest.TestCase):
# (because of list policy), but not the moderator to approve. Running
# the workflow gives us a token. Confirming the token subscribes the
# user.
- self._mlist.subscription_policy = SubscriptionPolicy.confirm
+ self._mlist.subscription_policy = ConfirmSubscriptionPolicy
self._anne.verified_on = now()
token, token_owner, rmember = self._registrar.register(self._anne)
self.assertIsNotNone(token)
@@ -122,7 +124,7 @@ class TestRegistrar(unittest.TestCase):
# We have a subscription request which requires the moderator to
# approve. Running the workflow gives us a token. Confirming the
# token subscribes the user.
- self._mlist.subscription_policy = SubscriptionPolicy.moderate
+ self._mlist.subscription_policy = ModerationSubscriptionPolicy
self._anne.verified_on = now()
token, token_owner, rmember = self._registrar.register(self._anne)
self.assertIsNotNone(token)
@@ -145,7 +147,7 @@ class TestRegistrar(unittest.TestCase):
# token runs the workflow a little farther, but still gives us a
# token. Confirming again subscribes the user.
self._mlist.subscription_policy = (
- SubscriptionPolicy.confirm_then_moderate)
+ ConfirmModerationSubscriptionPolicy)
self._anne.verified_on = now()
# Runs until subscription confirmation.
token, token_owner, rmember = self._registrar.register(self._anne)
@@ -180,7 +182,7 @@ class TestRegistrar(unittest.TestCase):
# sees when they approve the subscription. This prevents the user
# from using a replay attack to subvert moderator approval.
self._mlist.subscription_policy = (
- SubscriptionPolicy.confirm_then_moderate)
+ ConfirmModerationSubscriptionPolicy)
self._anne.verified_on = now()
# Runs until subscription confirmation.
token, token_owner, rmember = self._registrar.register(self._anne)
@@ -216,7 +218,7 @@ class TestRegistrar(unittest.TestCase):
def test_discard_waiting_for_confirmation(self):
# While waiting for a user to confirm their subscription, we discard
# the workflow.
- self._mlist.subscription_policy = SubscriptionPolicy.confirm
+ self._mlist.subscription_policy = ConfirmSubscriptionPolicy
self._anne.verified_on = now()
# Runs until subscription confirmation.
token, token_owner, rmember = self._registrar.register(self._anne)
@@ -233,7 +235,7 @@ class TestRegistrar(unittest.TestCase):
def test_admin_notify_mchanges(self):
# When a user gets subscribed via the subscription policy workflow,
# the list administrators get an email notification.
- self._mlist.subscription_policy = SubscriptionPolicy.open
+ self._mlist.subscription_policy = OpenSubscriptionPolicy
self._mlist.admin_notify_mchanges = True
self._mlist.send_welcome_message = False
token, token_owner, member = self._registrar.register(
@@ -253,7 +255,7 @@ anne@example.com has been successfully subscribed to Ant.
# Even when a user gets subscribed via the subscription policy
# workflow, the list administrators won't get an email notification if
# they don't want one.
- self._mlist.subscription_policy = SubscriptionPolicy.open
+ self._mlist.subscription_policy = OpenSubscriptionPolicy
self._mlist.admin_notify_mchanges = False
self._mlist.send_welcome_message = False
# Bart is an administrator of the mailing list.
diff --git a/src/mailman/app/workflow.py b/src/mailman/app/workflow.py
deleted file mode 100644
index 92b78c184..000000000
--- a/src/mailman/app/workflow.py
+++ /dev/null
@@ -1,151 +0,0 @@
-# Copyright (C) 2015-2017 by the Free Software Foundation, Inc.
-#
-# This file is part of GNU Mailman.
-#
-# GNU Mailman is free software: you can redistribute it and/or modify it under
-# the terms of the GNU General Public License as published by the Free
-# Software Foundation, either version 3 of the License, or (at your option)
-# any later version.
-#
-# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT
-# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
-# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
-# more details.
-#
-# You should have received a copy of the GNU General Public License along with
-# GNU Mailman. If not, see <http://www.gnu.org/licenses/>.
-
-"""Generic workflow."""
-
-import sys
-import json
-import logging
-
-from collections import deque
-from mailman.interfaces.workflow import IWorkflowStateManager
-from public import public
-from zope.component import getUtility
-
-
-COMMASPACE = ', '
-log = logging.getLogger('mailman.error')
-
-
-@public
-class Workflow:
- """Generic workflow."""
-
- SAVE_ATTRIBUTES = ()
- INITIAL_STATE = None
-
- def __init__(self):
- self.token = None
- self._next = deque()
- self.push(self.INITIAL_STATE)
- self.debug = False
- self._count = 0
-
- @property
- def name(self):
- return self.__class__.__name__
-
- def __iter__(self):
- return self
-
- def push(self, step):
- self._next.append(step)
-
- def _pop(self):
- name = self._next.popleft()
- step = getattr(self, '_step_{}'.format(name))
- self._count += 1
- if self.debug: # pragma: nocover
- print('[{:02d}] -> {}'.format(self._count, name), file=sys.stderr)
- return name, step
-
- def __next__(self):
- try:
- name, step = self._pop()
- return step()
- except IndexError:
- raise StopIteration
- except:
- log.exception('deque: {}'.format(COMMASPACE.join(self._next)))
- raise
-
- def run_thru(self, stop_after):
- """Run the state machine through and including the given step.
-
- :param stop_after: Name of method, sans prefix to run the
- state machine through. In other words, the state machine runs
- until the named method completes.
- """
- results = []
- while True:
- try:
- name, step = self._pop()
- except (StopIteration, IndexError):
- # We're done.
- break
- results.append(step())
- if name == stop_after:
- break
- return results
-
- def run_until(self, stop_before):
- """Trun the state machine until (not including) the given step.
-
- :param stop_before: Name of method, sans prefix that the
- state machine is run until the method is reached. Unlike
- `run_thru()` the named method is not run.
- """
- results = []
- while True:
- try:
- name, step = self._pop()
- except (StopIteration, IndexError):
- # We're done.
- break
- if name == stop_before:
- # Stop executing, but not before we push the last state back
- # onto the deque. Otherwise, resuming the state machine would
- # skip this step.
- self._next.appendleft(name)
- break
- results.append(step())
- return results
-
- def save(self):
- assert self.token, 'Workflow token must be set'
- state_manager = getUtility(IWorkflowStateManager)
- data = {attr: getattr(self, attr) for attr in self.SAVE_ATTRIBUTES}
- # Note: only the next step is saved, not the whole stack. This is not
- # an issue in practice, since there's never more than a single step in
- # the queue anyway. If we want to support more than a single step in
- # the queue *and* want to support state saving/restoring, change this
- # method and the restore() method.
- if len(self._next) == 0:
- step = None
- elif len(self._next) == 1:
- step = self._next[0]
- else:
- raise AssertionError(
- "Can't save a workflow state with more than one step "
- "in the queue")
- state_manager.save(self.token, step, json.dumps(data))
-
- def restore(self):
- state_manager = getUtility(IWorkflowStateManager)
- state = state_manager.restore(self.token)
- if state is None:
- # The token doesn't exist in the database.
- raise LookupError(self.token)
- self._next.clear()
- if state.step:
- self._next.append(state.step)
- data = json.loads(state.data)
- for attr in self.SAVE_ATTRIBUTES:
- try:
- setattr(self, attr, data[attr])
- except KeyError:
- pass