diff options
Diffstat (limited to 'src/mailman_pgp/workflows/tests/test_base.py')
| -rw-r--r-- | src/mailman_pgp/workflows/tests/test_base.py | 174 |
1 files changed, 174 insertions, 0 deletions
diff --git a/src/mailman_pgp/workflows/tests/test_base.py b/src/mailman_pgp/workflows/tests/test_base.py index 8b6b4d1..19b2e58 100644 --- a/src/mailman_pgp/workflows/tests/test_base.py +++ b/src/mailman_pgp/workflows/tests/test_base.py @@ -14,3 +14,177 @@ # # You should have received a copy of the GNU General Public License along with # this program. If not, see <http://www.gnu.org/licenses/>. + +"""""" + +import unittest +from contextlib import suppress +from unittest.mock import patch + +from mailman.app.lifecycle import create_list +from mailman.interfaces.pending import IPendings +from mailman.interfaces.usermanager import IUserManager +from mailman.testing.helpers import get_queue_messages +from zope.component import getUtility + +from mailman_pgp.config import config +from mailman_pgp.database import mm_transaction, transaction +from mailman_pgp.model.address import PGPAddress +from mailman_pgp.model.list import PGPMailingList +from mailman_pgp.pgp.tests.base import load_key +from mailman_pgp.pgp.wrapper import PGPWrapper +from mailman_pgp.testing.layers import PGPConfigLayer +from mailman_pgp.workflows.base import KEY_REQUEST +from mailman_pgp.workflows.subscription import ConfirmSubscriptionPolicy + + +class TestPubkeyMixin(unittest.TestCase): + layer = PGPConfigLayer + + def setUp(self): + with mm_transaction(): + self.mlist = create_list('test@example.com', + style_name='pgp-default') + + self.list_key = load_key('ecc_p256.priv.asc') + + self.pgp_list = PGPMailingList.for_list(self.mlist) + self.pgp_list.key = self.list_key + + self.um = getUtility(IUserManager) + + self.sender_key = load_key('rsa_1024.priv.asc') + self.sender = self.um.create_address('rsa-1024b@example.org') + + def test_pended_data(self): + workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender, + pre_verified=True, + pre_confirmed=True) + with suppress(StopIteration): + workflow.run_thru('send_key_request') + self.assertIsNotNone(workflow.token) + pendable = getUtility(IPendings).confirm(workflow.token, expunge=False) + self.assertEqual(pendable['list_id'], 'test.example.com') + self.assertEqual(pendable['email'], 'rsa-1024b@example.org') + self.assertEqual(pendable['display_name'], '') + self.assertEqual(pendable['when'], '2005-08-01T07:49:23') + self.assertEqual(pendable['token_owner'], 'subscriber') + + def test_key_request_sent(self): + workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender, + pre_verified=True, + pre_confirmed=True) + list(workflow) + items = get_queue_messages('virgin', expected_count=1) + message = items[0].msg + token = workflow.token + + self.assertEqual(message['Subject'], 'key set {}'.format(token)) + self.assertEqual(message.get_payload(), KEY_REQUEST) + + def test_key_request_pubkey_set(self): + workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender, + pre_verified=True, + pre_confirmed=True, + pubkey=self.sender_key.pubkey, + pubkey_pre_confirmed=True) + workflow.run_thru('pubkey_checks') + with patch.object(workflow, '_step_do_subscription') as step: + next(workflow) + step.assert_called_once_with() + + def test_receive_key(self): + workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender, + pre_verified=True, + pre_confirmed=True) + list(workflow) + with transaction() as t: + pgp_address = PGPAddress(self.sender) + pgp_address.key = self.sender_key.pubkey + t.add(pgp_address) + + receive_workflow = ConfirmSubscriptionPolicy(self.mlist) + receive_workflow.token = workflow.token + receive_workflow.restore() + receive_workflow.run_thru('receive_key') + + self.assertIsNotNone(receive_workflow.pubkey) + self.assertEqual(receive_workflow.pubkey.fingerprint, + self.sender_key.pubkey.fingerprint) + + with patch.object(receive_workflow, + '_step_send_confirm_request') as step: + next(receive_workflow) + step.assert_called_once_with() + + def test_receive_key_no_address(self): + workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender, + pre_verified=True, + pre_confirmed=True) + list(workflow) + receive_workflow = ConfirmSubscriptionPolicy(self.mlist) + receive_workflow.token = workflow.token + receive_workflow.restore() + receive_workflow.run_thru('receive_key') + + self.assertIsNone(receive_workflow.pubkey) + with patch.object(receive_workflow, + '_step_send_key_request') as step: + next(receive_workflow) + step.assert_called_once_with() + + def test_receive_key_pubkey_confirmed(self): + workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender, + pre_verified=True, + pre_confirmed=True, + pubkey_pre_confirmed=True) + list(workflow) + with transaction() as t: + pgp_address = PGPAddress(self.sender) + pgp_address.key = self.sender_key.pubkey + t.add(pgp_address) + + receive_workflow = ConfirmSubscriptionPolicy(self.mlist) + receive_workflow.token = workflow.token + receive_workflow.restore() + receive_workflow.run_thru('receive_key') + with patch.object(receive_workflow, '_step_do_subscription') as step: + next(receive_workflow) + step.assert_called_once_with() + + def test_send_key_confirm_request(self): + workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender, + pre_verified=True, + pre_confirmed=True, + pubkey=self.sender_key.pubkey, + pubkey_pre_confirmed=False) + list(workflow) + items = get_queue_messages('virgin', expected_count=1) + message = items[0].msg + token = workflow.token + + self.assertEqual(message['Subject'], 'key confirm {}'.format(token)) + wrapped = PGPWrapper(message) + self.assertTrue(wrapped.is_encrypted()) + + def test_receive_confirmation(self): + workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender, + pre_verified=True, + pre_confirmed=True, + pubkey=self.sender_key.pubkey, + pubkey_pre_confirmed=False) + list(workflow) + + with transaction() as t: + pgp_address = PGPAddress(self.sender) + pgp_address.key = self.sender_key.pubkey + t.add(pgp_address) + + receive_workflow = ConfirmSubscriptionPolicy(self.mlist) + receive_workflow.token = workflow.token + receive_workflow.restore() + receive_workflow.run_thru('receive_confirmation') + + with patch.object(receive_workflow, '_step_do_subscription') as step: + next(receive_workflow) + step.assert_called_once_with() |
