aboutsummaryrefslogtreecommitdiff
path: root/src/mailman_pgp/workflows
diff options
context:
space:
mode:
authorJ08nY2017-07-11 00:36:57 +0200
committerJ08nY2017-07-11 00:36:57 +0200
commitf8793bcc2e00c4ee639e4f0e75cfc2b19ea1849f (patch)
treee18603fb641ec66c021168d914f49ae4599095f6 /src/mailman_pgp/workflows
parented5fc4825fd56c4ec9b3e4c07554977363c55bc7 (diff)
downloadmailman-pgp-f8793bcc2e00c4ee639e4f0e75cfc2b19ea1849f.tar.gz
mailman-pgp-f8793bcc2e00c4ee639e4f0e75cfc2b19ea1849f.tar.zst
mailman-pgp-f8793bcc2e00c4ee639e4f0e75cfc2b19ea1849f.zip
Diffstat (limited to 'src/mailman_pgp/workflows')
-rw-r--r--src/mailman_pgp/workflows/base.py3
-rw-r--r--src/mailman_pgp/workflows/tests/test_base.py174
-rw-r--r--src/mailman_pgp/workflows/tests/test_subscription.py36
3 files changed, 211 insertions, 2 deletions
diff --git a/src/mailman_pgp/workflows/base.py b/src/mailman_pgp/workflows/base.py
index 3f4ba8d..52dd879 100644
--- a/src/mailman_pgp/workflows/base.py
+++ b/src/mailman_pgp/workflows/base.py
@@ -109,11 +109,10 @@ class PubkeyMixin:
encrypted = wrapped.sign_encrypt(pgp_list.key, self.pubkey,
pgp_list.pubkey)
- # XXX: This is not good:
msg.set_payload(encrypted.get_payload())
copy_headers(encrypted, msg, True)
msg.send(self.mlist)
raise StopIteration
def _step_receive_confirmation(self):
- pass
+ pass \ No newline at end of file
diff --git a/src/mailman_pgp/workflows/tests/test_base.py b/src/mailman_pgp/workflows/tests/test_base.py
index 8b6b4d1..19b2e58 100644
--- a/src/mailman_pgp/workflows/tests/test_base.py
+++ b/src/mailman_pgp/workflows/tests/test_base.py
@@ -14,3 +14,177 @@
#
# You should have received a copy of the GNU General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.
+
+""""""
+
+import unittest
+from contextlib import suppress
+from unittest.mock import patch
+
+from mailman.app.lifecycle import create_list
+from mailman.interfaces.pending import IPendings
+from mailman.interfaces.usermanager import IUserManager
+from mailman.testing.helpers import get_queue_messages
+from zope.component import getUtility
+
+from mailman_pgp.config import config
+from mailman_pgp.database import mm_transaction, transaction
+from mailman_pgp.model.address import PGPAddress
+from mailman_pgp.model.list import PGPMailingList
+from mailman_pgp.pgp.tests.base import load_key
+from mailman_pgp.pgp.wrapper import PGPWrapper
+from mailman_pgp.testing.layers import PGPConfigLayer
+from mailman_pgp.workflows.base import KEY_REQUEST
+from mailman_pgp.workflows.subscription import ConfirmSubscriptionPolicy
+
+
+class TestPubkeyMixin(unittest.TestCase):
+ layer = PGPConfigLayer
+
+ def setUp(self):
+ with mm_transaction():
+ self.mlist = create_list('test@example.com',
+ style_name='pgp-default')
+
+ self.list_key = load_key('ecc_p256.priv.asc')
+
+ self.pgp_list = PGPMailingList.for_list(self.mlist)
+ self.pgp_list.key = self.list_key
+
+ self.um = getUtility(IUserManager)
+
+ self.sender_key = load_key('rsa_1024.priv.asc')
+ self.sender = self.um.create_address('rsa-1024b@example.org')
+
+ def test_pended_data(self):
+ workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender,
+ pre_verified=True,
+ pre_confirmed=True)
+ with suppress(StopIteration):
+ workflow.run_thru('send_key_request')
+ self.assertIsNotNone(workflow.token)
+ pendable = getUtility(IPendings).confirm(workflow.token, expunge=False)
+ self.assertEqual(pendable['list_id'], 'test.example.com')
+ self.assertEqual(pendable['email'], 'rsa-1024b@example.org')
+ self.assertEqual(pendable['display_name'], '')
+ self.assertEqual(pendable['when'], '2005-08-01T07:49:23')
+ self.assertEqual(pendable['token_owner'], 'subscriber')
+
+ def test_key_request_sent(self):
+ workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender,
+ pre_verified=True,
+ pre_confirmed=True)
+ list(workflow)
+ items = get_queue_messages('virgin', expected_count=1)
+ message = items[0].msg
+ token = workflow.token
+
+ self.assertEqual(message['Subject'], 'key set {}'.format(token))
+ self.assertEqual(message.get_payload(), KEY_REQUEST)
+
+ def test_key_request_pubkey_set(self):
+ workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender,
+ pre_verified=True,
+ pre_confirmed=True,
+ pubkey=self.sender_key.pubkey,
+ pubkey_pre_confirmed=True)
+ workflow.run_thru('pubkey_checks')
+ with patch.object(workflow, '_step_do_subscription') as step:
+ next(workflow)
+ step.assert_called_once_with()
+
+ def test_receive_key(self):
+ workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender,
+ pre_verified=True,
+ pre_confirmed=True)
+ list(workflow)
+ with transaction() as t:
+ pgp_address = PGPAddress(self.sender)
+ pgp_address.key = self.sender_key.pubkey
+ t.add(pgp_address)
+
+ receive_workflow = ConfirmSubscriptionPolicy(self.mlist)
+ receive_workflow.token = workflow.token
+ receive_workflow.restore()
+ receive_workflow.run_thru('receive_key')
+
+ self.assertIsNotNone(receive_workflow.pubkey)
+ self.assertEqual(receive_workflow.pubkey.fingerprint,
+ self.sender_key.pubkey.fingerprint)
+
+ with patch.object(receive_workflow,
+ '_step_send_confirm_request') as step:
+ next(receive_workflow)
+ step.assert_called_once_with()
+
+ def test_receive_key_no_address(self):
+ workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender,
+ pre_verified=True,
+ pre_confirmed=True)
+ list(workflow)
+ receive_workflow = ConfirmSubscriptionPolicy(self.mlist)
+ receive_workflow.token = workflow.token
+ receive_workflow.restore()
+ receive_workflow.run_thru('receive_key')
+
+ self.assertIsNone(receive_workflow.pubkey)
+ with patch.object(receive_workflow,
+ '_step_send_key_request') as step:
+ next(receive_workflow)
+ step.assert_called_once_with()
+
+ def test_receive_key_pubkey_confirmed(self):
+ workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender,
+ pre_verified=True,
+ pre_confirmed=True,
+ pubkey_pre_confirmed=True)
+ list(workflow)
+ with transaction() as t:
+ pgp_address = PGPAddress(self.sender)
+ pgp_address.key = self.sender_key.pubkey
+ t.add(pgp_address)
+
+ receive_workflow = ConfirmSubscriptionPolicy(self.mlist)
+ receive_workflow.token = workflow.token
+ receive_workflow.restore()
+ receive_workflow.run_thru('receive_key')
+ with patch.object(receive_workflow, '_step_do_subscription') as step:
+ next(receive_workflow)
+ step.assert_called_once_with()
+
+ def test_send_key_confirm_request(self):
+ workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender,
+ pre_verified=True,
+ pre_confirmed=True,
+ pubkey=self.sender_key.pubkey,
+ pubkey_pre_confirmed=False)
+ list(workflow)
+ items = get_queue_messages('virgin', expected_count=1)
+ message = items[0].msg
+ token = workflow.token
+
+ self.assertEqual(message['Subject'], 'key confirm {}'.format(token))
+ wrapped = PGPWrapper(message)
+ self.assertTrue(wrapped.is_encrypted())
+
+ def test_receive_confirmation(self):
+ workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender,
+ pre_verified=True,
+ pre_confirmed=True,
+ pubkey=self.sender_key.pubkey,
+ pubkey_pre_confirmed=False)
+ list(workflow)
+
+ with transaction() as t:
+ pgp_address = PGPAddress(self.sender)
+ pgp_address.key = self.sender_key.pubkey
+ t.add(pgp_address)
+
+ receive_workflow = ConfirmSubscriptionPolicy(self.mlist)
+ receive_workflow.token = workflow.token
+ receive_workflow.restore()
+ receive_workflow.run_thru('receive_confirmation')
+
+ with patch.object(receive_workflow, '_step_do_subscription') as step:
+ next(receive_workflow)
+ step.assert_called_once_with()
diff --git a/src/mailman_pgp/workflows/tests/test_subscription.py b/src/mailman_pgp/workflows/tests/test_subscription.py
index 8b6b4d1..da59bee 100644
--- a/src/mailman_pgp/workflows/tests/test_subscription.py
+++ b/src/mailman_pgp/workflows/tests/test_subscription.py
@@ -14,3 +14,39 @@
#
# You should have received a copy of the GNU General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.
+
+""""""
+import unittest
+
+from mailman.app.lifecycle import create_list
+from mailman.interfaces.usermanager import IUserManager
+from zope.component import getUtility
+
+from mailman_pgp.database import mm_transaction
+from mailman_pgp.testing.layers import PGPConfigLayer
+from mailman_pgp.workflows.subscription import (ConfirmSubscriptionPolicy,
+ ModerationSubscriptionPolicy,
+ ConfirmModerationSubscriptionPolicy)
+
+
+class TestSubscriptionWorkflows(unittest.TestCase):
+ layer = PGPConfigLayer
+
+ def setUp(self):
+ with mm_transaction():
+ self.mlist = create_list('test@example.com',
+ style_name='pgp-default')
+ self.sender = getUtility(IUserManager).create_address(
+ 'rsa-1024b@example.org')
+
+ def test_confirm_policy(self):
+ workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender)
+ next(workflow)
+
+ def test_moderation_policy(self):
+ workflow = ModerationSubscriptionPolicy(self.mlist, self.sender)
+ next(workflow)
+
+ def test_confirm_moderation_policy(self):
+ workflow = ConfirmModerationSubscriptionPolicy(self.mlist, self.sender)
+ next(workflow)