diff options
| -rw-r--r-- | src/mailman_pgp/commands/tests/test_key.py | 65 | ||||
| -rw-r--r-- | src/mailman_pgp/workflows/base.py | 3 | ||||
| -rw-r--r-- | src/mailman_pgp/workflows/tests/test_base.py | 130 | ||||
| -rw-r--r-- | src/mailman_pgp/workflows/tests/test_subscription.py | 2 |
4 files changed, 105 insertions, 95 deletions
diff --git a/src/mailman_pgp/commands/tests/test_key.py b/src/mailman_pgp/commands/tests/test_key.py index 4f62b11..44d5b25 100644 --- a/src/mailman_pgp/commands/tests/test_key.py +++ b/src/mailman_pgp/commands/tests/test_key.py @@ -26,6 +26,7 @@ from mailman.interfaces.usermanager import IUserManager from mailman.runners.command import CommandRunner from mailman.testing.helpers import get_queue_messages, make_testable_runner from mailman.utilities.datetime import now +from public import public from zope.component import getUtility from mailman_pgp.config import mm_config @@ -59,6 +60,7 @@ def _create_mixed(from_hdr, to_hdr, subject_hdr): return message +@public class TestPreDispatch(unittest.TestCase): layer = PGPConfigLayer @@ -103,6 +105,7 @@ class TestPreDispatch(unittest.TestCase): results_msg.get_payload()) +@public class TestPreSubscription(unittest.TestCase): layer = PGPConfigLayer @@ -111,6 +114,9 @@ class TestPreSubscription(unittest.TestCase): self.pgp_list = PGPMailingList.for_list(self.mlist) self.pgp_list.key = load_key('ecc_p256.priv.asc') + self.bart_key = load_key('rsa_1024.priv.asc') + self.anne_key = load_key('ecc_p256.priv.asc') + def test_set(self): self.mlist.subscription_policy = OpenSubscriptionPolicy bart = getUtility(IUserManager).create_address('bart@example.com', @@ -121,12 +127,10 @@ class TestPreSubscription(unittest.TestCase): get_queue_messages('virgin') - bart_key = load_key('rsa_1024.priv.asc') - set_message = _create_mixed('bart@example.com', 'test@example.com', 'Re: key set {}'.format(token)) wrapped_set_message = MIMEWrapper(set_message) - set_message = wrapped_set_message.attach_key(bart_key.pubkey) + set_message = wrapped_set_message.attach_key(self.bart_key.pubkey) mm_config.switchboards['command'].enqueue(set_message, listid='test.example.com') @@ -134,7 +138,8 @@ class TestPreSubscription(unittest.TestCase): pgp_address = PGPAddress.for_address(bart) self.assertIsNotNone(pgp_address) - self.assertEqual(pgp_address.key.fingerprint, bart_key.fingerprint) + self.assertEqual(pgp_address.key.fingerprint, + self.bart_key.fingerprint) self.assertFalse(pgp_address.key_confirmed) items = get_queue_messages('virgin', expected_count=2) @@ -146,7 +151,7 @@ class TestPreSubscription(unittest.TestCase): confirm_request = items[0].msg self.assertIn('Key succesfully set.', results.get_payload()) - self.assertIn('Key fingerprint: {}'.format(bart_key.fingerprint), + self.assertIn('Key fingerprint: {}'.format(self.bart_key.fingerprint), results.get_payload()) confirm_wrapped = PGPWrapper(confirm_request) @@ -178,15 +183,12 @@ class TestPreSubscription(unittest.TestCase): results_msg.get_payload()) def test_set_multiple_keys(self): - bart_key = load_key('rsa_1024.priv.asc') - anne_key = load_key('ecc_p256.priv.asc') - set_message = _create_mixed('bart@example.com', 'test@example.com', 'Re: key set token') wrapped_set_message = MIMEWrapper(set_message) - set_message = wrapped_set_message.attach_key(bart_key.pubkey) + set_message = wrapped_set_message.attach_key(self.bart_key.pubkey) wrapped_set_message = MIMEWrapper(set_message) - set_message = wrapped_set_message.attach_key(anne_key.pubkey) + set_message = wrapped_set_message.attach_key(self.anne_key.pubkey) mm_config.switchboards['command'].enqueue(set_message, listid='test.example.com') @@ -198,11 +200,9 @@ class TestPreSubscription(unittest.TestCase): results_msg.get_payload()) def test_set_no_email(self): - bart_key = load_key('rsa_1024.priv.asc') - message = _create_mixed('', 'test@example.com', 'key set token') wrapped_message = MIMEWrapper(message) - message = wrapped_message.attach_key(bart_key.pubkey) + message = wrapped_message.attach_key(self.bart_key.pubkey) mm_config.switchboards['command'].enqueue(message, listid='test.example.com') @@ -213,12 +213,10 @@ class TestPreSubscription(unittest.TestCase): self.assertIn('No email to subscribe with.', results_msg.get_payload()) def test_set_no_address(self): - bart_key = load_key('rsa_1024.priv.asc') - set_message = _create_mixed('bart@example.com', 'test@example.com', 'key set token') wrapped_set_message = MIMEWrapper(set_message) - set_message = wrapped_set_message.attach_key(bart_key.pubkey) + set_message = wrapped_set_message.attach_key(self.bart_key.pubkey) mm_config.switchboards['command'].enqueue(set_message, listid='test.example.com') @@ -235,19 +233,18 @@ class TestPreSubscription(unittest.TestCase): 'Bart Person') bart.verified_on = now() - bart_key = load_key('rsa_1024.priv.asc') - token, token_owner, member = ISubscriptionManager(self.mlist).register( - bart, pubkey=bart_key.pubkey) + bart, pubkey=self.bart_key.pubkey) get_queue_messages('virgin') message = _create_plain('bart@example.com', 'test@example.com', 'Re: key confirm {}'.format(token), - CONFIRM_REQUEST.format(bart_key.fingerprint, - token)) + CONFIRM_REQUEST.format( + self.bart_key.fingerprint, + token)) wrapped_message = MIMEWrapper(message) - message = wrapped_message.sign(bart_key) + message = wrapped_message.sign(self.bart_key) mm_config.switchboards['command'].enqueue(message, listid='test.example.com') @@ -263,20 +260,20 @@ class TestPreSubscription(unittest.TestCase): 'Bart Person') bart.verified_on = now() - bart_key = load_key('rsa_1024.priv.asc') - token, token_owner, member = ISubscriptionManager(self.mlist).register( - bart, pubkey=bart_key.pubkey) + bart, pubkey=self.bart_key.pubkey) get_queue_messages('virgin') message = _create_plain('bart@example.com', 'test@example.com', 'Re: key confirm {}'.format(token), - CONFIRM_REQUEST.format(bart_key.fingerprint, - token)) + CONFIRM_REQUEST.format( + self.bart_key.fingerprint, + token)) wrapped_message = MIMEWrapper(message) - message = wrapped_message.sign_encrypt(bart_key, self.pgp_list.pubkey, - bart_key.pubkey) + message = wrapped_message.sign_encrypt(self.bart_key, + self.pgp_list.pubkey, + self.bart_key.pubkey) mm_config.switchboards['command'].enqueue(message, listid='test.example.com') @@ -330,17 +327,16 @@ class TestPreSubscription(unittest.TestCase): 'Bart Person') bart.verified_on = now() - bart_key = load_key('rsa_1024.priv.asc') - token, token_owner, member = ISubscriptionManager(self.mlist).register( - bart, pubkey=bart_key.pubkey) + bart, pubkey=self.bart_key.pubkey) get_queue_messages('virgin') message = _create_plain('bart@example.com', 'test@example.com', 'Re: key confirm {}'.format(token), - CONFIRM_REQUEST.format(bart_key.fingerprint, - token)) + CONFIRM_REQUEST.format( + self.bart_key.fingerprint, + token)) mm_config.switchboards['command'].enqueue(message, listid='test.example.com') @@ -353,6 +349,7 @@ class TestPreSubscription(unittest.TestCase): results_msg.get_payload()) +@public class TestAfterSubscription(unittest.TestCase): layer = PGPConfigLayer diff --git a/src/mailman_pgp/workflows/base.py b/src/mailman_pgp/workflows/base.py index d05781d..014f2dd 100644 --- a/src/mailman_pgp/workflows/base.py +++ b/src/mailman_pgp/workflows/base.py @@ -55,9 +55,8 @@ class PGPMixin: class SetPubkeyMixin: - def __init__(self, pubkey=None, pre_confirmed=False): + def __init__(self, pubkey=None): self.pubkey = pubkey - self.pubkey_confirmed = pre_confirmed @property def pubkey_key(self): diff --git a/src/mailman_pgp/workflows/tests/test_base.py b/src/mailman_pgp/workflows/tests/test_base.py index af00d01..c59cfe0 100644 --- a/src/mailman_pgp/workflows/tests/test_base.py +++ b/src/mailman_pgp/workflows/tests/test_base.py @@ -24,8 +24,12 @@ from unittest.mock import patch from mailman.app.lifecycle import create_list from mailman.interfaces.pending import IPendings from mailman.interfaces.usermanager import IUserManager +from mailman.interfaces.workflows import IWorkflow from mailman.testing.helpers import get_queue_messages +from mailman.workflows.common import SubscriptionBase +from public import public from zope.component import getUtility +from zope.interface import implementer from mailman_pgp.database import mm_transaction, transaction from mailman_pgp.model.address import PGPAddress @@ -33,11 +37,11 @@ from mailman_pgp.model.list import PGPMailingList from mailman_pgp.pgp.tests.base import load_key from mailman_pgp.pgp.wrapper import PGPWrapper from mailman_pgp.testing.layers import PGPConfigLayer -from mailman_pgp.workflows.base import KEY_REQUEST -from mailman_pgp.workflows.subscription import ConfirmSubscriptionPolicy +from mailman_pgp.workflows.base import (KEY_REQUEST, PGPMixin, SetPubkeyMixin, + ConfirmPubkeyMixin) -class PubkeyMixinSetup(): +class PubkeyMixinTestSetup(): def setUp(self): with mm_transaction(): self.mlist = create_list('test@example.com', @@ -54,34 +58,62 @@ class PubkeyMixinSetup(): self.sender = self.um.create_address('rsa-1024b@example.org') -class TestPGPMixin(PubkeyMixinSetup, unittest.TestCase): +@implementer(IWorkflow) +class PGPTestWorkflow(SubscriptionBase, PGPMixin, SetPubkeyMixin, + ConfirmPubkeyMixin): + name = 'test-workflow' + description = '' + initial_state = 'prepare' + save_attributes = ( + 'pubkey_key', + 'pubkey_confirmed', + 'address_key', + 'subscriber_key', + 'user_key', + 'token_owner_key' + ) + + def __init__(self, mlist, subscriber=None, *, pubkey=None, + pubkey_pre_confirmed=False): + SubscriptionBase.__init__(self, mlist, subscriber) + SetPubkeyMixin.__init__(self, pubkey=pubkey) + ConfirmPubkeyMixin.__init__(self, pre_confirmed=pubkey_pre_confirmed) + PGPMixin.__init__(self) + + def _step_prepare(self): + self.push('do_subscription') + self.push('pubkey_confirmation') + self.push('pubkey_checks') + self.push('pgp_prepare') + self.push('sanity_checks') + + +@public +class TestPGPMixin(PubkeyMixinTestSetup, unittest.TestCase): layer = PGPConfigLayer def test_create_address(self): - workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender, - pre_verified=True, - pre_confirmed=True) + workflow = PGPTestWorkflow(self.mlist, self.sender) workflow.run_thru('pgp_prepare') pgp_address = PGPAddress.for_address(self.sender) self.assertIsNotNone(pgp_address) def test_address_existing(self): - workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender, - pre_verified=True, - pre_confirmed=True) + workflow = PGPTestWorkflow(self.mlist, self.sender) with transaction() as t: pgp_address = PGPAddress(self.sender) t.add(pgp_address) workflow.run_thru('pgp_prepare') + still = PGPAddress.for_address(self.sender) + self.assertIsNotNone(still) -class TestSetPubkeyMixin(PubkeyMixinSetup, unittest.TestCase): +@public +class TestSetPubkeyMixin(PubkeyMixinTestSetup, unittest.TestCase): layer = PGPConfigLayer def test_key_request_sent(self): - workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender, - pre_verified=True, - pre_confirmed=True) + workflow = PGPTestWorkflow(self.mlist, self.sender) list(workflow) items = get_queue_messages('virgin', expected_count=1) message = items[0].msg @@ -91,24 +123,20 @@ class TestSetPubkeyMixin(PubkeyMixinSetup, unittest.TestCase): self.assertEqual(message.get_payload(), KEY_REQUEST) def test_receive_key(self): - workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender, - pre_verified=True, - pre_confirmed=True) + workflow = PGPTestWorkflow(self.mlist, self.sender) list(workflow) with transaction(): pgp_address = PGPAddress.for_address(self.sender) pgp_address.key = self.sender_key.pubkey - receive_workflow = ConfirmSubscriptionPolicy(self.mlist) + receive_workflow = PGPTestWorkflow(self.mlist) receive_workflow.token = workflow.token receive_workflow.restore() receive_workflow.run_thru('receive_key') def test_set_pubkey(self): - workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender, - pre_verified=True, - pre_confirmed=True, - pubkey=self.sender_key.pubkey) + workflow = PGPTestWorkflow(self.mlist, self.sender, + pubkey=self.sender_key.pubkey) workflow.run_thru('pubkey_checks') pgp_address = PGPAddress.for_address(self.sender) self.assertIsNotNone(pgp_address) @@ -117,9 +145,7 @@ class TestSetPubkeyMixin(PubkeyMixinSetup, unittest.TestCase): self.sender_key.fingerprint) def test_pubkey_set(self): - workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender, - pre_verified=True, - pre_confirmed=True) + workflow = PGPTestWorkflow(self.mlist, self.sender) with transaction() as t: pgp_address = PGPAddress(self.sender) pgp_address.key = self.sender_key.pubkey @@ -129,26 +155,23 @@ class TestSetPubkeyMixin(PubkeyMixinSetup, unittest.TestCase): self.sender_key.fingerprint) -class TestConfirmPubkeyMixin(PubkeyMixinSetup, unittest.TestCase): +@public +class TestConfirmPubkeyMixin(PubkeyMixinTestSetup, unittest.TestCase): layer = PGPConfigLayer def test_key_request_pubkey_set(self): - workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender, - pre_verified=True, - pre_confirmed=True, - pubkey=self.sender_key.pubkey, - pubkey_pre_confirmed=True) + workflow = PGPTestWorkflow(self.mlist, self.sender, + pubkey=self.sender_key.pubkey, + pubkey_pre_confirmed=True) workflow.run_thru('pubkey_confirmation') with patch.object(workflow, '_step_do_subscription') as step: next(workflow) step.assert_called_once_with() def test_send_key_confirm_request(self): - workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender, - pre_verified=True, - pre_confirmed=True, - pubkey=self.sender_key.pubkey, - pubkey_pre_confirmed=False) + workflow = PGPTestWorkflow(self.mlist, self.sender, + pubkey=self.sender_key.pubkey, + pubkey_pre_confirmed=False) list(workflow) items = get_queue_messages('virgin', expected_count=1) message = items[0].msg @@ -159,14 +182,12 @@ class TestConfirmPubkeyMixin(PubkeyMixinSetup, unittest.TestCase): self.assertTrue(wrapped.is_encrypted()) def test_receive_confirmation(self): - workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender, - pre_verified=True, - pre_confirmed=True, - pubkey=self.sender_key.pubkey, - pubkey_pre_confirmed=False) + workflow = PGPTestWorkflow(self.mlist, self.sender, + pubkey=self.sender_key.pubkey, + pubkey_pre_confirmed=False) list(workflow) - receive_workflow = ConfirmSubscriptionPolicy(self.mlist) + receive_workflow = PGPTestWorkflow(self.mlist) receive_workflow.token = workflow.token receive_workflow.restore() receive_workflow.run_thru('receive_key_confirmation') @@ -175,13 +196,12 @@ class TestConfirmPubkeyMixin(PubkeyMixinSetup, unittest.TestCase): step.assert_called_once_with() -class TestBothPubkeyMixins(PubkeyMixinSetup, unittest.TestCase): +@public +class TestBothPubkeyMixins(PubkeyMixinTestSetup, unittest.TestCase): layer = PGPConfigLayer def test_pended_data_key_request(self): - workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender, - pre_verified=True, - pre_confirmed=True) + workflow = PGPTestWorkflow(self.mlist, self.sender) with suppress(StopIteration): workflow.run_thru('send_key_request') self.assertIsNotNone(workflow.token) @@ -193,10 +213,8 @@ class TestBothPubkeyMixins(PubkeyMixinSetup, unittest.TestCase): self.assertEqual(pendable['token_owner'], 'subscriber') def test_pended_data_key_confirmation(self): - workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender, - pre_verified=True, - pre_confirmed=True, - pubkey=self.sender_key.pubkey) + workflow = PGPTestWorkflow(self.mlist, self.sender, + pubkey=self.sender_key.pubkey) with suppress(StopIteration): workflow.run_thru('send_key_confirm_request') self.assertIsNotNone(workflow.token) @@ -208,9 +226,7 @@ class TestBothPubkeyMixins(PubkeyMixinSetup, unittest.TestCase): self.assertEqual(pendable['token_owner'], 'subscriber') def test_exisitng_pgp_address(self): - workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender, - pre_verified=True, - pre_confirmed=True) + workflow = PGPTestWorkflow(self.mlist, self.sender) with transaction() as t: pgp_address = PGPAddress(self.sender) @@ -224,9 +240,7 @@ class TestBothPubkeyMixins(PubkeyMixinSetup, unittest.TestCase): step.assert_called_once_with() def test_exisitng_pgp_address_not_confirmed(self): - workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender, - pre_verified=True, - pre_confirmed=True) + workflow = PGPTestWorkflow(self.mlist, self.sender) with transaction() as t: pgp_address = PGPAddress(self.sender) @@ -239,9 +253,7 @@ class TestBothPubkeyMixins(PubkeyMixinSetup, unittest.TestCase): step.assert_called_once_with() def test_exisitng_pgp_address_no_key(self): - workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender, - pre_verified=True, - pre_confirmed=True) + workflow = PGPTestWorkflow(self.mlist, self.sender) with transaction() as t: pgp_address = PGPAddress(self.sender) diff --git a/src/mailman_pgp/workflows/tests/test_subscription.py b/src/mailman_pgp/workflows/tests/test_subscription.py index 9464a83..f9fa1e1 100644 --- a/src/mailman_pgp/workflows/tests/test_subscription.py +++ b/src/mailman_pgp/workflows/tests/test_subscription.py @@ -20,6 +20,7 @@ import unittest from mailman.app.lifecycle import create_list from mailman.interfaces.usermanager import IUserManager +from public import public from zope.component import getUtility from mailman_pgp.database import mm_transaction @@ -29,6 +30,7 @@ from mailman_pgp.workflows.subscription import ( ModerationSubscriptionPolicy, OpenSubscriptionPolicy) +@public class TestSubscriptionWorkflows(unittest.TestCase): layer = PGPConfigLayer |
