From 8b09113eb40c39ada3dc902cb4e869c8f012c97d Mon Sep 17 00:00:00 2001 From: J08nY Date: Thu, 29 Jun 2017 23:51:47 +0200 Subject: Create mailman.workflows package. Move base Workflow there. - Also introduce IWorkflow, ISubscriptionWorkflow, IUnsubscriptionWorkflow. --- src/mailman/app/tests/test_workflow.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) (limited to 'src/mailman/app/tests/test_workflow.py') diff --git a/src/mailman/app/tests/test_workflow.py b/src/mailman/app/tests/test_workflow.py index e82001540..f242a73ec 100644 --- a/src/mailman/app/tests/test_workflow.py +++ b/src/mailman/app/tests/test_workflow.py @@ -20,15 +20,15 @@ import json import unittest -from mailman.app.workflow import Workflow -from mailman.interfaces.workflow import IWorkflowStateManager +from mailman.interfaces.workflows import IWorkflowStateManager from mailman.testing.layers import ConfigLayer +from mailman.workflows.base import Workflow from zope.component import getUtility class MyWorkflow(Workflow): - INITIAL_STATE = 'first' - SAVE_ATTRIBUTES = ('ant', 'bee', 'cat') + initial_state = 'first' + save_attributes = ('ant', 'bee', 'cat') def __init__(self): super().__init__() @@ -51,7 +51,7 @@ class MyWorkflow(Workflow): class DependentWorkflow(MyWorkflow): - SAVE_ATTRIBUTES = ('ant', 'bee', 'cat', 'elf') + save_attributes = ('ant', 'bee', 'cat', 'elf') def __init__(self): super().__init__() @@ -136,7 +136,7 @@ class TestWorkflow(unittest.TestCase): def test_save_and_restore_dependant_attributes(self): # Attributes must be restored in the order they are declared in - # SAVE_ATTRIBUTES. + # save_attributes. workflow = iter(DependentWorkflow()) workflow.elf = 6 workflow.save() -- cgit v1.2.3-70-g09d2 From 505f47025ca3fab68a489238837c1a84e4b2a1b6 Mon Sep 17 00:00:00 2001 From: J08nY Date: Tue, 4 Jul 2017 17:54:47 +0200 Subject: Save the complete workflow stack, not only last step. --- src/mailman/app/tests/test_workflow.py | 2 +- .../versions/7c5b39d1ecc4_workflow_steps.py | 90 ++++++++++++++++++++ src/mailman/database/tests/test_migrations.py | 99 ++++++++++++++++++++++ src/mailman/interfaces/workflows.py | 8 +- src/mailman/model/tests/test_workflow.py | 22 ++--- src/mailman/model/workflows.py | 6 +- src/mailman/workflows/base.py | 22 ++--- 7 files changed, 215 insertions(+), 34 deletions(-) create mode 100644 src/mailman/database/alembic/versions/7c5b39d1ecc4_workflow_steps.py (limited to 'src/mailman/app/tests/test_workflow.py') diff --git a/src/mailman/app/tests/test_workflow.py b/src/mailman/app/tests/test_workflow.py index f242a73ec..3e7856b29 100644 --- a/src/mailman/app/tests/test_workflow.py +++ b/src/mailman/app/tests/test_workflow.py @@ -153,7 +153,7 @@ class TestWorkflow(unittest.TestCase): # Save the state of an old version of the workflow that would not have # the cat attribute. state_manager.save( - self._workflow.token, 'first', + self._workflow.token, '["first"]', json.dumps({'ant': 1, 'bee': 2})) # Restore in the current version that needs the cat attribute. new_workflow = MyWorkflow() diff --git a/src/mailman/database/alembic/versions/7c5b39d1ecc4_workflow_steps.py b/src/mailman/database/alembic/versions/7c5b39d1ecc4_workflow_steps.py new file mode 100644 index 000000000..c2daffd63 --- /dev/null +++ b/src/mailman/database/alembic/versions/7c5b39d1ecc4_workflow_steps.py @@ -0,0 +1,90 @@ +# Copyright (C) 2015-2017 by the Free Software Foundation, Inc. +# +# This file is part of GNU Mailman. +# +# GNU Mailman is free software: you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) +# any later version. +# +# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. +# +# You should have received a copy of the GNU General Public License along with +# GNU Mailman. If not, see . + +"""Save whole workflow stack in workflowstate. + +Revision ID: 7c5b39d1ecc4 +Revises: 4bd95c99b2e7 +Create Date: 2017-07-04 16:45:36.470746 + +""" + +import json +import sqlalchemy as sa + +from alembic import op +from mailman.database.types import SAUnicode + +# revision identifiers, used by Alembic. +revision = '7c5b39d1ecc4' +down_revision = '4bd95c99b2e7' + +old_state_table = sa.sql.table( + 'workflowstate', + sa.sql.column('token', SAUnicode), + sa.sql.column('step', SAUnicode), + sa.sql.column('data', SAUnicode) + ) +new_state_table = sa.sql.table( + 'workflowstate', + sa.sql.column('token', SAUnicode), + sa.sql.column('steps', SAUnicode), + sa.sql.column('data', SAUnicode) + ) + + +def upgrade(): + # Rename the column + with op.batch_alter_table('workflowstate') as batch_op: + batch_op.alter_column('step', new_column_name='steps', + existing_type=SAUnicode) + + connection = op.get_bind() + for token, step, data in connection.execute( + new_state_table.select()).fetchall(): + # Wrap the single step in a JSON array. + if step is not None: + new_steps = json.dumps([step]) + else: + new_steps = '[]' + connection.execute( + new_state_table.update().where( + new_state_table.c.token == token).values( + steps=new_steps) + ) + + +def downgrade(): + # Rename back the column + connection = op.get_bind() + with op.batch_alter_table('workflowstate') as batch_op: + batch_op.alter_column('steps', new_column_name='step', + existing_type=SAUnicode) + + for token, steps, data in connection.execute( + old_state_table.select()).fetchall(): + # Extract and save at least the last state. + step_stack = json.loads(steps) + if len(step_stack) == 0: + new_step = None + else: + new_step = step_stack.pop() + connection.execute( + old_state_table.update().where( + old_state_table.c.token == token).values( + step=new_step) + ) diff --git a/src/mailman/database/tests/test_migrations.py b/src/mailman/database/tests/test_migrations.py index ce1bc101b..5b686d13b 100644 --- a/src/mailman/database/tests/test_migrations.py +++ b/src/mailman/database/tests/test_migrations.py @@ -18,6 +18,7 @@ """Test database schema migrations with Alembic""" import os +import json import unittest import sqlalchemy as sa import alembic.command @@ -495,3 +496,101 @@ class TestMigrations(unittest.TestCase): self.assertEqual( len(list(config.db.store.execute(mlist_table.select()))), 0) + + def test_7c5b39d1ecc4_workflow_steps_upgrade(self): + old_state_table = sa.sql.table( + 'workflowstate', + sa.sql.column('token', SAUnicode), + sa.sql.column('step', SAUnicode), + sa.sql.column('data', SAUnicode) + ) + new_state_table = sa.sql.table( + 'workflowstate', + sa.sql.column('token', SAUnicode), + sa.sql.column('steps', SAUnicode), + sa.sql.column('data', SAUnicode) + ) + with transaction(): + # Start at the previous revision. + alembic.command.downgrade(alembic_cfg, '4bd95c99b2e7') + config.db.store.execute(old_state_table.insert().values([ + dict(token='12345', + step='some_step', + data='whatever data'), + dict(token='6789', + step=None, + data='other data') + ])) + + # Now upgrade. + alembic.command.upgrade(alembic_cfg, '7c5b39d1ecc4') + + token, steps, data = config.db.store.execute( + new_state_table.select().where( + new_state_table.c.token == '12345' + )).fetchone() + self.assertEqual(token, '12345') + self.assertEqual(steps, json.dumps(['some_step'])) + self.assertEqual(data, 'whatever data') + + token, steps, data = config.db.store.execute( + new_state_table.select().where( + new_state_table.c.token == '6789' + )).fetchone() + self.assertEqual(token, '6789') + self.assertEqual(steps, json.dumps([])) + self.assertEqual(data, 'other data') + + def test_7c5b39d1ecc4_workflow_steps_downgrade(self): + old_state_table = sa.sql.table( + 'workflowstate', + sa.sql.column('token', SAUnicode), + sa.sql.column('step', SAUnicode), + sa.sql.column('data', SAUnicode) + ) + new_state_table = sa.sql.table( + 'workflowstate', + sa.sql.column('token', SAUnicode), + sa.sql.column('steps', SAUnicode), + sa.sql.column('data', SAUnicode) + ) + with transaction(): + # Start at the revision. + alembic.command.downgrade(alembic_cfg, '7c5b39d1ecc4') + config.db.store.execute(new_state_table.insert().values([ + dict(token='12345', + steps=json.dumps(['next_step', 'some_step']), + data='whatever data'), + dict(token='6789', + steps=json.dumps(['only_step']), + data='other data'), + dict(token='abcde', + steps=json.dumps([]), + data='another data') + ])) + # Now downgrade. + alembic.command.downgrade(alembic_cfg, '4bd95c99b2e7') + + token, step, data = config.db.store.execute( + old_state_table.select().where( + old_state_table.c.token == '12345' + )).fetchone() + self.assertEqual(token, '12345') + self.assertEqual(step, 'some_step') + self.assertEqual(data, 'whatever data') + + token, step, data = config.db.store.execute( + old_state_table.select().where( + old_state_table.c.token == '6789' + )).fetchone() + self.assertEqual(token, '6789') + self.assertEqual(step, 'only_step') + self.assertEqual(data, 'other data') + + token, step, data = config.db.store.execute( + old_state_table.select().where( + old_state_table.c.token == 'abcde' + )).fetchone() + self.assertEqual(token, 'abcde') + self.assertEqual(step, None) + self.assertEqual(data, 'another data') diff --git a/src/mailman/interfaces/workflows.py b/src/mailman/interfaces/workflows.py index b110a481e..0fad52cca 100644 --- a/src/mailman/interfaces/workflows.py +++ b/src/mailman/interfaces/workflows.py @@ -27,7 +27,7 @@ class IWorkflowState(Interface): token = Attribute('A unique key identifying the workflow instance.') - step = Attribute("This workflow's next step.") + steps = Attribute("This workflow's next steps.") data = Attribute('Additional data (may be JSON-encoded).') @@ -36,13 +36,13 @@ class IWorkflowState(Interface): class IWorkflowStateManager(Interface): """The workflow states manager.""" - def save(token, step, data=None): + def save(token, steps, data=None): """Save the state of a workflow. :param token: A unique token identifying this workflow instance. :type token: str - :param step: The next step for this workflow. - :type step: str + :param steps: The next steps for this workflow. + :type steps: str :param data: Additional data (workflow-specific). :type data: str """ diff --git a/src/mailman/model/tests/test_workflow.py b/src/mailman/model/tests/test_workflow.py index 41c00efc0..900efbb25 100644 --- a/src/mailman/model/tests/test_workflow.py +++ b/src/mailman/model/tests/test_workflow.py @@ -33,12 +33,12 @@ class TestWorkflow(unittest.TestCase): def test_save_restore_workflow(self): # Save and restore a workflow. token = 'bee' - step = 'cat' + steps = 'cat' data = 'dog' - self._manager.save(token, step, data) + self._manager.save(token, steps, data) state = self._manager.restore(token) self.assertEqual(state.token, token) - self.assertEqual(state.step, step) + self.assertEqual(state.steps, steps) self.assertEqual(state.data, data) def test_save_restore_workflow_without_step(self): @@ -48,17 +48,17 @@ class TestWorkflow(unittest.TestCase): self._manager.save(token, data=data) state = self._manager.restore(token) self.assertEqual(state.token, token) - self.assertIsNone(state.step) + self.assertIsNone(state.steps) self.assertEqual(state.data, data) def test_save_restore_workflow_without_data(self): # Save and restore a workflow that contains no data. token = 'bee' - step = 'cat' - self._manager.save(token, step) + steps = 'cat' + self._manager.save(token, steps) state = self._manager.restore(token) self.assertEqual(state.token, token) - self.assertEqual(state.step, step) + self.assertEqual(state.steps, steps) self.assertIsNone(state.data) def test_save_restore_workflow_without_step_or_data(self): @@ -67,7 +67,7 @@ class TestWorkflow(unittest.TestCase): self._manager.save(token) state = self._manager.restore(token) self.assertEqual(state.token, token) - self.assertIsNone(state.step) + self.assertIsNone(state.steps) self.assertIsNone(state.data) def test_restore_workflow_with_no_matching_token(self): @@ -106,13 +106,13 @@ class TestWorkflow(unittest.TestCase): self._manager.discard('token2') self.assertEqual(self._manager.count, 3) state = self._manager.restore('token1') - self.assertEqual(state.step, 'one') + self.assertEqual(state.steps, 'one') state = self._manager.restore('token2') self.assertIsNone(state) state = self._manager.restore('token3') - self.assertEqual(state.step, 'three') + self.assertEqual(state.steps, 'three') state = self._manager.restore('token4') - self.assertEqual(state.step, 'four') + self.assertEqual(state.steps, 'four') def test_discard_missing_workflow(self): self._manager.discard('bogus-token') diff --git a/src/mailman/model/workflows.py b/src/mailman/model/workflows.py index 587d3375e..be793eb43 100644 --- a/src/mailman/model/workflows.py +++ b/src/mailman/model/workflows.py @@ -34,7 +34,7 @@ class WorkflowState(Model): __tablename__ = 'workflowstate' token = Column(SAUnicode, primary_key=True) - step = Column(SAUnicode) + steps = Column(SAUnicode) data = Column(SAUnicode) @@ -44,9 +44,9 @@ class WorkflowStateManager: """See `IWorkflowStateManager`.""" @dbconnection - def save(self, store, token, step=None, data=None): + def save(self, store, token, steps=None, data=None): """See `IWorkflowStateManager`.""" - state = WorkflowState(token=token, step=step, data=data) + state = WorkflowState(token=token, steps=steps, data=data) store.add(state) @dbconnection diff --git a/src/mailman/workflows/base.py b/src/mailman/workflows/base.py index e374d58b4..8153bf77d 100644 --- a/src/mailman/workflows/base.py +++ b/src/mailman/workflows/base.py @@ -70,7 +70,7 @@ class Workflow: self._next.append(step) def _pop(self): - name = self._next.popleft() + name = self._next.pop() step = getattr(self, '_step_{}'.format(name)) self._count += 1 if self.debug: # pragma: nocover @@ -114,20 +114,12 @@ class Workflow: assert self.token, 'Workflow token must be set' state_manager = getUtility(IWorkflowStateManager) data = {attr: getattr(self, attr) for attr in self.save_attributes} - # Note: only the next step is saved, not the whole stack. This is not - # an issue in practice, since there's never more than a single step in - # the queue anyway. If we want to support more than a single step in - # the queue *and* want to support state saving/restoring, change this - # method and the restore() method. + # Save the workflow stack. if len(self._next) == 0: - step = None - elif len(self._next) == 1: - step = self._next[0] + steps = '[]' else: - raise AssertionError( - "Can't save a workflow state with more than one step " - "in the queue") - state_manager.save(self.token, step, json.dumps(data)) + steps = json.dumps(list(self._next)) + state_manager.save(self.token, steps, json.dumps(data)) def restore(self): """See `IWorkflow`.""" @@ -137,8 +129,8 @@ class Workflow: # The token doesn't exist in the database. raise LookupError(self.token) self._next.clear() - if state.step: - self._next.append(state.step) + if state.steps: + self._next.extend(json.loads(state.steps)) data = json.loads(state.data) for attr in self.save_attributes: try: -- cgit v1.2.3-70-g09d2 From 39fba3777fc7d37414368f40bf3504dadaa1841a Mon Sep 17 00:00:00 2001 From: J08nY Date: Thu, 6 Jul 2017 20:16:36 +0200 Subject: Move workflow tests to mailman.workflows. --- src/mailman/app/tests/test_subscriptions.py | 726 --------------------- src/mailman/app/tests/test_unsubscriptions.py | 520 --------------- src/mailman/app/tests/test_workflow.py | 183 ------ src/mailman/workflows/tests/__init__.py | 0 src/mailman/workflows/tests/test_subscriptions.py | 726 +++++++++++++++++++++ .../workflows/tests/test_unsubscriptions.py | 520 +++++++++++++++ src/mailman/workflows/tests/test_workflow.py | 183 ++++++ 7 files changed, 1429 insertions(+), 1429 deletions(-) delete mode 100644 src/mailman/app/tests/test_subscriptions.py delete mode 100644 src/mailman/app/tests/test_unsubscriptions.py delete mode 100644 src/mailman/app/tests/test_workflow.py create mode 100644 src/mailman/workflows/tests/__init__.py create mode 100644 src/mailman/workflows/tests/test_subscriptions.py create mode 100644 src/mailman/workflows/tests/test_unsubscriptions.py create mode 100644 src/mailman/workflows/tests/test_workflow.py (limited to 'src/mailman/app/tests/test_workflow.py') diff --git a/src/mailman/app/tests/test_subscriptions.py b/src/mailman/app/tests/test_subscriptions.py deleted file mode 100644 index 23487ab1c..000000000 --- a/src/mailman/app/tests/test_subscriptions.py +++ /dev/null @@ -1,726 +0,0 @@ -# Copyright (C) 2011-2017 by the Free Software Foundation, Inc. -# -# This file is part of GNU Mailman. -# -# GNU Mailman is free software: you can redistribute it and/or modify it under -# the terms of the GNU General Public License as published by the Free -# Software Foundation, either version 3 of the License, or (at your option) -# any later version. -# -# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT -# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or -# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for -# more details. -# -# You should have received a copy of the GNU General Public License along with -# GNU Mailman. If not, see . - -"""Tests for the subscription service.""" - -import unittest - -from contextlib import suppress -from mailman.app.lifecycle import create_list -from mailman.interfaces.bans import IBanManager -from mailman.interfaces.member import MemberRole, MembershipIsBannedError -from mailman.interfaces.pending import IPendings -from mailman.interfaces.subscriptions import TokenOwner -from mailman.interfaces.usermanager import IUserManager -from mailman.testing.helpers import ( - LogFileMark, get_queue_messages, set_preferred) -from mailman.testing.layers import ConfigLayer -from mailman.utilities.datetime import now -from mailman.workflows.subscription import ( - ConfirmModerationSubscriptionPolicy, ConfirmSubscriptionPolicy, - ModerationSubscriptionPolicy, OpenSubscriptionPolicy) -from unittest.mock import patch -from zope.component import getUtility - - -class TestSubscriptionWorkflow(unittest.TestCase): - layer = ConfigLayer - maxDiff = None - - def setUp(self): - self._mlist = create_list('test@example.com') - self._mlist.admin_immed_notify = False - self._anne = 'anne@example.com' - self._user_manager = getUtility(IUserManager) - self._expected_pendings_count = 0 - - def tearDown(self): - # There usually should be no pending after all is said and done, but - # some tests don't complete the workflow. - self.assertEqual(getUtility(IPendings).count, - self._expected_pendings_count) - - def test_start_state(self): - # The workflow starts with no tokens or member. - workflow = ConfirmSubscriptionPolicy(self._mlist) - self.assertIsNone(workflow.token) - self.assertEqual(workflow.token_owner, TokenOwner.no_one) - self.assertIsNone(workflow.member) - - def test_pended_data(self): - # There is a Pendable associated with the held request, and it has - # some data associated with it. - anne = self._user_manager.create_address(self._anne) - workflow = ConfirmSubscriptionPolicy(self._mlist, anne) - with suppress(StopIteration): - workflow.run_thru('send_confirmation') - self.assertIsNotNone(workflow.token) - pendable = getUtility(IPendings).confirm(workflow.token, expunge=False) - self.assertEqual(pendable['list_id'], 'test.example.com') - self.assertEqual(pendable['email'], 'anne@example.com') - self.assertEqual(pendable['display_name'], '') - self.assertEqual(pendable['when'], '2005-08-01T07:49:23') - self.assertEqual(pendable['token_owner'], 'subscriber') - # The token is still in the database. - self._expected_pendings_count = 1 - - def test_user_or_address_required(self): - # The `subscriber` attribute must be a user or address. - workflow = ConfirmSubscriptionPolicy(self._mlist) - self.assertRaises(AssertionError, list, workflow) - - def test_sanity_checks_address(self): - # Ensure that the sanity check phase, when given an IAddress, ends up - # with a linked user. - anne = self._user_manager.create_address(self._anne) - workflow = ConfirmSubscriptionPolicy(self._mlist, anne) - self.assertIsNotNone(workflow.address) - self.assertIsNone(workflow.user) - workflow.run_thru('sanity_checks') - self.assertIsNotNone(workflow.address) - self.assertIsNotNone(workflow.user) - self.assertEqual(list(workflow.user.addresses)[0].email, self._anne) - - def test_sanity_checks_user_with_preferred_address(self): - # Ensure that the sanity check phase, when given an IUser with a - # preferred address, ends up with an address. - anne = self._user_manager.make_user(self._anne) - address = set_preferred(anne) - workflow = ConfirmSubscriptionPolicy(self._mlist, anne) - # The constructor sets workflow.address because the user has a - # preferred address. - self.assertEqual(workflow.address, address) - self.assertEqual(workflow.user, anne) - workflow.run_thru('sanity_checks') - self.assertEqual(workflow.address, address) - self.assertEqual(workflow.user, anne) - - def test_sanity_checks_user_without_preferred_address(self): - # Ensure that the sanity check phase, when given a user without a - # preferred address, but with at least one linked address, gets an - # address. - anne = self._user_manager.make_user(self._anne) - workflow = ConfirmSubscriptionPolicy(self._mlist, anne) - self.assertIsNone(workflow.address) - self.assertEqual(workflow.user, anne) - workflow.run_thru('sanity_checks') - self.assertIsNotNone(workflow.address) - self.assertEqual(workflow.user, anne) - - def test_sanity_checks_user_with_multiple_linked_addresses(self): - # Ensure that the santiy check phase, when given a user without a - # preferred address, but with multiple linked addresses, gets of of - # those addresses (exactly which one is undefined). - anne = self._user_manager.make_user(self._anne) - anne.link(self._user_manager.create_address('anne@example.net')) - anne.link(self._user_manager.create_address('anne@example.org')) - workflow = ConfirmSubscriptionPolicy(self._mlist, anne) - self.assertIsNone(workflow.address) - self.assertEqual(workflow.user, anne) - workflow.run_thru('sanity_checks') - self.assertIn(workflow.address.email, ['anne@example.com', - 'anne@example.net', - 'anne@example.org']) - self.assertEqual(workflow.user, anne) - - def test_sanity_checks_user_without_addresses(self): - # It is an error to try to subscribe a user with no linked addresses. - user = self._user_manager.create_user() - workflow = ConfirmSubscriptionPolicy(self._mlist, user) - self.assertRaises(AssertionError, workflow.run_thru, 'sanity_checks') - - def test_sanity_checks_globally_banned_address(self): - # An exception is raised if the address is globally banned. - anne = self._user_manager.create_address(self._anne) - IBanManager(None).ban(self._anne) - workflow = ConfirmSubscriptionPolicy(self._mlist, anne) - self.assertRaises(MembershipIsBannedError, list, workflow) - - def test_sanity_checks_banned_address(self): - # An exception is raised if the address is banned by the mailing list. - anne = self._user_manager.create_address(self._anne) - IBanManager(self._mlist).ban(self._anne) - workflow = ConfirmSubscriptionPolicy(self._mlist, anne) - self.assertRaises(MembershipIsBannedError, list, workflow) - - def test_verification_checks_with_verified_address(self): - # When the address is already verified, we skip straight to the - # confirmation checks. - anne = self._user_manager.create_address(self._anne) - anne.verified_on = now() - workflow = ConfirmSubscriptionPolicy(self._mlist, anne) - workflow.run_thru('verification_checks') - with patch.object(workflow, '_step_confirmation_checks') as step: - next(workflow) - step.assert_called_once_with() - - def test_verification_checks_with_pre_verified_address(self): - # When the address is not yet verified, but the pre-verified flag is - # passed to the workflow, we skip to the confirmation checks. - anne = self._user_manager.create_address(self._anne) - workflow = ConfirmSubscriptionPolicy(self._mlist, anne, - pre_verified=True) - workflow.run_thru('verification_checks') - with patch.object(workflow, '_step_confirmation_checks') as step: - next(workflow) - step.assert_called_once_with() - # And now the address is verified. - self.assertIsNotNone(anne.verified_on) - - def test_verification_checks_confirmation_needed(self): - # The address is neither verified, nor is the pre-verified flag set. - # A confirmation message must be sent to the user which will also - # verify their address. - anne = self._user_manager.create_address(self._anne) - workflow = ConfirmSubscriptionPolicy(self._mlist, anne) - workflow.run_thru('verification_checks') - with patch.object(workflow, '_step_send_confirmation') as step: - next(workflow) - step.assert_called_once_with() - # The address still hasn't been verified. - self.assertIsNone(anne.verified_on) - - def test_confirmation_checks_open_list(self): - # A subscription to an open list does not need to be confirmed or - # moderated. - self._mlist.subscription_policy = OpenSubscriptionPolicy - anne = self._user_manager.create_address(self._anne) - workflow = self._mlist.subscription_policy(self._mlist, anne, - pre_verified=True) - workflow.run_thru('verification_checks') - with patch.object(workflow, '_step_do_subscription') as step: - next(workflow) - step.assert_called_once_with() - - def test_confirmation_checks_no_user_confirmation_needed(self): - # A subscription to a list which does not need user confirmation skips - # to the moderation checks. - self._mlist.subscription_policy = ModerationSubscriptionPolicy - anne = self._user_manager.create_address(self._anne) - workflow = self._mlist.subscription_policy(self._mlist, anne, - pre_verified=True) - workflow.run_thru('verification_checks') - with patch.object(workflow, '_step_moderation_checks') as step: - next(workflow) - step.assert_called_once_with() - - def test_confirmation_checks_confirm_pre_confirmed(self): - # The subscription policy requires user confirmation, but their - # subscription is pre-confirmed. Since moderation is not required, - # the user will be immediately subscribed. - self._mlist.subscription_policy = ConfirmSubscriptionPolicy - anne = self._user_manager.create_address(self._anne) - workflow = self._mlist.subscription_policy(self._mlist, anne, - pre_verified=True, - pre_confirmed=True) - workflow.run_thru('confirmation_checks') - with patch.object(workflow, '_step_do_subscription') as step: - next(workflow) - step.assert_called_once_with() - - def test_confirmation_checks_confirm_then_moderate_pre_confirmed(self): - # The subscription policy requires user confirmation, but their - # subscription is pre-confirmed. Since moderation is required, that - # check will be performed. - self._mlist.subscription_policy = ( - ConfirmModerationSubscriptionPolicy) - anne = self._user_manager.create_address(self._anne) - workflow = self._mlist.subscription_policy(self._mlist, anne, - pre_verified=True, - pre_confirmed=True) - workflow.run_thru('confirmation_checks') - with patch.object(workflow, '_step_moderation_checks') as step: - next(workflow) - step.assert_called_once_with() - - def test_confirmation_checks_confirm_and_moderate_pre_confirmed(self): - # The subscription policy requires user confirmation and moderation, - # but their subscription is pre-confirmed. - self._mlist.subscription_policy = ( - ConfirmModerationSubscriptionPolicy) - anne = self._user_manager.create_address(self._anne) - workflow = self._mlist.subscription_policy(self._mlist, anne, - pre_verified=True, - pre_confirmed=True) - workflow.run_thru('confirmation_checks') - with patch.object(workflow, '_step_moderation_checks') as step: - next(workflow) - step.assert_called_once_with() - - def test_confirmation_checks_confirmation_needed(self): - # The subscription policy requires confirmation and the subscription - # is not pre-confirmed. - self._mlist.subscription_policy = ConfirmSubscriptionPolicy - anne = self._user_manager.create_address(self._anne) - workflow = self._mlist.subscription_policy(self._mlist, anne, - pre_verified=True) - workflow.run_thru('confirmation_checks') - with patch.object(workflow, '_step_send_confirmation') as step: - next(workflow) - step.assert_called_once_with() - - def test_confirmation_checks_moderate_confirmation_needed(self): - # The subscription policy requires confirmation and moderation, and the - # subscription is not pre-confirmed. - self._mlist.subscription_policy = ( - ConfirmModerationSubscriptionPolicy) - anne = self._user_manager.create_address(self._anne) - workflow = self._mlist.subscription_policy(self._mlist, anne, - pre_verified=True) - workflow.run_thru('confirmation_checks') - with patch.object(workflow, '_step_send_confirmation') as step: - next(workflow) - step.assert_called_once_with() - - def test_moderation_checks_pre_approved(self): - # The subscription is pre-approved by the moderator. - self._mlist.subscription_policy = ModerationSubscriptionPolicy - anne = self._user_manager.create_address(self._anne) - workflow = self._mlist.subscription_policy(self._mlist, anne, - pre_verified=True, - pre_approved=True) - workflow.run_thru('moderation_checks') - with patch.object(workflow, '_step_do_subscription') as step: - next(workflow) - step.assert_called_once_with() - - def test_moderation_checks_approval_required(self): - # The moderator must approve the subscription. - self._mlist.subscription_policy = ModerationSubscriptionPolicy - anne = self._user_manager.create_address(self._anne) - workflow = self._mlist.subscription_policy(self._mlist, anne, - pre_verified=True) - workflow.run_thru('moderation_checks') - with patch.object(workflow, '_step_get_moderator_approval') as step: - next(workflow) - step.assert_called_once_with() - - def test_do_subscription(self): - # An open subscription policy plus a pre-verified address means the - # user gets subscribed to the mailing list without any further - # confirmations or approvals. - self._mlist.subscription_policy = OpenSubscriptionPolicy - anne = self._user_manager.create_address(self._anne) - workflow = self._mlist.subscription_policy(self._mlist, anne, - pre_verified=True) - # Consume the entire state machine. - list(workflow) - # Anne is now a member of the mailing list. - member = self._mlist.regular_members.get_member(self._anne) - self.assertEqual(member.address, anne) - self.assertEqual(workflow.member, member) - # No further token is needed. - self.assertIsNone(workflow.token) - self.assertEqual(workflow.token_owner, TokenOwner.no_one) - - def test_do_subscription_pre_approved(self): - # An moderation-requiring subscription policy plus a pre-verified and - # pre-approved address means the user gets subscribed to the mailing - # list without any further confirmations or approvals. - self._mlist.subscription_policy = ModerationSubscriptionPolicy - anne = self._user_manager.create_address(self._anne) - workflow = self._mlist.subscription_policy(self._mlist, anne, - pre_verified=True, - pre_approved=True) - # Consume the entire state machine. - list(workflow) - # Anne is now a member of the mailing list. - member = self._mlist.regular_members.get_member(self._anne) - self.assertEqual(member.address, anne) - self.assertEqual(workflow.member, member) - # No further token is needed. - self.assertIsNone(workflow.token) - self.assertEqual(workflow.token_owner, TokenOwner.no_one) - - def test_do_subscription_pre_approved_pre_confirmed(self): - # An moderation-requiring subscription policy plus a pre-verified and - # pre-approved address means the user gets subscribed to the mailing - # list without any further confirmations or approvals. - self._mlist.subscription_policy = ( - ConfirmModerationSubscriptionPolicy) - anne = self._user_manager.create_address(self._anne) - workflow = self._mlist.subscription_policy(self._mlist, anne, - pre_verified=True, - pre_confirmed=True, - pre_approved=True) - # Consume the entire state machine. - list(workflow) - # Anne is now a member of the mailing list. - member = self._mlist.regular_members.get_member(self._anne) - self.assertEqual(member.address, anne) - self.assertEqual(workflow.member, member) - # No further token is needed. - self.assertIsNone(workflow.token) - self.assertEqual(workflow.token_owner, TokenOwner.no_one) - - def test_do_subscription_cleanups(self): - # Once the user is subscribed, the token, and its associated pending - # database record will be removed from the database. - self._mlist.subscription_policy = OpenSubscriptionPolicy - anne = self._user_manager.create_address(self._anne) - workflow = self._mlist.subscription_policy(self._mlist, anne, - pre_verified=True) - # Consume the entire state machine. - list(workflow) - # Anne is now a member of the mailing list. - member = self._mlist.regular_members.get_member(self._anne) - self.assertEqual(member.address, anne) - self.assertEqual(workflow.member, member) - # The workflow is done, so it has no token. - self.assertIsNone(workflow.token) - self.assertEqual(workflow.token_owner, TokenOwner.no_one) - - def test_moderator_approves(self): - # The workflow runs until moderator approval is required, at which - # point the workflow is saved. Once the moderator approves, the - # workflow resumes and the user is subscribed. - self._mlist.subscription_policy = ModerationSubscriptionPolicy - anne = self._user_manager.create_address(self._anne) - workflow = self._mlist.subscription_policy(self._mlist, anne, - pre_verified=True) - # Consume the entire state machine. - list(workflow) - # The user is not currently subscribed to the mailing list. - member = self._mlist.regular_members.get_member(self._anne) - self.assertIsNone(member) - self.assertIsNone(workflow.member) - # The token is owned by the moderator. - self.assertIsNotNone(workflow.token) - self.assertEqual(workflow.token_owner, TokenOwner.moderator) - # Create a new workflow with the previous workflow's save token, and - # restore its state. This models an approved subscription and should - # result in the user getting subscribed. - approved_workflow = self._mlist.subscription_policy(self._mlist) - approved_workflow.token = workflow.token - approved_workflow.restore() - list(approved_workflow) - # Now the user is subscribed to the mailing list. - member = self._mlist.regular_members.get_member(self._anne) - self.assertEqual(member.address, anne) - self.assertEqual(approved_workflow.member, member) - # No further token is needed. - self.assertIsNone(approved_workflow.token) - self.assertEqual(approved_workflow.token_owner, TokenOwner.no_one) - - def test_get_moderator_approval_log_on_hold(self): - # When the subscription is held for moderator approval, a message is - # logged. - mark = LogFileMark('mailman.subscribe') - self._mlist.subscription_policy = ModerationSubscriptionPolicy - anne = self._user_manager.create_address(self._anne) - workflow = self._mlist.subscription_policy(self._mlist, anne, - pre_verified=True) - # Consume the entire state machine. - list(workflow) - self.assertIn( - 'test@example.com: held subscription request from anne@example.com', - mark.readline() - ) - # The state machine stopped at the moderator approval so there will be - # one token still in the database. - self._expected_pendings_count = 1 - - def test_get_moderator_approval_notifies_moderators(self): - # When the subscription is held for moderator approval, and the list - # is so configured, a notification is sent to the list moderators. - self._mlist.admin_immed_notify = True - self._mlist.subscription_policy = ModerationSubscriptionPolicy - anne = self._user_manager.create_address(self._anne) - bart = self._user_manager.create_user('bart@example.com', 'Bart User') - address = set_preferred(bart) - self._mlist.subscribe(address, MemberRole.moderator) - workflow = self._mlist.subscription_policy(self._mlist, anne, - pre_verified=True) - # Consume the entire state machine. - list(workflow) - # Find the moderator message. - items = get_queue_messages('virgin', expected_count=1) - for item in items: - if item.msg['to'] == 'test-owner@example.com': - break - else: - raise AssertionError('No moderator email found') - self.assertEqual( - item.msgdata['recipients'], {'test-owner@example.com'}) - message = items[0].msg - self.assertEqual(message['From'], 'test-owner@example.com') - self.assertEqual(message['To'], 'test-owner@example.com') - self.assertEqual( - message['Subject'], - 'New subscription request to Test from anne@example.com') - self.assertEqual(message.get_payload(), """\ -Your authorization is required for a mailing list subscription request -approval: - - For: anne@example.com - List: test@example.com -""") - # The state machine stopped at the moderator approval so there will be - # one token still in the database. - self._expected_pendings_count = 1 - - def test_get_moderator_approval_no_notifications(self): - # When the subscription is held for moderator approval, and the list - # is so configured, a notification is sent to the list moderators. - self._mlist.admin_immed_notify = False - self._mlist.subscription_policy = ModerationSubscriptionPolicy - anne = self._user_manager.create_address(self._anne) - workflow = self._mlist.subscription_policy(self._mlist, anne, - pre_verified=True) - # Consume the entire state machine. - list(workflow) - get_queue_messages('virgin', expected_count=0) - # The state machine stopped at the moderator approval so there will be - # one token still in the database. - self._expected_pendings_count = 1 - - def test_send_confirmation(self): - # A confirmation message gets sent when the address is not verified. - anne = self._user_manager.create_address(self._anne) - self.assertIsNone(anne.verified_on) - # Run the workflow to model the confirmation step. - workflow = self._mlist.subscription_policy(self._mlist, anne) - list(workflow) - items = get_queue_messages('virgin', expected_count=1) - message = items[0].msg - token = workflow.token - self.assertEqual(message['Subject'], 'confirm {}'.format(token)) - self.assertEqual( - message['From'], 'test-confirm+{}@example.com'.format(token)) - # The confirmation message is not `Precedence: bulk`. - self.assertIsNone(message['precedence']) - # The state machine stopped at the moderator approval so there will be - # one token still in the database. - self._expected_pendings_count = 1 - - def test_send_confirmation_pre_confirmed(self): - # A confirmation message gets sent when the address is not verified - # but the subscription is pre-confirmed. - anne = self._user_manager.create_address(self._anne) - self.assertIsNone(anne.verified_on) - # Run the workflow to model the confirmation step. - workflow = self._mlist.subscription_policy(self._mlist, anne, - pre_confirmed=True) - list(workflow) - items = get_queue_messages('virgin', expected_count=1) - message = items[0].msg - token = workflow.token - self.assertEqual( - message['Subject'], 'confirm {}'.format(workflow.token)) - self.assertEqual( - message['From'], 'test-confirm+{}@example.com'.format(token)) - # The state machine stopped at the moderator approval so there will be - # one token still in the database. - self._expected_pendings_count = 1 - - def test_send_confirmation_pre_verified(self): - # A confirmation message gets sent even when the address is verified - # when the subscription must be confirmed. - self._mlist.subscription_policy = ConfirmSubscriptionPolicy - anne = self._user_manager.create_address(self._anne) - self.assertIsNone(anne.verified_on) - # Run the workflow to model the confirmation step. - workflow = self._mlist.subscription_policy(self._mlist, anne, - pre_verified=True) - list(workflow) - items = get_queue_messages('virgin', expected_count=1) - message = items[0].msg - token = workflow.token - self.assertEqual( - message['Subject'], 'confirm {}'.format(workflow.token)) - self.assertEqual( - message['From'], 'test-confirm+{}@example.com'.format(token)) - # The state machine stopped at the moderator approval so there will be - # one token still in the database. - self._expected_pendings_count = 1 - - def test_do_confirm_verify_address(self): - # The address is not yet verified, nor are we pre-verifying. A - # confirmation message will be sent. When the user confirms their - # subscription request, the address will end up being verified. - anne = self._user_manager.create_address(self._anne) - self.assertIsNone(anne.verified_on) - # Run the workflow to model the confirmation step. - workflow = self._mlist.subscription_policy(self._mlist, anne) - list(workflow) - # The address is still not verified. - self.assertIsNone(anne.verified_on) - confirm_workflow = self._mlist.subscription_policy(self._mlist) - confirm_workflow.token = workflow.token - confirm_workflow.restore() - confirm_workflow.run_thru('do_confirm_verify') - # The address is now verified. - self.assertIsNotNone(anne.verified_on) - - def test_do_confirm_verify_user(self): - # A confirmation step is necessary when a user subscribes with their - # preferred address, and we are not pre-confirming. - anne = self._user_manager.create_user(self._anne) - set_preferred(anne) - # Run the workflow to model the confirmation step. There is no - # subscriber attribute yet. - workflow = self._mlist.subscription_policy(self._mlist, anne) - list(workflow) - self.assertEqual(workflow.subscriber, anne) - # Do a confirmation workflow, which should now set the subscriber. - confirm_workflow = self._mlist.subscription_policy(self._mlist) - confirm_workflow.token = workflow.token - confirm_workflow.restore() - confirm_workflow.run_thru('do_confirm_verify') - # The address is now verified. - self.assertEqual(confirm_workflow.subscriber, anne) - - def test_do_confirmation_subscribes_user(self): - # Subscriptions to the mailing list must be confirmed. Once that's - # done, the user's address (which is not initially verified) gets - # subscribed to the mailing list. - self._mlist.subscription_policy = ConfirmSubscriptionPolicy - anne = self._user_manager.create_address(self._anne) - self.assertIsNone(anne.verified_on) - workflow = self._mlist.subscription_policy(self._mlist, anne) - list(workflow) - # Anne is not yet a member. - member = self._mlist.regular_members.get_member(self._anne) - self.assertIsNone(member) - self.assertIsNone(workflow.member) - # The token is owned by the subscriber. - self.assertIsNotNone(workflow.token) - self.assertEqual(workflow.token_owner, TokenOwner.subscriber) - # Confirm. - confirm_workflow = self._mlist.subscription_policy(self._mlist) - confirm_workflow.token = workflow.token - confirm_workflow.restore() - list(confirm_workflow) - self.assertIsNotNone(anne.verified_on) - # Anne is now a member. - member = self._mlist.regular_members.get_member(self._anne) - self.assertEqual(member.address, anne) - self.assertEqual(confirm_workflow.member, member) - # No further token is needed. - self.assertIsNone(confirm_workflow.token) - self.assertEqual(confirm_workflow.token_owner, TokenOwner.no_one) - - def test_prevent_confirmation_replay_attacks(self): - # Ensure that if the workflow requires two confirmations, e.g. first - # the user confirming their subscription, and then the moderator - # approving it, that different tokens are used in these two cases. - self._mlist.subscription_policy = ( - ConfirmModerationSubscriptionPolicy) - anne = self._user_manager.create_address(self._anne) - workflow = self._mlist.subscription_policy(self._mlist, anne, - pre_verified=True) - # Run the state machine up to the first confirmation, and cache the - # confirmation token. - list(workflow) - token = workflow.token - # Anne is not yet a member of the mailing list. - member = self._mlist.regular_members.get_member(self._anne) - self.assertIsNone(member) - self.assertIsNone(workflow.member) - # The token is owned by the subscriber. - self.assertIsNotNone(workflow.token) - self.assertEqual(workflow.token_owner, TokenOwner.subscriber) - # The old token will not work for moderator approval. - moderator_workflow = self._mlist.subscription_policy(self._mlist) - moderator_workflow.token = token - moderator_workflow.restore() - list(moderator_workflow) - # The token is owned by the moderator. - self.assertIsNotNone(moderator_workflow.token) - self.assertEqual(moderator_workflow.token_owner, TokenOwner.moderator) - # While we wait for the moderator to approve the subscription, note - # that there's a new token for the next steps. - self.assertNotEqual(token, moderator_workflow.token) - # The old token won't work. - final_workflow = self._mlist.subscription_policy(self._mlist) - final_workflow.token = token - self.assertRaises(LookupError, final_workflow.restore) - # Running this workflow will fail. - self.assertRaises(AssertionError, list, final_workflow) - # Anne is still not subscribed. - member = self._mlist.regular_members.get_member(self._anne) - self.assertIsNone(member) - self.assertIsNone(final_workflow.member) - # However, if we use the new token, her subscription request will be - # approved by the moderator. - final_workflow.token = moderator_workflow.token - final_workflow.restore() - list(final_workflow) - # And now Anne is a member. - member = self._mlist.regular_members.get_member(self._anne) - self.assertEqual(member.address.email, self._anne) - self.assertEqual(final_workflow.member, member) - # No further token is needed. - self.assertIsNone(final_workflow.token) - self.assertEqual(final_workflow.token_owner, TokenOwner.no_one) - - def test_confirmation_needed_and_pre_confirmed(self): - # The subscription policy is 'confirm' but the subscription is - # pre-confirmed so the moderation checks can be skipped. - self._mlist.subscription_policy = ConfirmSubscriptionPolicy - anne = self._user_manager.create_address(self._anne) - workflow = self._mlist.subscription_policy( - self._mlist, anne, - pre_verified=True, pre_confirmed=True) - list(workflow) - # Anne was subscribed. - self.assertIsNone(workflow.token) - self.assertEqual(workflow.token_owner, TokenOwner.no_one) - self.assertEqual(workflow.member.address, anne) - - def test_restore_user_absorbed(self): - # The subscribing user is absorbed (and thus deleted) before the - # moderator approves the subscription. - self._mlist.subscription_policy = ModerationSubscriptionPolicy - anne = self._user_manager.create_user(self._anne) - bill = self._user_manager.create_user('bill@example.com') - set_preferred(bill) - # anne subscribes. - workflow = self._mlist.subscription_policy(self._mlist, anne, - pre_verified=True) - list(workflow) - # bill absorbs anne. - bill.absorb(anne) - # anne's subscription request is approved. - approved_workflow = self._mlist.subscription_policy(self._mlist) - approved_workflow.token = workflow.token - approved_workflow.restore() - self.assertEqual(approved_workflow.user, bill) - # Run the workflow through. - list(approved_workflow) - - def test_restore_address_absorbed(self): - # The subscribing user is absorbed (and thus deleted) before the - # moderator approves the subscription. - self._mlist.subscription_policy = ModerationSubscriptionPolicy - anne = self._user_manager.create_user(self._anne) - anne_address = anne.addresses[0] - bill = self._user_manager.create_user('bill@example.com') - # anne subscribes. - workflow = self._mlist.subscription_policy( - self._mlist, anne_address, pre_verified=True) - list(workflow) - # bill absorbs anne. - bill.absorb(anne) - self.assertIn(anne_address, bill.addresses) - # anne's subscription request is approved. - approved_workflow = self._mlist.subscription_policy(self._mlist) - approved_workflow.token = workflow.token - approved_workflow.restore() - self.assertEqual(approved_workflow.user, bill) - # Run the workflow through. - list(approved_workflow) diff --git a/src/mailman/app/tests/test_unsubscriptions.py b/src/mailman/app/tests/test_unsubscriptions.py deleted file mode 100644 index 2e210e90b..000000000 --- a/src/mailman/app/tests/test_unsubscriptions.py +++ /dev/null @@ -1,520 +0,0 @@ -# Copyright (C) 2016-2017 by the Free Software Foundation, Inc. -# -# This file is part of GNU Mailman. -# -# GNU Mailman is free software: you can redistribute it and/or modify it under -# the terms of the GNU General Public License as published by the Free -# Software Foundation, either version 3 of the License, or (at your option) -# any later version. -# -# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT -# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or -# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for -# more details. -# -# You should have received a copy of the GNU General Public License along with -# GNU Mailman. If not, see . - -"""Test for unsubscription service.""" - -import unittest - -from contextlib import suppress -from mailman.app.lifecycle import create_list -from mailman.interfaces.pending import IPendings -from mailman.interfaces.subscriptions import TokenOwner -from mailman.interfaces.usermanager import IUserManager -from mailman.testing.helpers import LogFileMark, get_queue_messages -from mailman.testing.layers import ConfigLayer -from mailman.utilities.datetime import now -from mailman.workflows.unsubscription import ( - ConfirmModerationUnsubscriptionPolicy, ConfirmUnsubscriptionPolicy, - ModerationUnsubscriptionPolicy, OpenUnsubscriptionPolicy) -from unittest.mock import patch -from zope.component import getUtility - - -class TestUnSubscriptionWorkflow(unittest.TestCase): - layer = ConfigLayer - maxDiff = None - - def setUp(self): - self._mlist = create_list('test@example.com') - self._mlist.admin_immed_notify = False - self._mlist.unsubscription_policy = OpenUnsubscriptionPolicy - self._mlist.send_welcome_message = False - self._anne = 'anne@example.com' - self._user_manager = getUtility(IUserManager) - self.anne = self._user_manager.create_user(self._anne) - self.anne.addresses[0].verified_on = now() - self.anne.preferred_address = self.anne.addresses[0] - self._mlist.subscribe(self.anne) - self._expected_pendings_count = 0 - - def tearDown(self): - # There usually should be no pending after all is said and done, but - # some tests don't complete the workflow. - self.assertEqual(getUtility(IPendings).count, - self._expected_pendings_count) - - def test_start_state(self): - # Test the workflow starts with no tokens or members. - workflow = self._mlist.unsubscription_policy(self._mlist) - self.assertEqual(workflow.token_owner, TokenOwner.no_one) - self.assertIsNone(workflow.token) - self.assertIsNone(workflow.member) - - def test_pended_data(self): - # Test there is a Pendable object associated with a held - # unsubscription request and it has some valid data associated with - # it. - self._mlist.unsubscription_policy = ConfirmUnsubscriptionPolicy - workflow = self._mlist.unsubscription_policy(self._mlist, self.anne) - with suppress(StopIteration): - workflow.run_thru('send_confirmation') - self.assertIsNotNone(workflow.token) - pendable = getUtility(IPendings).confirm(workflow.token, expunge=False) - self.assertEqual(pendable['list_id'], 'test.example.com') - self.assertEqual(pendable['email'], 'anne@example.com') - self.assertEqual(pendable['display_name'], '') - self.assertEqual(pendable['when'], '2005-08-01T07:49:23') - self.assertEqual(pendable['token_owner'], 'subscriber') - # The token is still in the database. - self._expected_pendings_count = 1 - - def test_user_or_address_required(self): - # The `subscriber` attribute must be a user or address that is provided - # to the workflow. - workflow = OpenUnsubscriptionPolicy(self._mlist) - self.assertRaises(AssertionError, list, workflow) - - def test_user_is_subscribed_to_unsubscribe(self): - # A user must be subscribed to a list when trying to unsubscribe. - addr = self._user_manager.create_address('aperson@example.org') - addr.verfied_on = now() - workflow = self._mlist.unsubscription_policy(self._mlist, addr) - self.assertRaises(AssertionError, - workflow.run_thru, 'subscription_checks') - - def test_confirmation_checks_open_list(self): - # An unsubscription from an open list does not need to be confirmed or - # moderated. - self._mlist.unsubscription_policy = OpenUnsubscriptionPolicy - workflow = self._mlist.unsubscription_policy(self._mlist, self.anne) - workflow.run_thru('subscription_checks') - with patch.object(workflow, '_step_do_unsubscription') as step: - next(workflow) - step.assert_called_once_with() - - def test_confirmation_checks_no_user_confirmation_needed(self): - # An unsubscription from a list which does not need user confirmation - # skips to the moderation checks. - self._mlist.unsubscription_policy = ModerationUnsubscriptionPolicy - workflow = self._mlist.unsubscription_policy(self._mlist, self.anne) - workflow.run_thru('subscription_checks') - with patch.object(workflow, '_step_moderation_checks') as step: - next(workflow) - step.assert_called_once_with() - - def test_confirmation_checks_confirm_pre_confirmed(self): - # The unsubscription policy requires user-confirmation, but their - # unsubscription is pre-confirmed. Since moderation is not reuqired, - # the user will be immediately unsubscribed. - self._mlist.unsubscription_policy = ConfirmUnsubscriptionPolicy - workflow = self._mlist.unsubscription_policy( - self._mlist, self.anne, pre_confirmed=True) - workflow.run_thru('confirmation_checks') - with patch.object(workflow, '_step_do_unsubscription') as step: - next(workflow) - step.assert_called_once_with() - - def test_confirmation_checks_confirm_then_moderate_pre_confirmed(self): - # The unsubscription policy requires user confirmation, but their - # unsubscription is pre-confirmed. Since moderation is required, that - # check will be performed. - self._mlist.unsubscription_policy = ( - ConfirmModerationUnsubscriptionPolicy) - workflow = self._mlist.unsubscription_policy( - self._mlist, self.anne, pre_confirmed=True) - workflow.run_thru('confirmation_checks') - with patch.object(workflow, '_step_moderation_checks') as step: - next(workflow) - step.assert_called_once_with() - - def test_send_confirmation_checks_confirm_list(self): - # The unsubscription policy requires user confirmation and the - # unsubscription is not pre-confirmed. - self._mlist.unsubscription_policy = ConfirmUnsubscriptionPolicy - workflow = self._mlist.unsubscription_policy(self._mlist, self.anne) - workflow.run_thru('confirmation_checks') - with patch.object(workflow, '_step_send_confirmation') as step: - next(workflow) - step.assert_called_once_with() - - def test_moderation_checks_moderated_list(self): - # The unsubscription policy requires moderation. - self._mlist.unsubscription_policy = ModerationUnsubscriptionPolicy - workflow = self._mlist.unsubscription_policy(self._mlist, self.anne) - workflow.run_thru('subscription_checks') - with patch.object(workflow, '_step_moderation_checks') as step: - next(workflow) - step.assert_called_once_with() - - def test_moderation_checks_approval_required(self): - # The moderator must approve the subscription request. - self._mlist.unsubscription_policy = ModerationUnsubscriptionPolicy - workflow = self._mlist.unsubscription_policy(self._mlist, self.anne) - workflow.run_thru('moderation_checks') - with patch.object(workflow, '_step_get_moderator_approval') as step: - next(workflow) - step.assert_called_once_with() - - def test_do_unsusbcription(self): - # An open unsubscription policy means the user gets unsubscribed to - # the mailing list without any further confirmations or approvals. - self._mlist.unsubscription_policy = OpenUnsubscriptionPolicy - workflow = self._mlist.unsubscription_policy(self._mlist, self.anne) - list(workflow) - member = self._mlist.regular_members.get_member(self._anne) - self.assertIsNone(member) - - def test_do_unsubscription_pre_approved(self): - # A moderation-requiring subscription policy plus a pre-approved - # address means the user gets unsubscribed from the mailing list - # without any further confirmation or approvals. - self._mlist.unsubscription_policy = ModerationUnsubscriptionPolicy - workflow = self._mlist.unsubscription_policy(self._mlist, self.anne, - pre_approved=True) - list(workflow) - # Anne is now unsubscribed form the mailing list. - member = self._mlist.regular_members.get_member(self._anne) - self.assertIsNone(member) - # No further token is needed. - self.assertIsNone(workflow.token) - self.assertEqual(workflow.token_owner, TokenOwner.no_one) - - def test_do_unsubscription_pre_approved_pre_confirmed(self): - # A moderation-requiring unsubscription policy plus a pre-appvoed - # address means the user gets unsubscribed to the mailing list without - # any further confirmations or approvals. - self._mlist.unsubscription_policy = ( - ConfirmModerationUnsubscriptionPolicy) - workflow = self._mlist.unsubscription_policy(self._mlist, self.anne, - pre_approved=True, - pre_confirmed=True) - list(workflow) - member = self._mlist.regular_members.get_member(self._anne) - self.assertIsNone(member) - # No further token is needed. - self.assertIsNone(workflow.token) - self.assertEqual(workflow.token_owner, TokenOwner.no_one) - - def test_do_unsubscription_cleanups(self): - # Once the user is unsubscribed, the token and its associated pending - # database record will be removed from the database. - self._mlist.unsubscription_policy = OpenUnsubscriptionPolicy - workflow = self._mlist.unsubscription_policy(self._mlist, self.anne) - # Run the workflow. - list(workflow) - # Anne is now unsubscribed from the list. - member = self._mlist.regular_members.get_member(self._anne) - self.assertIsNone(member) - # Workflow is done, so it has no token. - self.assertIsNone(workflow.token) - self.assertEqual(workflow.token_owner, TokenOwner.no_one) - - def test_moderator_approves(self): - # The workflow runs until moderator approval is required, at which - # point the workflow is saved. Once the moderator approves, the - # workflow resumes and the user is unsubscribed. - self._mlist.unsubscription_policy = ModerationUnsubscriptionPolicy - workflow = self._mlist.unsubscription_policy( - self._mlist, self.anne) - # Run the entire workflow. - list(workflow) - # The user is currently subscribed to the mailing list. - member = self._mlist.regular_members.get_member(self._anne) - self.assertIsNotNone(member) - self.assertIsNotNone(workflow.member) - # The token is owned by the moderator. - self.assertIsNotNone(workflow.token) - self.assertEqual(workflow.token_owner, TokenOwner.moderator) - # Create a new workflow with the previous workflow's save token, and - # restore its state. This models an approved un-sunscription request - # and should result in the user getting subscribed. - approved_workflow = self._mlist.unsubscription_policy(self._mlist) - approved_workflow.token = workflow.token - approved_workflow.restore() - list(approved_workflow) - # Now the user is unsubscribed from the mailing list. - member = self._mlist.regular_members.get_member(self._anne) - self.assertIsNone(member) - self.assertEqual(approved_workflow.member, member) - # No further token is needed. - self.assertIsNone(approved_workflow.token) - self.assertEqual(approved_workflow.token_owner, TokenOwner.no_one) - - def test_get_moderator_approval_log_on_hold(self): - # When the unsubscription is held for moderator approval, a message is - # logged. - mark = LogFileMark('mailman.subscribe') - self._mlist.unsubscription_policy = ModerationUnsubscriptionPolicy - workflow = self._mlist.unsubscription_policy( - self._mlist, self.anne) - # Run the entire workflow. - list(workflow) - self.assertIn( - 'test@example.com: held unsubscription request from anne@example.com', - mark.readline() - ) - # The state machine stopped at the moderator approval step so there - # will be one token still in the database. - self._expected_pendings_count = 1 - - def test_get_moderator_approval_notifies_moderators(self): - # When the unsubscription is held for moderator approval, and the list - # is so configured, a notification is sent to the list moderators. - self._mlist.admin_immed_notify = True - self._mlist.unsubscription_policy = ModerationUnsubscriptionPolicy - workflow = self._mlist.unsubscription_policy( - self._mlist, self.anne) - # Consume the entire state machine. - list(workflow) - items = get_queue_messages('virgin', expected_count=1) - message = items[0].msg - self.assertEqual(message['From'], 'test-owner@example.com') - self.assertEqual(message['To'], 'test-owner@example.com') - self.assertEqual( - message['Subject'], - 'New unsubscription request to Test from anne@example.com') - self.assertEqual(message.get_payload(), """\ -Your authorization is required for a mailing list unsubscription -request approval: - - For: anne@example.com - List: test@example.com -""") - # The state machine stopped at the moderator approval so there will be - # one token still in the database. - self._expected_pendings_count = 1 - - def test_get_moderator_approval_no_notifications(self): - # When the unsubscription request is held for moderator approval, and - # the list is so configured, a notification is sent to the list - # moderators. - self._mlist.admin_immed_notify = False - self._mlist.unsubscription_policy = ModerationUnsubscriptionPolicy - workflow = self._mlist.unsubscription_policy( - self._mlist, self.anne) - # Consume the entire state machine. - list(workflow) - get_queue_messages('virgin', expected_count=0) - # The state machine stopped at the moderator approval so there will be - # one token still in the database. - self._expected_pendings_count = 1 - - def test_send_confirmation(self): - # A confirmation message gets sent when the unsubscription must be - # confirmed. - self._mlist.unsubscription_policy = ConfirmUnsubscriptionPolicy - # Run the workflow to model the confirmation step. - workflow = self._mlist.unsubscription_policy(self._mlist, self.anne) - list(workflow) - items = get_queue_messages('virgin', expected_count=1) - message = items[0].msg - token = workflow.token - self.assertEqual( - message['Subject'], 'confirm {}'.format(workflow.token)) - self.assertEqual( - message['From'], 'test-confirm+{}@example.com'.format(token)) - # The state machine stopped at the member confirmation step so there - # will be one token still in the database. - self._expected_pendings_count = 1 - - def test_do_confirmation_unsubscribes_user(self): - # Unsubscriptions to the mailing list must be confirmed. Once that's - # done, the user's address is unsubscribed. - self._mlist.unsubscription_policy = ConfirmUnsubscriptionPolicy - workflow = self._mlist.unsubscription_policy(self._mlist, self.anne) - list(workflow) - # Anne is a member. - member = self._mlist.regular_members.get_member(self._anne) - self.assertIsNotNone(member) - self.assertEqual(member, workflow.member) - # The token is owned by the subscriber. - self.assertIsNotNone(workflow.token) - self.assertEqual(workflow.token_owner, TokenOwner.subscriber) - # Confirm. - confirm_workflow = self._mlist.unsubscription_policy(self._mlist) - confirm_workflow.token = workflow.token - confirm_workflow.restore() - list(confirm_workflow) - # Anne is now unsubscribed. - member = self._mlist.regular_members.get_member(self._anne) - self.assertIsNone(member) - # No further token is needed. - self.assertIsNone(confirm_workflow.token) - self.assertEqual(confirm_workflow.token_owner, TokenOwner.no_one) - - def test_do_confirmation_unsubscribes_address(self): - # Unsubscriptions to the mailing list must be confirmed. Once that's - # done, the address is unsubscribed. - address = self.anne.register('anne.person@example.com') - self._mlist.subscribe(address) - self._mlist.unsubscription_policy = ConfirmUnsubscriptionPolicy - workflow = self._mlist.unsubscription_policy(self._mlist, address) - list(workflow) - # Bart is a member. - member = self._mlist.regular_members.get_member( - 'anne.person@example.com') - self.assertIsNotNone(member) - self.assertEqual(member, workflow.member) - # The token is owned by the subscriber. - self.assertIsNotNone(workflow.token) - self.assertEqual(workflow.token_owner, TokenOwner.subscriber) - # Confirm. - confirm_workflow = self._mlist.unsubscription_policy(self._mlist) - confirm_workflow.token = workflow.token - confirm_workflow.restore() - list(confirm_workflow) - # Bart is now unsubscribed. - member = self._mlist.regular_members.get_member( - 'anne.person@example.com') - self.assertIsNone(member) - # No further token is needed. - self.assertIsNone(confirm_workflow.token) - self.assertEqual(confirm_workflow.token_owner, TokenOwner.no_one) - - def test_do_confirmation_nonmember(self): - # Attempt to confirm the unsubscription of a member who has already - # been unsubscribed. - self._mlist.unsubscription_policy = ConfirmUnsubscriptionPolicy - workflow = self._mlist.unsubscription_policy(self._mlist, self.anne) - list(workflow) - # Anne is a member. - member = self._mlist.regular_members.get_member(self._anne) - self.assertIsNotNone(member) - self.assertEqual(member, workflow.member) - # The token is owned by the subscriber. - self.assertIsNotNone(workflow.token) - self.assertEqual(workflow.token_owner, TokenOwner.subscriber) - # Unsubscribe Anne out of band. - member.unsubscribe() - # Confirm. - confirm_workflow = self._mlist.unsubscription_policy(self._mlist) - confirm_workflow.token = workflow.token - confirm_workflow.restore() - list(confirm_workflow) - # No further token is needed. - self.assertIsNone(confirm_workflow.token) - self.assertEqual(confirm_workflow.token_owner, TokenOwner.no_one) - - def test_do_confirmation_nonmember_final_step(self): - # Attempt to confirm the unsubscription of a member who has already - # been unsubscribed. - self._mlist.unsubscription_policy = ConfirmUnsubscriptionPolicy - workflow = self._mlist.unsubscription_policy(self._mlist, self.anne) - list(workflow) - # Anne is a member. - member = self._mlist.regular_members.get_member(self._anne) - self.assertIsNotNone(member) - self.assertEqual(member, workflow.member) - # The token is owned by the subscriber. - self.assertIsNotNone(workflow.token) - self.assertEqual(workflow.token_owner, TokenOwner.subscriber) - # Confirm. - confirm_workflow = self._mlist.unsubscription_policy(self._mlist) - confirm_workflow.token = workflow.token - confirm_workflow.restore() - confirm_workflow.run_until('do_unsubscription') - self.assertEqual(member, confirm_workflow.member) - # Unsubscribe Anne out of band. - member.unsubscribe() - list(confirm_workflow) - self.assertIsNone(confirm_workflow.member) - # No further token is needed. - self.assertIsNone(confirm_workflow.token) - self.assertEqual(confirm_workflow.token_owner, TokenOwner.no_one) - - def test_prevent_confirmation_replay_attacks(self): - # Ensure that if the workflow requires two confirmations, e.g. first - # the user confirming their subscription, and then the moderator - # approving it, that different tokens are used in these two cases. - self._mlist.unsubscription_policy = ( - ConfirmModerationUnsubscriptionPolicy) - workflow = self._mlist.unsubscription_policy(self._mlist, self.anne) - # Run the state machine up to the first confirmation, and cache the - # confirmation token. - list(workflow) - token = workflow.token - # Anne is still a member of the mailing list. - member = self._mlist.regular_members.get_member(self._anne) - self.assertIsNotNone(member) - self.assertIsNotNone(workflow.member) - # The token is owned by the subscriber. - self.assertIsNotNone(workflow.token) - self.assertEqual(workflow.token_owner, TokenOwner.subscriber) - # The old token will not work for moderator approval. - moderator_workflow = self._mlist.unsubscription_policy(self._mlist) - moderator_workflow.token = token - moderator_workflow.restore() - list(moderator_workflow) - # The token is owned by the moderator. - self.assertIsNotNone(moderator_workflow.token) - self.assertEqual(moderator_workflow.token_owner, TokenOwner.moderator) - # While we wait for the moderator to approve the subscription, note - # that there's a new token for the next steps. - self.assertNotEqual(token, moderator_workflow.token) - # The old token won't work. - final_workflow = self._mlist.unsubscription_policy(self._mlist) - final_workflow.token = token - self.assertRaises(LookupError, final_workflow.restore) - # Running this workflow will fail. - self.assertRaises(AssertionError, list, final_workflow) - # Anne is still not unsubscribed. - member = self._mlist.regular_members.get_member(self._anne) - self.assertIsNotNone(member) - self.assertIsNone(final_workflow.member) - # However, if we use the new token, her unsubscription request will be - # approved by the moderator. - final_workflow.token = moderator_workflow.token - final_workflow.restore() - list(final_workflow) - # And now Anne is unsubscribed. - member = self._mlist.regular_members.get_member(self._anne) - self.assertIsNone(member) - # No further token is needed. - self.assertIsNone(final_workflow.token) - self.assertEqual(final_workflow.token_owner, TokenOwner.no_one) - - def test_confirmation_needed_and_pre_confirmed(self): - # The subscription policy is 'confirm' but the subscription is - # pre-confirmed so the moderation checks can be skipped. - self._mlist.unsubscription_policy = ConfirmUnsubscriptionPolicy - workflow = self._mlist.unsubscription_policy( - self._mlist, self.anne, pre_confirmed=True) - list(workflow) - # Anne was unsubscribed. - self.assertIsNone(workflow.token) - self.assertEqual(workflow.token_owner, TokenOwner.no_one) - self.assertIsNone(workflow.member) - - def test_confirmation_needed_moderator_address(self): - address = self.anne.register('anne.person@example.com') - self._mlist.subscribe(address) - self._mlist.unsubscription_policy = ModerationUnsubscriptionPolicy - workflow = self._mlist.unsubscription_policy(self._mlist, address) - # Get moderator approval. - list(workflow) - approved_workflow = self._mlist.unsubscription_policy(self._mlist) - approved_workflow.token = workflow.token - approved_workflow.restore() - list(approved_workflow) - self.assertEqual(approved_workflow.subscriber, address) - # Anne was unsubscribed. - self.assertIsNone(approved_workflow.token) - self.assertEqual(approved_workflow.token_owner, TokenOwner.no_one) - self.assertIsNone(approved_workflow.member) - member = self._mlist.regular_members.get_member( - 'anne.person@example.com') - self.assertIsNone(member) diff --git a/src/mailman/app/tests/test_workflow.py b/src/mailman/app/tests/test_workflow.py deleted file mode 100644 index 3e7856b29..000000000 --- a/src/mailman/app/tests/test_workflow.py +++ /dev/null @@ -1,183 +0,0 @@ -# Copyright (C) 2015-2017 by the Free Software Foundation, Inc. -# -# This file is part of GNU Mailman. -# -# GNU Mailman is free software: you can redistribute it and/or modify it under -# the terms of the GNU General Public License as published by the Free -# Software Foundation, either version 3 of the License, or (at your option) -# any later version. -# -# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT -# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or -# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for -# more details. -# -# You should have received a copy of the GNU General Public License along with -# GNU Mailman. If not, see . - -"""App-level workflow tests.""" - -import json -import unittest - -from mailman.interfaces.workflows import IWorkflowStateManager -from mailman.testing.layers import ConfigLayer -from mailman.workflows.base import Workflow -from zope.component import getUtility - - -class MyWorkflow(Workflow): - initial_state = 'first' - save_attributes = ('ant', 'bee', 'cat') - - def __init__(self): - super().__init__() - self.token = 'test-workflow' - self.ant = 1 - self.bee = 2 - self.cat = 3 - self.dog = 4 - - def _step_first(self): - self.push('second') - return 'one' - - def _step_second(self): - self.push('third') - return 'two' - - def _step_third(self): - return 'three' - - -class DependentWorkflow(MyWorkflow): - save_attributes = ('ant', 'bee', 'cat', 'elf') - - def __init__(self): - super().__init__() - self._elf = 5 - - @property - def elf(self): - return self._elf - - @elf.setter - def elf(self, value): - # This attribute depends on other attributes. - assert self.ant is not None - assert self.bee is not None - assert self.cat is not None - self._elf = value - - -class TestWorkflow(unittest.TestCase): - layer = ConfigLayer - - def setUp(self): - self._workflow = iter(MyWorkflow()) - - def test_basic_workflow(self): - # The work flows from one state to the next. - results = list(self._workflow) - self.assertEqual(results, ['one', 'two', 'three']) - - def test_partial_workflow(self): - # You don't have to flow through every step. - results = next(self._workflow) - self.assertEqual(results, 'one') - - def test_exhaust_workflow(self): - # Manually flow through a few steps, then consume the whole thing. - results = [next(self._workflow)] - results.extend(self._workflow) - self.assertEqual(results, ['one', 'two', 'three']) - - def test_save_and_restore_workflow(self): - # Without running any steps, save and restore the workflow. Then - # consume the restored workflow. - self._workflow.save() - new_workflow = MyWorkflow() - new_workflow.restore() - results = list(new_workflow) - self.assertEqual(results, ['one', 'two', 'three']) - - def test_save_and_restore_partial_workflow(self): - # After running a few steps, save and restore the workflow. Then - # consume the restored workflow. - next(self._workflow) - self._workflow.save() - new_workflow = MyWorkflow() - new_workflow.restore() - results = list(new_workflow) - self.assertEqual(results, ['two', 'three']) - - def test_save_and_restore_exhausted_workflow(self): - # After consuming the entire workflow, save and restore it. - list(self._workflow) - self._workflow.save() - new_workflow = MyWorkflow() - new_workflow.restore() - results = list(new_workflow) - self.assertEqual(len(results), 0) - - def test_save_and_restore_attributes(self): - # Saved attributes are restored. - self._workflow.ant = 9 - self._workflow.bee = 8 - self._workflow.cat = 7 - # Don't save .dog. - self._workflow.save() - new_workflow = MyWorkflow() - new_workflow.restore() - self.assertEqual(new_workflow.ant, 9) - self.assertEqual(new_workflow.bee, 8) - self.assertEqual(new_workflow.cat, 7) - self.assertEqual(new_workflow.dog, 4) - - def test_save_and_restore_dependant_attributes(self): - # Attributes must be restored in the order they are declared in - # save_attributes. - workflow = iter(DependentWorkflow()) - workflow.elf = 6 - workflow.save() - new_workflow = DependentWorkflow() - # The elf attribute must be restored last, set triggering values for - # attributes it depends on. - new_workflow.ant = new_workflow.bee = new_workflow.cat = None - new_workflow.restore() - self.assertEqual(new_workflow.elf, 6) - - def test_save_and_restore_obsolete_attributes(self): - # Obsolete saved attributes are ignored. - state_manager = getUtility(IWorkflowStateManager) - # Save the state of an old version of the workflow that would not have - # the cat attribute. - state_manager.save( - self._workflow.token, '["first"]', - json.dumps({'ant': 1, 'bee': 2})) - # Restore in the current version that needs the cat attribute. - new_workflow = MyWorkflow() - try: - new_workflow.restore() - except KeyError: - self.fail('Restore does not handle obsolete attributes') - # Restoring must not raise an exception, the default value is kept. - self.assertEqual(new_workflow.cat, 3) - - def test_run_thru(self): - # Run all steps through the given one. - results = self._workflow.run_thru('second') - self.assertEqual(results, ['one', 'two']) - - def test_run_thru_completes(self): - results = self._workflow.run_thru('all of them') - self.assertEqual(results, ['one', 'two', 'three']) - - def test_run_until(self): - # Run until (but not including) the given step. - results = self._workflow.run_until('second') - self.assertEqual(results, ['one']) - - def test_run_until_completes(self): - results = self._workflow.run_until('all of them') - self.assertEqual(results, ['one', 'two', 'three']) diff --git a/src/mailman/workflows/tests/__init__.py b/src/mailman/workflows/tests/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/mailman/workflows/tests/test_subscriptions.py b/src/mailman/workflows/tests/test_subscriptions.py new file mode 100644 index 000000000..23487ab1c --- /dev/null +++ b/src/mailman/workflows/tests/test_subscriptions.py @@ -0,0 +1,726 @@ +# Copyright (C) 2011-2017 by the Free Software Foundation, Inc. +# +# This file is part of GNU Mailman. +# +# GNU Mailman is free software: you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) +# any later version. +# +# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. +# +# You should have received a copy of the GNU General Public License along with +# GNU Mailman. If not, see . + +"""Tests for the subscription service.""" + +import unittest + +from contextlib import suppress +from mailman.app.lifecycle import create_list +from mailman.interfaces.bans import IBanManager +from mailman.interfaces.member import MemberRole, MembershipIsBannedError +from mailman.interfaces.pending import IPendings +from mailman.interfaces.subscriptions import TokenOwner +from mailman.interfaces.usermanager import IUserManager +from mailman.testing.helpers import ( + LogFileMark, get_queue_messages, set_preferred) +from mailman.testing.layers import ConfigLayer +from mailman.utilities.datetime import now +from mailman.workflows.subscription import ( + ConfirmModerationSubscriptionPolicy, ConfirmSubscriptionPolicy, + ModerationSubscriptionPolicy, OpenSubscriptionPolicy) +from unittest.mock import patch +from zope.component import getUtility + + +class TestSubscriptionWorkflow(unittest.TestCase): + layer = ConfigLayer + maxDiff = None + + def setUp(self): + self._mlist = create_list('test@example.com') + self._mlist.admin_immed_notify = False + self._anne = 'anne@example.com' + self._user_manager = getUtility(IUserManager) + self._expected_pendings_count = 0 + + def tearDown(self): + # There usually should be no pending after all is said and done, but + # some tests don't complete the workflow. + self.assertEqual(getUtility(IPendings).count, + self._expected_pendings_count) + + def test_start_state(self): + # The workflow starts with no tokens or member. + workflow = ConfirmSubscriptionPolicy(self._mlist) + self.assertIsNone(workflow.token) + self.assertEqual(workflow.token_owner, TokenOwner.no_one) + self.assertIsNone(workflow.member) + + def test_pended_data(self): + # There is a Pendable associated with the held request, and it has + # some data associated with it. + anne = self._user_manager.create_address(self._anne) + workflow = ConfirmSubscriptionPolicy(self._mlist, anne) + with suppress(StopIteration): + workflow.run_thru('send_confirmation') + self.assertIsNotNone(workflow.token) + pendable = getUtility(IPendings).confirm(workflow.token, expunge=False) + self.assertEqual(pendable['list_id'], 'test.example.com') + self.assertEqual(pendable['email'], 'anne@example.com') + self.assertEqual(pendable['display_name'], '') + self.assertEqual(pendable['when'], '2005-08-01T07:49:23') + self.assertEqual(pendable['token_owner'], 'subscriber') + # The token is still in the database. + self._expected_pendings_count = 1 + + def test_user_or_address_required(self): + # The `subscriber` attribute must be a user or address. + workflow = ConfirmSubscriptionPolicy(self._mlist) + self.assertRaises(AssertionError, list, workflow) + + def test_sanity_checks_address(self): + # Ensure that the sanity check phase, when given an IAddress, ends up + # with a linked user. + anne = self._user_manager.create_address(self._anne) + workflow = ConfirmSubscriptionPolicy(self._mlist, anne) + self.assertIsNotNone(workflow.address) + self.assertIsNone(workflow.user) + workflow.run_thru('sanity_checks') + self.assertIsNotNone(workflow.address) + self.assertIsNotNone(workflow.user) + self.assertEqual(list(workflow.user.addresses)[0].email, self._anne) + + def test_sanity_checks_user_with_preferred_address(self): + # Ensure that the sanity check phase, when given an IUser with a + # preferred address, ends up with an address. + anne = self._user_manager.make_user(self._anne) + address = set_preferred(anne) + workflow = ConfirmSubscriptionPolicy(self._mlist, anne) + # The constructor sets workflow.address because the user has a + # preferred address. + self.assertEqual(workflow.address, address) + self.assertEqual(workflow.user, anne) + workflow.run_thru('sanity_checks') + self.assertEqual(workflow.address, address) + self.assertEqual(workflow.user, anne) + + def test_sanity_checks_user_without_preferred_address(self): + # Ensure that the sanity check phase, when given a user without a + # preferred address, but with at least one linked address, gets an + # address. + anne = self._user_manager.make_user(self._anne) + workflow = ConfirmSubscriptionPolicy(self._mlist, anne) + self.assertIsNone(workflow.address) + self.assertEqual(workflow.user, anne) + workflow.run_thru('sanity_checks') + self.assertIsNotNone(workflow.address) + self.assertEqual(workflow.user, anne) + + def test_sanity_checks_user_with_multiple_linked_addresses(self): + # Ensure that the santiy check phase, when given a user without a + # preferred address, but with multiple linked addresses, gets of of + # those addresses (exactly which one is undefined). + anne = self._user_manager.make_user(self._anne) + anne.link(self._user_manager.create_address('anne@example.net')) + anne.link(self._user_manager.create_address('anne@example.org')) + workflow = ConfirmSubscriptionPolicy(self._mlist, anne) + self.assertIsNone(workflow.address) + self.assertEqual(workflow.user, anne) + workflow.run_thru('sanity_checks') + self.assertIn(workflow.address.email, ['anne@example.com', + 'anne@example.net', + 'anne@example.org']) + self.assertEqual(workflow.user, anne) + + def test_sanity_checks_user_without_addresses(self): + # It is an error to try to subscribe a user with no linked addresses. + user = self._user_manager.create_user() + workflow = ConfirmSubscriptionPolicy(self._mlist, user) + self.assertRaises(AssertionError, workflow.run_thru, 'sanity_checks') + + def test_sanity_checks_globally_banned_address(self): + # An exception is raised if the address is globally banned. + anne = self._user_manager.create_address(self._anne) + IBanManager(None).ban(self._anne) + workflow = ConfirmSubscriptionPolicy(self._mlist, anne) + self.assertRaises(MembershipIsBannedError, list, workflow) + + def test_sanity_checks_banned_address(self): + # An exception is raised if the address is banned by the mailing list. + anne = self._user_manager.create_address(self._anne) + IBanManager(self._mlist).ban(self._anne) + workflow = ConfirmSubscriptionPolicy(self._mlist, anne) + self.assertRaises(MembershipIsBannedError, list, workflow) + + def test_verification_checks_with_verified_address(self): + # When the address is already verified, we skip straight to the + # confirmation checks. + anne = self._user_manager.create_address(self._anne) + anne.verified_on = now() + workflow = ConfirmSubscriptionPolicy(self._mlist, anne) + workflow.run_thru('verification_checks') + with patch.object(workflow, '_step_confirmation_checks') as step: + next(workflow) + step.assert_called_once_with() + + def test_verification_checks_with_pre_verified_address(self): + # When the address is not yet verified, but the pre-verified flag is + # passed to the workflow, we skip to the confirmation checks. + anne = self._user_manager.create_address(self._anne) + workflow = ConfirmSubscriptionPolicy(self._mlist, anne, + pre_verified=True) + workflow.run_thru('verification_checks') + with patch.object(workflow, '_step_confirmation_checks') as step: + next(workflow) + step.assert_called_once_with() + # And now the address is verified. + self.assertIsNotNone(anne.verified_on) + + def test_verification_checks_confirmation_needed(self): + # The address is neither verified, nor is the pre-verified flag set. + # A confirmation message must be sent to the user which will also + # verify their address. + anne = self._user_manager.create_address(self._anne) + workflow = ConfirmSubscriptionPolicy(self._mlist, anne) + workflow.run_thru('verification_checks') + with patch.object(workflow, '_step_send_confirmation') as step: + next(workflow) + step.assert_called_once_with() + # The address still hasn't been verified. + self.assertIsNone(anne.verified_on) + + def test_confirmation_checks_open_list(self): + # A subscription to an open list does not need to be confirmed or + # moderated. + self._mlist.subscription_policy = OpenSubscriptionPolicy + anne = self._user_manager.create_address(self._anne) + workflow = self._mlist.subscription_policy(self._mlist, anne, + pre_verified=True) + workflow.run_thru('verification_checks') + with patch.object(workflow, '_step_do_subscription') as step: + next(workflow) + step.assert_called_once_with() + + def test_confirmation_checks_no_user_confirmation_needed(self): + # A subscription to a list which does not need user confirmation skips + # to the moderation checks. + self._mlist.subscription_policy = ModerationSubscriptionPolicy + anne = self._user_manager.create_address(self._anne) + workflow = self._mlist.subscription_policy(self._mlist, anne, + pre_verified=True) + workflow.run_thru('verification_checks') + with patch.object(workflow, '_step_moderation_checks') as step: + next(workflow) + step.assert_called_once_with() + + def test_confirmation_checks_confirm_pre_confirmed(self): + # The subscription policy requires user confirmation, but their + # subscription is pre-confirmed. Since moderation is not required, + # the user will be immediately subscribed. + self._mlist.subscription_policy = ConfirmSubscriptionPolicy + anne = self._user_manager.create_address(self._anne) + workflow = self._mlist.subscription_policy(self._mlist, anne, + pre_verified=True, + pre_confirmed=True) + workflow.run_thru('confirmation_checks') + with patch.object(workflow, '_step_do_subscription') as step: + next(workflow) + step.assert_called_once_with() + + def test_confirmation_checks_confirm_then_moderate_pre_confirmed(self): + # The subscription policy requires user confirmation, but their + # subscription is pre-confirmed. Since moderation is required, that + # check will be performed. + self._mlist.subscription_policy = ( + ConfirmModerationSubscriptionPolicy) + anne = self._user_manager.create_address(self._anne) + workflow = self._mlist.subscription_policy(self._mlist, anne, + pre_verified=True, + pre_confirmed=True) + workflow.run_thru('confirmation_checks') + with patch.object(workflow, '_step_moderation_checks') as step: + next(workflow) + step.assert_called_once_with() + + def test_confirmation_checks_confirm_and_moderate_pre_confirmed(self): + # The subscription policy requires user confirmation and moderation, + # but their subscription is pre-confirmed. + self._mlist.subscription_policy = ( + ConfirmModerationSubscriptionPolicy) + anne = self._user_manager.create_address(self._anne) + workflow = self._mlist.subscription_policy(self._mlist, anne, + pre_verified=True, + pre_confirmed=True) + workflow.run_thru('confirmation_checks') + with patch.object(workflow, '_step_moderation_checks') as step: + next(workflow) + step.assert_called_once_with() + + def test_confirmation_checks_confirmation_needed(self): + # The subscription policy requires confirmation and the subscription + # is not pre-confirmed. + self._mlist.subscription_policy = ConfirmSubscriptionPolicy + anne = self._user_manager.create_address(self._anne) + workflow = self._mlist.subscription_policy(self._mlist, anne, + pre_verified=True) + workflow.run_thru('confirmation_checks') + with patch.object(workflow, '_step_send_confirmation') as step: + next(workflow) + step.assert_called_once_with() + + def test_confirmation_checks_moderate_confirmation_needed(self): + # The subscription policy requires confirmation and moderation, and the + # subscription is not pre-confirmed. + self._mlist.subscription_policy = ( + ConfirmModerationSubscriptionPolicy) + anne = self._user_manager.create_address(self._anne) + workflow = self._mlist.subscription_policy(self._mlist, anne, + pre_verified=True) + workflow.run_thru('confirmation_checks') + with patch.object(workflow, '_step_send_confirmation') as step: + next(workflow) + step.assert_called_once_with() + + def test_moderation_checks_pre_approved(self): + # The subscription is pre-approved by the moderator. + self._mlist.subscription_policy = ModerationSubscriptionPolicy + anne = self._user_manager.create_address(self._anne) + workflow = self._mlist.subscription_policy(self._mlist, anne, + pre_verified=True, + pre_approved=True) + workflow.run_thru('moderation_checks') + with patch.object(workflow, '_step_do_subscription') as step: + next(workflow) + step.assert_called_once_with() + + def test_moderation_checks_approval_required(self): + # The moderator must approve the subscription. + self._mlist.subscription_policy = ModerationSubscriptionPolicy + anne = self._user_manager.create_address(self._anne) + workflow = self._mlist.subscription_policy(self._mlist, anne, + pre_verified=True) + workflow.run_thru('moderation_checks') + with patch.object(workflow, '_step_get_moderator_approval') as step: + next(workflow) + step.assert_called_once_with() + + def test_do_subscription(self): + # An open subscription policy plus a pre-verified address means the + # user gets subscribed to the mailing list without any further + # confirmations or approvals. + self._mlist.subscription_policy = OpenSubscriptionPolicy + anne = self._user_manager.create_address(self._anne) + workflow = self._mlist.subscription_policy(self._mlist, anne, + pre_verified=True) + # Consume the entire state machine. + list(workflow) + # Anne is now a member of the mailing list. + member = self._mlist.regular_members.get_member(self._anne) + self.assertEqual(member.address, anne) + self.assertEqual(workflow.member, member) + # No further token is needed. + self.assertIsNone(workflow.token) + self.assertEqual(workflow.token_owner, TokenOwner.no_one) + + def test_do_subscription_pre_approved(self): + # An moderation-requiring subscription policy plus a pre-verified and + # pre-approved address means the user gets subscribed to the mailing + # list without any further confirmations or approvals. + self._mlist.subscription_policy = ModerationSubscriptionPolicy + anne = self._user_manager.create_address(self._anne) + workflow = self._mlist.subscription_policy(self._mlist, anne, + pre_verified=True, + pre_approved=True) + # Consume the entire state machine. + list(workflow) + # Anne is now a member of the mailing list. + member = self._mlist.regular_members.get_member(self._anne) + self.assertEqual(member.address, anne) + self.assertEqual(workflow.member, member) + # No further token is needed. + self.assertIsNone(workflow.token) + self.assertEqual(workflow.token_owner, TokenOwner.no_one) + + def test_do_subscription_pre_approved_pre_confirmed(self): + # An moderation-requiring subscription policy plus a pre-verified and + # pre-approved address means the user gets subscribed to the mailing + # list without any further confirmations or approvals. + self._mlist.subscription_policy = ( + ConfirmModerationSubscriptionPolicy) + anne = self._user_manager.create_address(self._anne) + workflow = self._mlist.subscription_policy(self._mlist, anne, + pre_verified=True, + pre_confirmed=True, + pre_approved=True) + # Consume the entire state machine. + list(workflow) + # Anne is now a member of the mailing list. + member = self._mlist.regular_members.get_member(self._anne) + self.assertEqual(member.address, anne) + self.assertEqual(workflow.member, member) + # No further token is needed. + self.assertIsNone(workflow.token) + self.assertEqual(workflow.token_owner, TokenOwner.no_one) + + def test_do_subscription_cleanups(self): + # Once the user is subscribed, the token, and its associated pending + # database record will be removed from the database. + self._mlist.subscription_policy = OpenSubscriptionPolicy + anne = self._user_manager.create_address(self._anne) + workflow = self._mlist.subscription_policy(self._mlist, anne, + pre_verified=True) + # Consume the entire state machine. + list(workflow) + # Anne is now a member of the mailing list. + member = self._mlist.regular_members.get_member(self._anne) + self.assertEqual(member.address, anne) + self.assertEqual(workflow.member, member) + # The workflow is done, so it has no token. + self.assertIsNone(workflow.token) + self.assertEqual(workflow.token_owner, TokenOwner.no_one) + + def test_moderator_approves(self): + # The workflow runs until moderator approval is required, at which + # point the workflow is saved. Once the moderator approves, the + # workflow resumes and the user is subscribed. + self._mlist.subscription_policy = ModerationSubscriptionPolicy + anne = self._user_manager.create_address(self._anne) + workflow = self._mlist.subscription_policy(self._mlist, anne, + pre_verified=True) + # Consume the entire state machine. + list(workflow) + # The user is not currently subscribed to the mailing list. + member = self._mlist.regular_members.get_member(self._anne) + self.assertIsNone(member) + self.assertIsNone(workflow.member) + # The token is owned by the moderator. + self.assertIsNotNone(workflow.token) + self.assertEqual(workflow.token_owner, TokenOwner.moderator) + # Create a new workflow with the previous workflow's save token, and + # restore its state. This models an approved subscription and should + # result in the user getting subscribed. + approved_workflow = self._mlist.subscription_policy(self._mlist) + approved_workflow.token = workflow.token + approved_workflow.restore() + list(approved_workflow) + # Now the user is subscribed to the mailing list. + member = self._mlist.regular_members.get_member(self._anne) + self.assertEqual(member.address, anne) + self.assertEqual(approved_workflow.member, member) + # No further token is needed. + self.assertIsNone(approved_workflow.token) + self.assertEqual(approved_workflow.token_owner, TokenOwner.no_one) + + def test_get_moderator_approval_log_on_hold(self): + # When the subscription is held for moderator approval, a message is + # logged. + mark = LogFileMark('mailman.subscribe') + self._mlist.subscription_policy = ModerationSubscriptionPolicy + anne = self._user_manager.create_address(self._anne) + workflow = self._mlist.subscription_policy(self._mlist, anne, + pre_verified=True) + # Consume the entire state machine. + list(workflow) + self.assertIn( + 'test@example.com: held subscription request from anne@example.com', + mark.readline() + ) + # The state machine stopped at the moderator approval so there will be + # one token still in the database. + self._expected_pendings_count = 1 + + def test_get_moderator_approval_notifies_moderators(self): + # When the subscription is held for moderator approval, and the list + # is so configured, a notification is sent to the list moderators. + self._mlist.admin_immed_notify = True + self._mlist.subscription_policy = ModerationSubscriptionPolicy + anne = self._user_manager.create_address(self._anne) + bart = self._user_manager.create_user('bart@example.com', 'Bart User') + address = set_preferred(bart) + self._mlist.subscribe(address, MemberRole.moderator) + workflow = self._mlist.subscription_policy(self._mlist, anne, + pre_verified=True) + # Consume the entire state machine. + list(workflow) + # Find the moderator message. + items = get_queue_messages('virgin', expected_count=1) + for item in items: + if item.msg['to'] == 'test-owner@example.com': + break + else: + raise AssertionError('No moderator email found') + self.assertEqual( + item.msgdata['recipients'], {'test-owner@example.com'}) + message = items[0].msg + self.assertEqual(message['From'], 'test-owner@example.com') + self.assertEqual(message['To'], 'test-owner@example.com') + self.assertEqual( + message['Subject'], + 'New subscription request to Test from anne@example.com') + self.assertEqual(message.get_payload(), """\ +Your authorization is required for a mailing list subscription request +approval: + + For: anne@example.com + List: test@example.com +""") + # The state machine stopped at the moderator approval so there will be + # one token still in the database. + self._expected_pendings_count = 1 + + def test_get_moderator_approval_no_notifications(self): + # When the subscription is held for moderator approval, and the list + # is so configured, a notification is sent to the list moderators. + self._mlist.admin_immed_notify = False + self._mlist.subscription_policy = ModerationSubscriptionPolicy + anne = self._user_manager.create_address(self._anne) + workflow = self._mlist.subscription_policy(self._mlist, anne, + pre_verified=True) + # Consume the entire state machine. + list(workflow) + get_queue_messages('virgin', expected_count=0) + # The state machine stopped at the moderator approval so there will be + # one token still in the database. + self._expected_pendings_count = 1 + + def test_send_confirmation(self): + # A confirmation message gets sent when the address is not verified. + anne = self._user_manager.create_address(self._anne) + self.assertIsNone(anne.verified_on) + # Run the workflow to model the confirmation step. + workflow = self._mlist.subscription_policy(self._mlist, anne) + list(workflow) + items = get_queue_messages('virgin', expected_count=1) + message = items[0].msg + token = workflow.token + self.assertEqual(message['Subject'], 'confirm {}'.format(token)) + self.assertEqual( + message['From'], 'test-confirm+{}@example.com'.format(token)) + # The confirmation message is not `Precedence: bulk`. + self.assertIsNone(message['precedence']) + # The state machine stopped at the moderator approval so there will be + # one token still in the database. + self._expected_pendings_count = 1 + + def test_send_confirmation_pre_confirmed(self): + # A confirmation message gets sent when the address is not verified + # but the subscription is pre-confirmed. + anne = self._user_manager.create_address(self._anne) + self.assertIsNone(anne.verified_on) + # Run the workflow to model the confirmation step. + workflow = self._mlist.subscription_policy(self._mlist, anne, + pre_confirmed=True) + list(workflow) + items = get_queue_messages('virgin', expected_count=1) + message = items[0].msg + token = workflow.token + self.assertEqual( + message['Subject'], 'confirm {}'.format(workflow.token)) + self.assertEqual( + message['From'], 'test-confirm+{}@example.com'.format(token)) + # The state machine stopped at the moderator approval so there will be + # one token still in the database. + self._expected_pendings_count = 1 + + def test_send_confirmation_pre_verified(self): + # A confirmation message gets sent even when the address is verified + # when the subscription must be confirmed. + self._mlist.subscription_policy = ConfirmSubscriptionPolicy + anne = self._user_manager.create_address(self._anne) + self.assertIsNone(anne.verified_on) + # Run the workflow to model the confirmation step. + workflow = self._mlist.subscription_policy(self._mlist, anne, + pre_verified=True) + list(workflow) + items = get_queue_messages('virgin', expected_count=1) + message = items[0].msg + token = workflow.token + self.assertEqual( + message['Subject'], 'confirm {}'.format(workflow.token)) + self.assertEqual( + message['From'], 'test-confirm+{}@example.com'.format(token)) + # The state machine stopped at the moderator approval so there will be + # one token still in the database. + self._expected_pendings_count = 1 + + def test_do_confirm_verify_address(self): + # The address is not yet verified, nor are we pre-verifying. A + # confirmation message will be sent. When the user confirms their + # subscription request, the address will end up being verified. + anne = self._user_manager.create_address(self._anne) + self.assertIsNone(anne.verified_on) + # Run the workflow to model the confirmation step. + workflow = self._mlist.subscription_policy(self._mlist, anne) + list(workflow) + # The address is still not verified. + self.assertIsNone(anne.verified_on) + confirm_workflow = self._mlist.subscription_policy(self._mlist) + confirm_workflow.token = workflow.token + confirm_workflow.restore() + confirm_workflow.run_thru('do_confirm_verify') + # The address is now verified. + self.assertIsNotNone(anne.verified_on) + + def test_do_confirm_verify_user(self): + # A confirmation step is necessary when a user subscribes with their + # preferred address, and we are not pre-confirming. + anne = self._user_manager.create_user(self._anne) + set_preferred(anne) + # Run the workflow to model the confirmation step. There is no + # subscriber attribute yet. + workflow = self._mlist.subscription_policy(self._mlist, anne) + list(workflow) + self.assertEqual(workflow.subscriber, anne) + # Do a confirmation workflow, which should now set the subscriber. + confirm_workflow = self._mlist.subscription_policy(self._mlist) + confirm_workflow.token = workflow.token + confirm_workflow.restore() + confirm_workflow.run_thru('do_confirm_verify') + # The address is now verified. + self.assertEqual(confirm_workflow.subscriber, anne) + + def test_do_confirmation_subscribes_user(self): + # Subscriptions to the mailing list must be confirmed. Once that's + # done, the user's address (which is not initially verified) gets + # subscribed to the mailing list. + self._mlist.subscription_policy = ConfirmSubscriptionPolicy + anne = self._user_manager.create_address(self._anne) + self.assertIsNone(anne.verified_on) + workflow = self._mlist.subscription_policy(self._mlist, anne) + list(workflow) + # Anne is not yet a member. + member = self._mlist.regular_members.get_member(self._anne) + self.assertIsNone(member) + self.assertIsNone(workflow.member) + # The token is owned by the subscriber. + self.assertIsNotNone(workflow.token) + self.assertEqual(workflow.token_owner, TokenOwner.subscriber) + # Confirm. + confirm_workflow = self._mlist.subscription_policy(self._mlist) + confirm_workflow.token = workflow.token + confirm_workflow.restore() + list(confirm_workflow) + self.assertIsNotNone(anne.verified_on) + # Anne is now a member. + member = self._mlist.regular_members.get_member(self._anne) + self.assertEqual(member.address, anne) + self.assertEqual(confirm_workflow.member, member) + # No further token is needed. + self.assertIsNone(confirm_workflow.token) + self.assertEqual(confirm_workflow.token_owner, TokenOwner.no_one) + + def test_prevent_confirmation_replay_attacks(self): + # Ensure that if the workflow requires two confirmations, e.g. first + # the user confirming their subscription, and then the moderator + # approving it, that different tokens are used in these two cases. + self._mlist.subscription_policy = ( + ConfirmModerationSubscriptionPolicy) + anne = self._user_manager.create_address(self._anne) + workflow = self._mlist.subscription_policy(self._mlist, anne, + pre_verified=True) + # Run the state machine up to the first confirmation, and cache the + # confirmation token. + list(workflow) + token = workflow.token + # Anne is not yet a member of the mailing list. + member = self._mlist.regular_members.get_member(self._anne) + self.assertIsNone(member) + self.assertIsNone(workflow.member) + # The token is owned by the subscriber. + self.assertIsNotNone(workflow.token) + self.assertEqual(workflow.token_owner, TokenOwner.subscriber) + # The old token will not work for moderator approval. + moderator_workflow = self._mlist.subscription_policy(self._mlist) + moderator_workflow.token = token + moderator_workflow.restore() + list(moderator_workflow) + # The token is owned by the moderator. + self.assertIsNotNone(moderator_workflow.token) + self.assertEqual(moderator_workflow.token_owner, TokenOwner.moderator) + # While we wait for the moderator to approve the subscription, note + # that there's a new token for the next steps. + self.assertNotEqual(token, moderator_workflow.token) + # The old token won't work. + final_workflow = self._mlist.subscription_policy(self._mlist) + final_workflow.token = token + self.assertRaises(LookupError, final_workflow.restore) + # Running this workflow will fail. + self.assertRaises(AssertionError, list, final_workflow) + # Anne is still not subscribed. + member = self._mlist.regular_members.get_member(self._anne) + self.assertIsNone(member) + self.assertIsNone(final_workflow.member) + # However, if we use the new token, her subscription request will be + # approved by the moderator. + final_workflow.token = moderator_workflow.token + final_workflow.restore() + list(final_workflow) + # And now Anne is a member. + member = self._mlist.regular_members.get_member(self._anne) + self.assertEqual(member.address.email, self._anne) + self.assertEqual(final_workflow.member, member) + # No further token is needed. + self.assertIsNone(final_workflow.token) + self.assertEqual(final_workflow.token_owner, TokenOwner.no_one) + + def test_confirmation_needed_and_pre_confirmed(self): + # The subscription policy is 'confirm' but the subscription is + # pre-confirmed so the moderation checks can be skipped. + self._mlist.subscription_policy = ConfirmSubscriptionPolicy + anne = self._user_manager.create_address(self._anne) + workflow = self._mlist.subscription_policy( + self._mlist, anne, + pre_verified=True, pre_confirmed=True) + list(workflow) + # Anne was subscribed. + self.assertIsNone(workflow.token) + self.assertEqual(workflow.token_owner, TokenOwner.no_one) + self.assertEqual(workflow.member.address, anne) + + def test_restore_user_absorbed(self): + # The subscribing user is absorbed (and thus deleted) before the + # moderator approves the subscription. + self._mlist.subscription_policy = ModerationSubscriptionPolicy + anne = self._user_manager.create_user(self._anne) + bill = self._user_manager.create_user('bill@example.com') + set_preferred(bill) + # anne subscribes. + workflow = self._mlist.subscription_policy(self._mlist, anne, + pre_verified=True) + list(workflow) + # bill absorbs anne. + bill.absorb(anne) + # anne's subscription request is approved. + approved_workflow = self._mlist.subscription_policy(self._mlist) + approved_workflow.token = workflow.token + approved_workflow.restore() + self.assertEqual(approved_workflow.user, bill) + # Run the workflow through. + list(approved_workflow) + + def test_restore_address_absorbed(self): + # The subscribing user is absorbed (and thus deleted) before the + # moderator approves the subscription. + self._mlist.subscription_policy = ModerationSubscriptionPolicy + anne = self._user_manager.create_user(self._anne) + anne_address = anne.addresses[0] + bill = self._user_manager.create_user('bill@example.com') + # anne subscribes. + workflow = self._mlist.subscription_policy( + self._mlist, anne_address, pre_verified=True) + list(workflow) + # bill absorbs anne. + bill.absorb(anne) + self.assertIn(anne_address, bill.addresses) + # anne's subscription request is approved. + approved_workflow = self._mlist.subscription_policy(self._mlist) + approved_workflow.token = workflow.token + approved_workflow.restore() + self.assertEqual(approved_workflow.user, bill) + # Run the workflow through. + list(approved_workflow) diff --git a/src/mailman/workflows/tests/test_unsubscriptions.py b/src/mailman/workflows/tests/test_unsubscriptions.py new file mode 100644 index 000000000..2e210e90b --- /dev/null +++ b/src/mailman/workflows/tests/test_unsubscriptions.py @@ -0,0 +1,520 @@ +# Copyright (C) 2016-2017 by the Free Software Foundation, Inc. +# +# This file is part of GNU Mailman. +# +# GNU Mailman is free software: you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) +# any later version. +# +# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. +# +# You should have received a copy of the GNU General Public License along with +# GNU Mailman. If not, see . + +"""Test for unsubscription service.""" + +import unittest + +from contextlib import suppress +from mailman.app.lifecycle import create_list +from mailman.interfaces.pending import IPendings +from mailman.interfaces.subscriptions import TokenOwner +from mailman.interfaces.usermanager import IUserManager +from mailman.testing.helpers import LogFileMark, get_queue_messages +from mailman.testing.layers import ConfigLayer +from mailman.utilities.datetime import now +from mailman.workflows.unsubscription import ( + ConfirmModerationUnsubscriptionPolicy, ConfirmUnsubscriptionPolicy, + ModerationUnsubscriptionPolicy, OpenUnsubscriptionPolicy) +from unittest.mock import patch +from zope.component import getUtility + + +class TestUnSubscriptionWorkflow(unittest.TestCase): + layer = ConfigLayer + maxDiff = None + + def setUp(self): + self._mlist = create_list('test@example.com') + self._mlist.admin_immed_notify = False + self._mlist.unsubscription_policy = OpenUnsubscriptionPolicy + self._mlist.send_welcome_message = False + self._anne = 'anne@example.com' + self._user_manager = getUtility(IUserManager) + self.anne = self._user_manager.create_user(self._anne) + self.anne.addresses[0].verified_on = now() + self.anne.preferred_address = self.anne.addresses[0] + self._mlist.subscribe(self.anne) + self._expected_pendings_count = 0 + + def tearDown(self): + # There usually should be no pending after all is said and done, but + # some tests don't complete the workflow. + self.assertEqual(getUtility(IPendings).count, + self._expected_pendings_count) + + def test_start_state(self): + # Test the workflow starts with no tokens or members. + workflow = self._mlist.unsubscription_policy(self._mlist) + self.assertEqual(workflow.token_owner, TokenOwner.no_one) + self.assertIsNone(workflow.token) + self.assertIsNone(workflow.member) + + def test_pended_data(self): + # Test there is a Pendable object associated with a held + # unsubscription request and it has some valid data associated with + # it. + self._mlist.unsubscription_policy = ConfirmUnsubscriptionPolicy + workflow = self._mlist.unsubscription_policy(self._mlist, self.anne) + with suppress(StopIteration): + workflow.run_thru('send_confirmation') + self.assertIsNotNone(workflow.token) + pendable = getUtility(IPendings).confirm(workflow.token, expunge=False) + self.assertEqual(pendable['list_id'], 'test.example.com') + self.assertEqual(pendable['email'], 'anne@example.com') + self.assertEqual(pendable['display_name'], '') + self.assertEqual(pendable['when'], '2005-08-01T07:49:23') + self.assertEqual(pendable['token_owner'], 'subscriber') + # The token is still in the database. + self._expected_pendings_count = 1 + + def test_user_or_address_required(self): + # The `subscriber` attribute must be a user or address that is provided + # to the workflow. + workflow = OpenUnsubscriptionPolicy(self._mlist) + self.assertRaises(AssertionError, list, workflow) + + def test_user_is_subscribed_to_unsubscribe(self): + # A user must be subscribed to a list when trying to unsubscribe. + addr = self._user_manager.create_address('aperson@example.org') + addr.verfied_on = now() + workflow = self._mlist.unsubscription_policy(self._mlist, addr) + self.assertRaises(AssertionError, + workflow.run_thru, 'subscription_checks') + + def test_confirmation_checks_open_list(self): + # An unsubscription from an open list does not need to be confirmed or + # moderated. + self._mlist.unsubscription_policy = OpenUnsubscriptionPolicy + workflow = self._mlist.unsubscription_policy(self._mlist, self.anne) + workflow.run_thru('subscription_checks') + with patch.object(workflow, '_step_do_unsubscription') as step: + next(workflow) + step.assert_called_once_with() + + def test_confirmation_checks_no_user_confirmation_needed(self): + # An unsubscription from a list which does not need user confirmation + # skips to the moderation checks. + self._mlist.unsubscription_policy = ModerationUnsubscriptionPolicy + workflow = self._mlist.unsubscription_policy(self._mlist, self.anne) + workflow.run_thru('subscription_checks') + with patch.object(workflow, '_step_moderation_checks') as step: + next(workflow) + step.assert_called_once_with() + + def test_confirmation_checks_confirm_pre_confirmed(self): + # The unsubscription policy requires user-confirmation, but their + # unsubscription is pre-confirmed. Since moderation is not reuqired, + # the user will be immediately unsubscribed. + self._mlist.unsubscription_policy = ConfirmUnsubscriptionPolicy + workflow = self._mlist.unsubscription_policy( + self._mlist, self.anne, pre_confirmed=True) + workflow.run_thru('confirmation_checks') + with patch.object(workflow, '_step_do_unsubscription') as step: + next(workflow) + step.assert_called_once_with() + + def test_confirmation_checks_confirm_then_moderate_pre_confirmed(self): + # The unsubscription policy requires user confirmation, but their + # unsubscription is pre-confirmed. Since moderation is required, that + # check will be performed. + self._mlist.unsubscription_policy = ( + ConfirmModerationUnsubscriptionPolicy) + workflow = self._mlist.unsubscription_policy( + self._mlist, self.anne, pre_confirmed=True) + workflow.run_thru('confirmation_checks') + with patch.object(workflow, '_step_moderation_checks') as step: + next(workflow) + step.assert_called_once_with() + + def test_send_confirmation_checks_confirm_list(self): + # The unsubscription policy requires user confirmation and the + # unsubscription is not pre-confirmed. + self._mlist.unsubscription_policy = ConfirmUnsubscriptionPolicy + workflow = self._mlist.unsubscription_policy(self._mlist, self.anne) + workflow.run_thru('confirmation_checks') + with patch.object(workflow, '_step_send_confirmation') as step: + next(workflow) + step.assert_called_once_with() + + def test_moderation_checks_moderated_list(self): + # The unsubscription policy requires moderation. + self._mlist.unsubscription_policy = ModerationUnsubscriptionPolicy + workflow = self._mlist.unsubscription_policy(self._mlist, self.anne) + workflow.run_thru('subscription_checks') + with patch.object(workflow, '_step_moderation_checks') as step: + next(workflow) + step.assert_called_once_with() + + def test_moderation_checks_approval_required(self): + # The moderator must approve the subscription request. + self._mlist.unsubscription_policy = ModerationUnsubscriptionPolicy + workflow = self._mlist.unsubscription_policy(self._mlist, self.anne) + workflow.run_thru('moderation_checks') + with patch.object(workflow, '_step_get_moderator_approval') as step: + next(workflow) + step.assert_called_once_with() + + def test_do_unsusbcription(self): + # An open unsubscription policy means the user gets unsubscribed to + # the mailing list without any further confirmations or approvals. + self._mlist.unsubscription_policy = OpenUnsubscriptionPolicy + workflow = self._mlist.unsubscription_policy(self._mlist, self.anne) + list(workflow) + member = self._mlist.regular_members.get_member(self._anne) + self.assertIsNone(member) + + def test_do_unsubscription_pre_approved(self): + # A moderation-requiring subscription policy plus a pre-approved + # address means the user gets unsubscribed from the mailing list + # without any further confirmation or approvals. + self._mlist.unsubscription_policy = ModerationUnsubscriptionPolicy + workflow = self._mlist.unsubscription_policy(self._mlist, self.anne, + pre_approved=True) + list(workflow) + # Anne is now unsubscribed form the mailing list. + member = self._mlist.regular_members.get_member(self._anne) + self.assertIsNone(member) + # No further token is needed. + self.assertIsNone(workflow.token) + self.assertEqual(workflow.token_owner, TokenOwner.no_one) + + def test_do_unsubscription_pre_approved_pre_confirmed(self): + # A moderation-requiring unsubscription policy plus a pre-appvoed + # address means the user gets unsubscribed to the mailing list without + # any further confirmations or approvals. + self._mlist.unsubscription_policy = ( + ConfirmModerationUnsubscriptionPolicy) + workflow = self._mlist.unsubscription_policy(self._mlist, self.anne, + pre_approved=True, + pre_confirmed=True) + list(workflow) + member = self._mlist.regular_members.get_member(self._anne) + self.assertIsNone(member) + # No further token is needed. + self.assertIsNone(workflow.token) + self.assertEqual(workflow.token_owner, TokenOwner.no_one) + + def test_do_unsubscription_cleanups(self): + # Once the user is unsubscribed, the token and its associated pending + # database record will be removed from the database. + self._mlist.unsubscription_policy = OpenUnsubscriptionPolicy + workflow = self._mlist.unsubscription_policy(self._mlist, self.anne) + # Run the workflow. + list(workflow) + # Anne is now unsubscribed from the list. + member = self._mlist.regular_members.get_member(self._anne) + self.assertIsNone(member) + # Workflow is done, so it has no token. + self.assertIsNone(workflow.token) + self.assertEqual(workflow.token_owner, TokenOwner.no_one) + + def test_moderator_approves(self): + # The workflow runs until moderator approval is required, at which + # point the workflow is saved. Once the moderator approves, the + # workflow resumes and the user is unsubscribed. + self._mlist.unsubscription_policy = ModerationUnsubscriptionPolicy + workflow = self._mlist.unsubscription_policy( + self._mlist, self.anne) + # Run the entire workflow. + list(workflow) + # The user is currently subscribed to the mailing list. + member = self._mlist.regular_members.get_member(self._anne) + self.assertIsNotNone(member) + self.assertIsNotNone(workflow.member) + # The token is owned by the moderator. + self.assertIsNotNone(workflow.token) + self.assertEqual(workflow.token_owner, TokenOwner.moderator) + # Create a new workflow with the previous workflow's save token, and + # restore its state. This models an approved un-sunscription request + # and should result in the user getting subscribed. + approved_workflow = self._mlist.unsubscription_policy(self._mlist) + approved_workflow.token = workflow.token + approved_workflow.restore() + list(approved_workflow) + # Now the user is unsubscribed from the mailing list. + member = self._mlist.regular_members.get_member(self._anne) + self.assertIsNone(member) + self.assertEqual(approved_workflow.member, member) + # No further token is needed. + self.assertIsNone(approved_workflow.token) + self.assertEqual(approved_workflow.token_owner, TokenOwner.no_one) + + def test_get_moderator_approval_log_on_hold(self): + # When the unsubscription is held for moderator approval, a message is + # logged. + mark = LogFileMark('mailman.subscribe') + self._mlist.unsubscription_policy = ModerationUnsubscriptionPolicy + workflow = self._mlist.unsubscription_policy( + self._mlist, self.anne) + # Run the entire workflow. + list(workflow) + self.assertIn( + 'test@example.com: held unsubscription request from anne@example.com', + mark.readline() + ) + # The state machine stopped at the moderator approval step so there + # will be one token still in the database. + self._expected_pendings_count = 1 + + def test_get_moderator_approval_notifies_moderators(self): + # When the unsubscription is held for moderator approval, and the list + # is so configured, a notification is sent to the list moderators. + self._mlist.admin_immed_notify = True + self._mlist.unsubscription_policy = ModerationUnsubscriptionPolicy + workflow = self._mlist.unsubscription_policy( + self._mlist, self.anne) + # Consume the entire state machine. + list(workflow) + items = get_queue_messages('virgin', expected_count=1) + message = items[0].msg + self.assertEqual(message['From'], 'test-owner@example.com') + self.assertEqual(message['To'], 'test-owner@example.com') + self.assertEqual( + message['Subject'], + 'New unsubscription request to Test from anne@example.com') + self.assertEqual(message.get_payload(), """\ +Your authorization is required for a mailing list unsubscription +request approval: + + For: anne@example.com + List: test@example.com +""") + # The state machine stopped at the moderator approval so there will be + # one token still in the database. + self._expected_pendings_count = 1 + + def test_get_moderator_approval_no_notifications(self): + # When the unsubscription request is held for moderator approval, and + # the list is so configured, a notification is sent to the list + # moderators. + self._mlist.admin_immed_notify = False + self._mlist.unsubscription_policy = ModerationUnsubscriptionPolicy + workflow = self._mlist.unsubscription_policy( + self._mlist, self.anne) + # Consume the entire state machine. + list(workflow) + get_queue_messages('virgin', expected_count=0) + # The state machine stopped at the moderator approval so there will be + # one token still in the database. + self._expected_pendings_count = 1 + + def test_send_confirmation(self): + # A confirmation message gets sent when the unsubscription must be + # confirmed. + self._mlist.unsubscription_policy = ConfirmUnsubscriptionPolicy + # Run the workflow to model the confirmation step. + workflow = self._mlist.unsubscription_policy(self._mlist, self.anne) + list(workflow) + items = get_queue_messages('virgin', expected_count=1) + message = items[0].msg + token = workflow.token + self.assertEqual( + message['Subject'], 'confirm {}'.format(workflow.token)) + self.assertEqual( + message['From'], 'test-confirm+{}@example.com'.format(token)) + # The state machine stopped at the member confirmation step so there + # will be one token still in the database. + self._expected_pendings_count = 1 + + def test_do_confirmation_unsubscribes_user(self): + # Unsubscriptions to the mailing list must be confirmed. Once that's + # done, the user's address is unsubscribed. + self._mlist.unsubscription_policy = ConfirmUnsubscriptionPolicy + workflow = self._mlist.unsubscription_policy(self._mlist, self.anne) + list(workflow) + # Anne is a member. + member = self._mlist.regular_members.get_member(self._anne) + self.assertIsNotNone(member) + self.assertEqual(member, workflow.member) + # The token is owned by the subscriber. + self.assertIsNotNone(workflow.token) + self.assertEqual(workflow.token_owner, TokenOwner.subscriber) + # Confirm. + confirm_workflow = self._mlist.unsubscription_policy(self._mlist) + confirm_workflow.token = workflow.token + confirm_workflow.restore() + list(confirm_workflow) + # Anne is now unsubscribed. + member = self._mlist.regular_members.get_member(self._anne) + self.assertIsNone(member) + # No further token is needed. + self.assertIsNone(confirm_workflow.token) + self.assertEqual(confirm_workflow.token_owner, TokenOwner.no_one) + + def test_do_confirmation_unsubscribes_address(self): + # Unsubscriptions to the mailing list must be confirmed. Once that's + # done, the address is unsubscribed. + address = self.anne.register('anne.person@example.com') + self._mlist.subscribe(address) + self._mlist.unsubscription_policy = ConfirmUnsubscriptionPolicy + workflow = self._mlist.unsubscription_policy(self._mlist, address) + list(workflow) + # Bart is a member. + member = self._mlist.regular_members.get_member( + 'anne.person@example.com') + self.assertIsNotNone(member) + self.assertEqual(member, workflow.member) + # The token is owned by the subscriber. + self.assertIsNotNone(workflow.token) + self.assertEqual(workflow.token_owner, TokenOwner.subscriber) + # Confirm. + confirm_workflow = self._mlist.unsubscription_policy(self._mlist) + confirm_workflow.token = workflow.token + confirm_workflow.restore() + list(confirm_workflow) + # Bart is now unsubscribed. + member = self._mlist.regular_members.get_member( + 'anne.person@example.com') + self.assertIsNone(member) + # No further token is needed. + self.assertIsNone(confirm_workflow.token) + self.assertEqual(confirm_workflow.token_owner, TokenOwner.no_one) + + def test_do_confirmation_nonmember(self): + # Attempt to confirm the unsubscription of a member who has already + # been unsubscribed. + self._mlist.unsubscription_policy = ConfirmUnsubscriptionPolicy + workflow = self._mlist.unsubscription_policy(self._mlist, self.anne) + list(workflow) + # Anne is a member. + member = self._mlist.regular_members.get_member(self._anne) + self.assertIsNotNone(member) + self.assertEqual(member, workflow.member) + # The token is owned by the subscriber. + self.assertIsNotNone(workflow.token) + self.assertEqual(workflow.token_owner, TokenOwner.subscriber) + # Unsubscribe Anne out of band. + member.unsubscribe() + # Confirm. + confirm_workflow = self._mlist.unsubscription_policy(self._mlist) + confirm_workflow.token = workflow.token + confirm_workflow.restore() + list(confirm_workflow) + # No further token is needed. + self.assertIsNone(confirm_workflow.token) + self.assertEqual(confirm_workflow.token_owner, TokenOwner.no_one) + + def test_do_confirmation_nonmember_final_step(self): + # Attempt to confirm the unsubscription of a member who has already + # been unsubscribed. + self._mlist.unsubscription_policy = ConfirmUnsubscriptionPolicy + workflow = self._mlist.unsubscription_policy(self._mlist, self.anne) + list(workflow) + # Anne is a member. + member = self._mlist.regular_members.get_member(self._anne) + self.assertIsNotNone(member) + self.assertEqual(member, workflow.member) + # The token is owned by the subscriber. + self.assertIsNotNone(workflow.token) + self.assertEqual(workflow.token_owner, TokenOwner.subscriber) + # Confirm. + confirm_workflow = self._mlist.unsubscription_policy(self._mlist) + confirm_workflow.token = workflow.token + confirm_workflow.restore() + confirm_workflow.run_until('do_unsubscription') + self.assertEqual(member, confirm_workflow.member) + # Unsubscribe Anne out of band. + member.unsubscribe() + list(confirm_workflow) + self.assertIsNone(confirm_workflow.member) + # No further token is needed. + self.assertIsNone(confirm_workflow.token) + self.assertEqual(confirm_workflow.token_owner, TokenOwner.no_one) + + def test_prevent_confirmation_replay_attacks(self): + # Ensure that if the workflow requires two confirmations, e.g. first + # the user confirming their subscription, and then the moderator + # approving it, that different tokens are used in these two cases. + self._mlist.unsubscription_policy = ( + ConfirmModerationUnsubscriptionPolicy) + workflow = self._mlist.unsubscription_policy(self._mlist, self.anne) + # Run the state machine up to the first confirmation, and cache the + # confirmation token. + list(workflow) + token = workflow.token + # Anne is still a member of the mailing list. + member = self._mlist.regular_members.get_member(self._anne) + self.assertIsNotNone(member) + self.assertIsNotNone(workflow.member) + # The token is owned by the subscriber. + self.assertIsNotNone(workflow.token) + self.assertEqual(workflow.token_owner, TokenOwner.subscriber) + # The old token will not work for moderator approval. + moderator_workflow = self._mlist.unsubscription_policy(self._mlist) + moderator_workflow.token = token + moderator_workflow.restore() + list(moderator_workflow) + # The token is owned by the moderator. + self.assertIsNotNone(moderator_workflow.token) + self.assertEqual(moderator_workflow.token_owner, TokenOwner.moderator) + # While we wait for the moderator to approve the subscription, note + # that there's a new token for the next steps. + self.assertNotEqual(token, moderator_workflow.token) + # The old token won't work. + final_workflow = self._mlist.unsubscription_policy(self._mlist) + final_workflow.token = token + self.assertRaises(LookupError, final_workflow.restore) + # Running this workflow will fail. + self.assertRaises(AssertionError, list, final_workflow) + # Anne is still not unsubscribed. + member = self._mlist.regular_members.get_member(self._anne) + self.assertIsNotNone(member) + self.assertIsNone(final_workflow.member) + # However, if we use the new token, her unsubscription request will be + # approved by the moderator. + final_workflow.token = moderator_workflow.token + final_workflow.restore() + list(final_workflow) + # And now Anne is unsubscribed. + member = self._mlist.regular_members.get_member(self._anne) + self.assertIsNone(member) + # No further token is needed. + self.assertIsNone(final_workflow.token) + self.assertEqual(final_workflow.token_owner, TokenOwner.no_one) + + def test_confirmation_needed_and_pre_confirmed(self): + # The subscription policy is 'confirm' but the subscription is + # pre-confirmed so the moderation checks can be skipped. + self._mlist.unsubscription_policy = ConfirmUnsubscriptionPolicy + workflow = self._mlist.unsubscription_policy( + self._mlist, self.anne, pre_confirmed=True) + list(workflow) + # Anne was unsubscribed. + self.assertIsNone(workflow.token) + self.assertEqual(workflow.token_owner, TokenOwner.no_one) + self.assertIsNone(workflow.member) + + def test_confirmation_needed_moderator_address(self): + address = self.anne.register('anne.person@example.com') + self._mlist.subscribe(address) + self._mlist.unsubscription_policy = ModerationUnsubscriptionPolicy + workflow = self._mlist.unsubscription_policy(self._mlist, address) + # Get moderator approval. + list(workflow) + approved_workflow = self._mlist.unsubscription_policy(self._mlist) + approved_workflow.token = workflow.token + approved_workflow.restore() + list(approved_workflow) + self.assertEqual(approved_workflow.subscriber, address) + # Anne was unsubscribed. + self.assertIsNone(approved_workflow.token) + self.assertEqual(approved_workflow.token_owner, TokenOwner.no_one) + self.assertIsNone(approved_workflow.member) + member = self._mlist.regular_members.get_member( + 'anne.person@example.com') + self.assertIsNone(member) diff --git a/src/mailman/workflows/tests/test_workflow.py b/src/mailman/workflows/tests/test_workflow.py new file mode 100644 index 000000000..3e7856b29 --- /dev/null +++ b/src/mailman/workflows/tests/test_workflow.py @@ -0,0 +1,183 @@ +# Copyright (C) 2015-2017 by the Free Software Foundation, Inc. +# +# This file is part of GNU Mailman. +# +# GNU Mailman is free software: you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) +# any later version. +# +# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. +# +# You should have received a copy of the GNU General Public License along with +# GNU Mailman. If not, see . + +"""App-level workflow tests.""" + +import json +import unittest + +from mailman.interfaces.workflows import IWorkflowStateManager +from mailman.testing.layers import ConfigLayer +from mailman.workflows.base import Workflow +from zope.component import getUtility + + +class MyWorkflow(Workflow): + initial_state = 'first' + save_attributes = ('ant', 'bee', 'cat') + + def __init__(self): + super().__init__() + self.token = 'test-workflow' + self.ant = 1 + self.bee = 2 + self.cat = 3 + self.dog = 4 + + def _step_first(self): + self.push('second') + return 'one' + + def _step_second(self): + self.push('third') + return 'two' + + def _step_third(self): + return 'three' + + +class DependentWorkflow(MyWorkflow): + save_attributes = ('ant', 'bee', 'cat', 'elf') + + def __init__(self): + super().__init__() + self._elf = 5 + + @property + def elf(self): + return self._elf + + @elf.setter + def elf(self, value): + # This attribute depends on other attributes. + assert self.ant is not None + assert self.bee is not None + assert self.cat is not None + self._elf = value + + +class TestWorkflow(unittest.TestCase): + layer = ConfigLayer + + def setUp(self): + self._workflow = iter(MyWorkflow()) + + def test_basic_workflow(self): + # The work flows from one state to the next. + results = list(self._workflow) + self.assertEqual(results, ['one', 'two', 'three']) + + def test_partial_workflow(self): + # You don't have to flow through every step. + results = next(self._workflow) + self.assertEqual(results, 'one') + + def test_exhaust_workflow(self): + # Manually flow through a few steps, then consume the whole thing. + results = [next(self._workflow)] + results.extend(self._workflow) + self.assertEqual(results, ['one', 'two', 'three']) + + def test_save_and_restore_workflow(self): + # Without running any steps, save and restore the workflow. Then + # consume the restored workflow. + self._workflow.save() + new_workflow = MyWorkflow() + new_workflow.restore() + results = list(new_workflow) + self.assertEqual(results, ['one', 'two', 'three']) + + def test_save_and_restore_partial_workflow(self): + # After running a few steps, save and restore the workflow. Then + # consume the restored workflow. + next(self._workflow) + self._workflow.save() + new_workflow = MyWorkflow() + new_workflow.restore() + results = list(new_workflow) + self.assertEqual(results, ['two', 'three']) + + def test_save_and_restore_exhausted_workflow(self): + # After consuming the entire workflow, save and restore it. + list(self._workflow) + self._workflow.save() + new_workflow = MyWorkflow() + new_workflow.restore() + results = list(new_workflow) + self.assertEqual(len(results), 0) + + def test_save_and_restore_attributes(self): + # Saved attributes are restored. + self._workflow.ant = 9 + self._workflow.bee = 8 + self._workflow.cat = 7 + # Don't save .dog. + self._workflow.save() + new_workflow = MyWorkflow() + new_workflow.restore() + self.assertEqual(new_workflow.ant, 9) + self.assertEqual(new_workflow.bee, 8) + self.assertEqual(new_workflow.cat, 7) + self.assertEqual(new_workflow.dog, 4) + + def test_save_and_restore_dependant_attributes(self): + # Attributes must be restored in the order they are declared in + # save_attributes. + workflow = iter(DependentWorkflow()) + workflow.elf = 6 + workflow.save() + new_workflow = DependentWorkflow() + # The elf attribute must be restored last, set triggering values for + # attributes it depends on. + new_workflow.ant = new_workflow.bee = new_workflow.cat = None + new_workflow.restore() + self.assertEqual(new_workflow.elf, 6) + + def test_save_and_restore_obsolete_attributes(self): + # Obsolete saved attributes are ignored. + state_manager = getUtility(IWorkflowStateManager) + # Save the state of an old version of the workflow that would not have + # the cat attribute. + state_manager.save( + self._workflow.token, '["first"]', + json.dumps({'ant': 1, 'bee': 2})) + # Restore in the current version that needs the cat attribute. + new_workflow = MyWorkflow() + try: + new_workflow.restore() + except KeyError: + self.fail('Restore does not handle obsolete attributes') + # Restoring must not raise an exception, the default value is kept. + self.assertEqual(new_workflow.cat, 3) + + def test_run_thru(self): + # Run all steps through the given one. + results = self._workflow.run_thru('second') + self.assertEqual(results, ['one', 'two']) + + def test_run_thru_completes(self): + results = self._workflow.run_thru('all of them') + self.assertEqual(results, ['one', 'two', 'three']) + + def test_run_until(self): + # Run until (but not including) the given step. + results = self._workflow.run_until('second') + self.assertEqual(results, ['one']) + + def test_run_until_completes(self): + results = self._workflow.run_until('all of them') + self.assertEqual(results, ['one', 'two', 'three']) -- cgit v1.2.3-70-g09d2