summaryrefslogtreecommitdiff
path: root/src/mailman/app/tests/test_subscriptions.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/mailman/app/tests/test_subscriptions.py')
-rw-r--r--src/mailman/app/tests/test_subscriptions.py517
1 files changed, 515 insertions, 2 deletions
diff --git a/src/mailman/app/tests/test_subscriptions.py b/src/mailman/app/tests/test_subscriptions.py
index 8ba5f52ff..a4971d793 100644
--- a/src/mailman/app/tests/test_subscriptions.py
+++ b/src/mailman/app/tests/test_subscriptions.py
@@ -18,7 +18,8 @@
"""Tests for the subscription service."""
__all__ = [
- 'TestJoin'
+ 'TestJoin',
+ 'TestSubscriptionWorkflow',
]
@@ -26,11 +27,20 @@ import uuid
import unittest
from mailman.app.lifecycle import create_list
+from mailman.app.subscriptions import SubscriptionWorkflow
from mailman.interfaces.address import InvalidEmailAddressError
-from mailman.interfaces.member import MemberRole, MissingPreferredAddressError
+from mailman.interfaces.bans import IBanManager
+from mailman.interfaces.member import (
+ MemberRole, MembershipIsBannedError, MissingPreferredAddressError)
+from mailman.interfaces.pending import IPendings
from mailman.interfaces.subscriptions import (
MissingUserError, ISubscriptionService)
+from mailman.testing.helpers import LogFileMark, get_queue_messages
from mailman.testing.layers import ConfigLayer
+from mailman.interfaces.mailinglist import SubscriptionPolicy
+from mailman.interfaces.usermanager import IUserManager
+from mailman.utilities.datetime import now
+from unittest.mock import patch
from zope.component import getUtility
@@ -65,3 +75,506 @@ class TestJoin(unittest.TestCase):
self._service.join,
'test.example.com', anne.user.user_id,
role=MemberRole.owner)
+
+
+
+class TestSubscriptionWorkflow(unittest.TestCase):
+ layer = ConfigLayer
+ maxDiff = None
+
+ def setUp(self):
+ self._mlist = create_list('test@example.com')
+ self._mlist.admin_immed_notify = False
+ self._anne = 'anne@example.com'
+ self._user_manager = getUtility(IUserManager)
+
+ def test_user_or_address_required(self):
+ # The `subscriber` attribute must be a user or address.
+ workflow = SubscriptionWorkflow(self._mlist)
+ self.assertRaises(AssertionError, list, workflow)
+
+ def test_sanity_checks_address(self):
+ # Ensure that the sanity check phase, when given an IAddress, ends up
+ # with a linked user.
+ anne = self._user_manager.create_address(self._anne)
+ workflow = SubscriptionWorkflow(self._mlist, anne)
+ self.assertIsNotNone(workflow.address)
+ self.assertIsNone(workflow.user)
+ workflow.run_thru('sanity_checks')
+ self.assertIsNotNone(workflow.address)
+ self.assertIsNotNone(workflow.user)
+ self.assertEqual(list(workflow.user.addresses)[0].email, self._anne)
+
+ def test_sanity_checks_user_with_preferred_address(self):
+ # Ensure that the sanity check phase, when given an IUser with a
+ # preferred address, ends up with an address.
+ anne = self._user_manager.make_user(self._anne)
+ address = list(anne.addresses)[0]
+ address.verified_on = now()
+ anne.preferred_address = address
+ workflow = SubscriptionWorkflow(self._mlist, anne)
+ # The constructor sets workflow.address because the user has a
+ # preferred address.
+ self.assertEqual(workflow.address, address)
+ self.assertEqual(workflow.user, anne)
+ workflow.run_thru('sanity_checks')
+ self.assertEqual(workflow.address, address)
+ self.assertEqual(workflow.user, anne)
+
+ def test_sanity_checks_user_without_preferred_address(self):
+ # Ensure that the sanity check phase, when given a user without a
+ # preferred address, but with at least one linked address, gets an
+ # address.
+ anne = self._user_manager.make_user(self._anne)
+ workflow = SubscriptionWorkflow(self._mlist, anne)
+ self.assertIsNone(workflow.address)
+ self.assertEqual(workflow.user, anne)
+ workflow.run_thru('sanity_checks')
+ self.assertIsNotNone(workflow.address)
+ self.assertEqual(workflow.user, anne)
+
+ def test_sanity_checks_user_with_multiple_linked_addresses(self):
+ # Ensure that the santiy check phase, when given a user without a
+ # preferred address, but with multiple linked addresses, gets of of
+ # those addresses (exactly which one is undefined).
+ anne = self._user_manager.make_user(self._anne)
+ anne.link(self._user_manager.create_address('anne@example.net'))
+ anne.link(self._user_manager.create_address('anne@example.org'))
+ workflow = SubscriptionWorkflow(self._mlist, anne)
+ self.assertIsNone(workflow.address)
+ self.assertEqual(workflow.user, anne)
+ workflow.run_thru('sanity_checks')
+ self.assertIn(workflow.address.email, ['anne@example.com',
+ 'anne@example.net',
+ 'anne@example.org'])
+ self.assertEqual(workflow.user, anne)
+
+ def test_sanity_checks_user_without_addresses(self):
+ # It is an error to try to subscribe a user with no linked addresses.
+ user = self._user_manager.create_user()
+ workflow = SubscriptionWorkflow(self._mlist, user)
+ self.assertRaises(AssertionError, workflow.run_thru, 'sanity_checks')
+
+ def test_sanity_checks_globally_banned_address(self):
+ # An exception is raised if the address is globally banned.
+ anne = self._user_manager.create_address(self._anne)
+ IBanManager(None).ban(self._anne)
+ workflow = SubscriptionWorkflow(self._mlist, anne)
+ self.assertRaises(MembershipIsBannedError, list, workflow)
+
+ def test_sanity_checks_banned_address(self):
+ # An exception is raised if the address is banned by the mailing list.
+ anne = self._user_manager.create_address(self._anne)
+ IBanManager(self._mlist).ban(self._anne)
+ workflow = SubscriptionWorkflow(self._mlist, anne)
+ self.assertRaises(MembershipIsBannedError, list, workflow)
+
+ def test_verification_checks_with_verified_address(self):
+ # When the address is already verified, we skip straight to the
+ # confirmation checks.
+ anne = self._user_manager.create_address(self._anne)
+ anne.verified_on = now()
+ workflow = SubscriptionWorkflow(self._mlist, anne)
+ workflow.run_thru('verification_checks')
+ with patch.object(workflow, '_step_confirmation_checks') as step:
+ next(workflow)
+ step.assert_called_once_with()
+
+ def test_verification_checks_with_pre_verified_address(self):
+ # When the address is not yet verified, but the pre-verified flag is
+ # passed to the workflow, we skip to the confirmation checks.
+ anne = self._user_manager.create_address(self._anne)
+ workflow = SubscriptionWorkflow(self._mlist, anne, pre_verified=True)
+ workflow.run_thru('verification_checks')
+ with patch.object(workflow, '_step_confirmation_checks') as step:
+ next(workflow)
+ step.assert_called_once_with()
+ # And now the address is verified.
+ self.assertIsNotNone(anne.verified_on)
+
+ def test_verification_checks_confirmation_needed(self):
+ # The address is neither verified, nor is the pre-verified flag set.
+ # A confirmation message must be sent to the user which will also
+ # verify their address.
+ anne = self._user_manager.create_address(self._anne)
+ workflow = SubscriptionWorkflow(self._mlist, anne)
+ workflow.run_thru('verification_checks')
+ with patch.object(workflow, '_step_send_confirmation') as step:
+ next(workflow)
+ step.assert_called_once_with()
+ # The address still hasn't been verified.
+ self.assertIsNone(anne.verified_on)
+
+ def test_confirmation_checks_open_list(self):
+ # A subscription to an open list does not need to be confirmed or
+ # moderated.
+ self._mlist.subscription_policy = SubscriptionPolicy.open
+ anne = self._user_manager.create_address(self._anne)
+ workflow = SubscriptionWorkflow(self._mlist, anne, pre_verified=True)
+ workflow.run_thru('confirmation_checks')
+ with patch.object(workflow, '_step_do_subscription') as step:
+ next(workflow)
+ step.assert_called_once_with()
+
+ def test_confirmation_checks_no_user_confirmation_needed(self):
+ # A subscription to a list which does not need user confirmation skips
+ # to the moderation checks.
+ self._mlist.subscription_policy = SubscriptionPolicy.moderate
+ anne = self._user_manager.create_address(self._anne)
+ workflow = SubscriptionWorkflow(self._mlist, anne, pre_verified=True)
+ workflow.run_thru('confirmation_checks')
+ with patch.object(workflow, '_step_moderation_checks') as step:
+ next(workflow)
+ step.assert_called_once_with()
+
+ def test_confirmation_checks_confirm_pre_confirmed(self):
+ # The subscription policy requires user confirmation, but their
+ # subscription is pre-confirmed.
+ self._mlist.subscription_policy = SubscriptionPolicy.confirm
+ anne = self._user_manager.create_address(self._anne)
+ workflow = SubscriptionWorkflow(self._mlist, anne,
+ pre_verified=True,
+ pre_confirmed=True)
+ workflow.run_thru('confirmation_checks')
+ with patch.object(workflow, '_step_moderation_checks') as step:
+ next(workflow)
+ step.assert_called_once_with()
+
+ def test_confirmation_checks_confirm_and_moderate_pre_confirmed(self):
+ # The subscription policy requires user confirmation and moderation,
+ # but their subscription is pre-confirmed.
+ self._mlist.subscription_policy = \
+ SubscriptionPolicy.confirm_then_moderate
+ anne = self._user_manager.create_address(self._anne)
+ workflow = SubscriptionWorkflow(self._mlist, anne,
+ pre_verified=True,
+ pre_confirmed=True)
+ workflow.run_thru('confirmation_checks')
+ with patch.object(workflow, '_step_moderation_checks') as step:
+ next(workflow)
+ step.assert_called_once_with()
+
+ def test_confirmation_checks_confirmation_needed(self):
+ # The subscription policy requires confirmation and the subscription
+ # is not pre-confirmed.
+ self._mlist.subscription_policy = SubscriptionPolicy.confirm
+ anne = self._user_manager.create_address(self._anne)
+ workflow = SubscriptionWorkflow(self._mlist, anne, pre_verified=True)
+ workflow.run_thru('confirmation_checks')
+ with patch.object(workflow, '_step_send_confirmation') as step:
+ next(workflow)
+ step.assert_called_once_with()
+
+ def test_confirmation_checks_moderate_confirmation_needed(self):
+ # The subscription policy requires confirmation and moderation, and the
+ # subscription is not pre-confirmed.
+ self._mlist.subscription_policy = \
+ SubscriptionPolicy.confirm_then_moderate
+ anne = self._user_manager.create_address(self._anne)
+ workflow = SubscriptionWorkflow(self._mlist, anne, pre_verified=True)
+ workflow.run_thru('confirmation_checks')
+ with patch.object(workflow, '_step_send_confirmation') as step:
+ next(workflow)
+ step.assert_called_once_with()
+
+ def test_moderation_checks_pre_approved(self):
+ # The subscription is pre-approved by the moderator.
+ self._mlist.subscription_policy = SubscriptionPolicy.moderate
+ anne = self._user_manager.create_address(self._anne)
+ workflow = SubscriptionWorkflow(self._mlist, anne,
+ pre_verified=True,
+ pre_approved=True)
+ workflow.run_thru('moderation_checks')
+ with patch.object(workflow, '_step_do_subscription') as step:
+ next(workflow)
+ step.assert_called_once_with()
+
+ def test_moderation_checks_approval_required(self):
+ # The moderator must approve the subscription.
+ self._mlist.subscription_policy = SubscriptionPolicy.moderate
+ anne = self._user_manager.create_address(self._anne)
+ workflow = SubscriptionWorkflow(self._mlist, anne, pre_verified=True)
+ workflow.run_thru('moderation_checks')
+ with patch.object(workflow, '_step_get_moderator_approval') as step:
+ next(workflow)
+ step.assert_called_once_with()
+
+ def test_do_subscription(self):
+ # An open subscription policy plus a pre-verified address means the
+ # user gets subscribed to the mailing list without any further
+ # confirmations or approvals.
+ self._mlist.subscription_policy = SubscriptionPolicy.open
+ anne = self._user_manager.create_address(self._anne)
+ workflow = SubscriptionWorkflow(self._mlist, anne, pre_verified=True)
+ # Consume the entire state machine.
+ list(workflow)
+ # Anne is now a member of the mailing list.
+ member = self._mlist.regular_members.get_member(self._anne)
+ self.assertEqual(member.address, anne)
+
+ def test_do_subscription_pre_approved(self):
+ # An moderation-requiring subscription policy plus a pre-verified and
+ # pre-approved address means the user gets subscribed to the mailing
+ # list without any further confirmations or approvals.
+ self._mlist.subscription_policy = SubscriptionPolicy.moderate
+ anne = self._user_manager.create_address(self._anne)
+ workflow = SubscriptionWorkflow(self._mlist, anne,
+ pre_verified=True,
+ pre_approved=True)
+ # Consume the entire state machine.
+ list(workflow)
+ # Anne is now a member of the mailing list.
+ member = self._mlist.regular_members.get_member(self._anne)
+ self.assertEqual(member.address, anne)
+
+ def test_do_subscription_pre_approved_pre_confirmed(self):
+ # An moderation-requiring subscription policy plus a pre-verified and
+ # pre-approved address means the user gets subscribed to the mailing
+ # list without any further confirmations or approvals.
+ self._mlist.subscription_policy = \
+ SubscriptionPolicy.confirm_then_moderate
+ anne = self._user_manager.create_address(self._anne)
+ workflow = SubscriptionWorkflow(self._mlist, anne,
+ pre_verified=True,
+ pre_confirmed=True,
+ pre_approved=True)
+ # Consume the entire state machine.
+ list(workflow)
+ # Anne is now a member of the mailing list.
+ member = self._mlist.regular_members.get_member(self._anne)
+ self.assertEqual(member.address, anne)
+
+ def test_do_subscription_cleanups(self):
+ # Once the user is subscribed, the token, and its associated pending
+ # database record will be removed from the database.
+ self._mlist.subscription_policy = SubscriptionPolicy.open
+ anne = self._user_manager.create_address(self._anne)
+ workflow = SubscriptionWorkflow(self._mlist, anne,
+ pre_verified=True,
+ pre_confirmed=True,
+ pre_approved=True)
+ # Cache the token.
+ token = workflow.token
+ # Consume the entire state machine.
+ list(workflow)
+ # Anne is now a member of the mailing list.
+ member = self._mlist.regular_members.get_member(self._anne)
+ self.assertEqual(member.address, anne)
+ # The workflow is done, so it has no token.
+ self.assertIsNone(workflow.token)
+ # The pendable associated with the token has been evicted.
+ self.assertIsNone(getUtility(IPendings).confirm(token, expunge=False))
+ # There is no saved workflow associated with the token. This shows up
+ # as an exception when we try to restore the workflow.
+ new_workflow = SubscriptionWorkflow(self._mlist)
+ new_workflow.token = token
+ self.assertRaises(LookupError, new_workflow.restore)
+
+ def test_moderator_approves(self):
+ # The workflow runs until moderator approval is required, at which
+ # point the workflow is saved. Once the moderator approves, the
+ # workflow resumes and the user is subscribed.
+ self._mlist.subscription_policy = SubscriptionPolicy.moderate
+ anne = self._user_manager.create_address(self._anne)
+ workflow = SubscriptionWorkflow(self._mlist, anne,
+ pre_verified=True,
+ pre_confirmed=True)
+ # Consume the entire state machine.
+ list(workflow)
+ # The user is not currently subscribed to the mailing list.
+ member = self._mlist.regular_members.get_member(self._anne)
+ self.assertIsNone(member)
+ # Create a new workflow with the previous workflow's save token, and
+ # restore its state. This models an approved subscription and should
+ # result in the user getting subscribed.
+ approved_workflow = SubscriptionWorkflow(self._mlist)
+ approved_workflow.token = workflow.token
+ approved_workflow.restore()
+ list(approved_workflow)
+ # Now the user is subscribed to the mailing list.
+ member = self._mlist.regular_members.get_member(self._anne)
+ self.assertEqual(member.address, anne)
+
+ def test_get_moderator_approval_log_on_hold(self):
+ # When the subscription is held for moderator approval, a message is
+ # logged.
+ mark = LogFileMark('mailman.subscribe')
+ self._mlist.subscription_policy = SubscriptionPolicy.moderate
+ anne = self._user_manager.create_address(self._anne)
+ workflow = SubscriptionWorkflow(self._mlist, anne,
+ pre_verified=True,
+ pre_confirmed=True)
+ # Consume the entire state machine.
+ list(workflow)
+ line = mark.readline()
+ self.assertEqual(
+ line[29:-1],
+ 'test@example.com: held subscription request from anne@example.com'
+ )
+
+ def test_get_moderator_approval_notifies_moderators(self):
+ # When the subscription is held for moderator approval, and the list
+ # is so configured, a notification is sent to the list moderators.
+ self._mlist.admin_immed_notify = True
+ self._mlist.subscription_policy = SubscriptionPolicy.moderate
+ anne = self._user_manager.create_address(self._anne)
+ workflow = SubscriptionWorkflow(self._mlist, anne,
+ pre_verified=True,
+ pre_confirmed=True)
+ # Consume the entire state machine.
+ list(workflow)
+ items = get_queue_messages('virgin')
+ self.assertEqual(len(items), 1)
+ message = items[0].msg
+ self.assertEqual(message['From'], 'test-owner@example.com')
+ self.assertEqual(message['To'], 'test-owner@example.com')
+ self.assertEqual(
+ message['Subject'],
+ 'New subscription request to Test from anne@example.com')
+ self.assertEqual(message.get_payload(), """\
+Your authorization is required for a mailing list subscription request
+approval:
+
+ For: anne@example.com
+ List: test@example.com""")
+
+ def test_get_moderator_approval_no_notifications(self):
+ # When the subscription is held for moderator approval, and the list
+ # is so configured, a notification is sent to the list moderators.
+ self._mlist.admin_immed_notify = False
+ self._mlist.subscription_policy = SubscriptionPolicy.moderate
+ anne = self._user_manager.create_address(self._anne)
+ workflow = SubscriptionWorkflow(self._mlist, anne,
+ pre_verified=True,
+ pre_confirmed=True)
+ # Consume the entire state machine.
+ list(workflow)
+ items = get_queue_messages('virgin')
+ self.assertEqual(len(items), 0)
+
+ def test_send_confirmation(self):
+ # A confirmation message gets sent when the address is not verified.
+ anne = self._user_manager.create_address(self._anne)
+ self.assertIsNone(anne.verified_on)
+ # Run the workflow to model the confirmation step.
+ workflow = SubscriptionWorkflow(self._mlist, anne)
+ list(workflow)
+ items = get_queue_messages('virgin')
+ self.assertEqual(len(items), 1)
+ message = items[0].msg
+ token = workflow.token
+ self.assertEqual(message['Subject'], 'confirm {}'.format(token))
+ self.assertEqual(
+ message['From'], 'test-confirm+{}@example.com'.format(token))
+
+ def test_send_confirmation_pre_confirmed(self):
+ # A confirmation message gets sent when the address is not verified
+ # but the subscription is pre-confirmed.
+ anne = self._user_manager.create_address(self._anne)
+ self.assertIsNone(anne.verified_on)
+ # Run the workflow to model the confirmation step.
+ workflow = SubscriptionWorkflow(self._mlist, anne, pre_confirmed=True)
+ list(workflow)
+ items = get_queue_messages('virgin')
+ self.assertEqual(len(items), 1)
+ message = items[0].msg
+ token = workflow.token
+ self.assertEqual(
+ message['Subject'], 'confirm {}'.format(workflow.token))
+ self.assertEqual(
+ message['From'], 'test-confirm+{}@example.com'.format(token))
+
+ def test_send_confirmation_pre_verified(self):
+ # A confirmation message gets sent even when the address is verified
+ # when the subscription must be confirmed.
+ self._mlist.subscription_policy = SubscriptionPolicy.confirm
+ anne = self._user_manager.create_address(self._anne)
+ self.assertIsNone(anne.verified_on)
+ # Run the workflow to model the confirmation step.
+ workflow = SubscriptionWorkflow(self._mlist, anne, pre_verified=True)
+ list(workflow)
+ items = get_queue_messages('virgin')
+ self.assertEqual(len(items), 1)
+ message = items[0].msg
+ token = workflow.token
+ self.assertEqual(
+ message['Subject'], 'confirm {}'.format(workflow.token))
+ self.assertEqual(
+ message['From'], 'test-confirm+{}@example.com'.format(token))
+
+ def test_do_confirm_verify_address(self):
+ # The address is not yet verified, nor are we pre-verifying. A
+ # confirmation message will be sent. When the user confirms their
+ # subscription request, the address will end up being verified.
+ anne = self._user_manager.create_address(self._anne)
+ self.assertIsNone(anne.verified_on)
+ # Run the workflow to model the confirmation step.
+ workflow = SubscriptionWorkflow(self._mlist, anne)
+ list(workflow)
+ # The address is still not verified.
+ self.assertIsNone(anne.verified_on)
+ confirm_workflow = SubscriptionWorkflow(self._mlist)
+ confirm_workflow.token = workflow.token
+ confirm_workflow.restore()
+ confirm_workflow.run_thru('do_confirm_verify')
+ # The address is now verified.
+ self.assertIsNotNone(anne.verified_on)
+
+ def test_do_confirmation_subscribes_user(self):
+ # Subscriptions to the mailing list must be confirmed. Once that's
+ # done, the user's address (which is not initially verified) gets
+ # subscribed to the mailing list.
+ self._mlist.subscription_policy = SubscriptionPolicy.confirm
+ anne = self._user_manager.create_address(self._anne)
+ self.assertIsNone(anne.verified_on)
+ workflow = SubscriptionWorkflow(self._mlist, anne)
+ list(workflow)
+ self.assertIsNone(self._mlist.regular_members.get_member(self._anne))
+ confirm_workflow = SubscriptionWorkflow(self._mlist)
+ confirm_workflow.token = workflow.token
+ confirm_workflow.restore()
+ list(confirm_workflow)
+ self.assertIsNotNone(anne.verified_on)
+ self.assertEqual(
+ self._mlist.regular_members.get_member(self._anne).address, anne)
+
+ def test_prevent_confirmation_replay_attacks(self):
+ # Ensure that if the workflow requires two confirmations, e.g. first
+ # the user confirming their subscription, and then the moderator
+ # approving it, that different tokens are used in these two cases.
+ self._mlist.subscription_policy = \
+ SubscriptionPolicy.confirm_then_moderate
+ anne = self._user_manager.create_address(self._anne)
+ workflow = SubscriptionWorkflow(self._mlist, anne, pre_verified=True)
+ # Run the state machine up to the first confirmation, and cache the
+ # confirmation token.
+ list(workflow)
+ token = workflow.token
+ # Anne is not yet a member of the mailing list.
+ member = self._mlist.regular_members.get_member(self._anne)
+ self.assertIsNone(member)
+ # The old token will not work for moderator approval.
+ moderator_workflow = SubscriptionWorkflow(self._mlist)
+ moderator_workflow.token = token
+ moderator_workflow.restore()
+ list(moderator_workflow)
+ # While we wait for the moderator to approve the subscription, note
+ # that there's a new token for the next steps.
+ self.assertNotEqual(token, moderator_workflow.token)
+ # The old token won't work.
+ final_workflow = SubscriptionWorkflow(self._mlist)
+ final_workflow.token = token
+ self.assertRaises(LookupError, final_workflow.restore)
+ # Running this workflow will fail.
+ self.assertRaises(AssertionError, list, final_workflow)
+ # Anne is still not subscribed.
+ member = self._mlist.regular_members.get_member(self._anne)
+ self.assertIsNone(member)
+ # However, if we use the new token, her subscription request will be
+ # approved by the moderator.
+ final_workflow.token = moderator_workflow.token
+ final_workflow.restore()
+ list(final_workflow)
+ # And now Anne is a member.
+ member = self._mlist.regular_members.get_member(self._anne)
+ self.assertEqual(member.address.email, self._anne)