aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJ08nY2017-07-14 01:44:54 +0200
committerJ08nY2017-07-14 01:44:54 +0200
commitd8afe4bec9282254483ea1c7571298dcd9731508 (patch)
treea847fca023b6601771ad0aeb35b0cbba9ee4480e
parent8368cd832d21b404c01ab475ade6209b906ab422 (diff)
downloadmailman-pgp-d8afe4bec9282254483ea1c7571298dcd9731508.tar.gz
mailman-pgp-d8afe4bec9282254483ea1c7571298dcd9731508.tar.zst
mailman-pgp-d8afe4bec9282254483ea1c7571298dcd9731508.zip
-rw-r--r--src/mailman_pgp/commands/tests/test_key.py65
-rw-r--r--src/mailman_pgp/workflows/base.py3
-rw-r--r--src/mailman_pgp/workflows/tests/test_base.py130
-rw-r--r--src/mailman_pgp/workflows/tests/test_subscription.py2
4 files changed, 105 insertions, 95 deletions
diff --git a/src/mailman_pgp/commands/tests/test_key.py b/src/mailman_pgp/commands/tests/test_key.py
index 4f62b11..44d5b25 100644
--- a/src/mailman_pgp/commands/tests/test_key.py
+++ b/src/mailman_pgp/commands/tests/test_key.py
@@ -26,6 +26,7 @@ from mailman.interfaces.usermanager import IUserManager
from mailman.runners.command import CommandRunner
from mailman.testing.helpers import get_queue_messages, make_testable_runner
from mailman.utilities.datetime import now
+from public import public
from zope.component import getUtility
from mailman_pgp.config import mm_config
@@ -59,6 +60,7 @@ def _create_mixed(from_hdr, to_hdr, subject_hdr):
return message
+@public
class TestPreDispatch(unittest.TestCase):
layer = PGPConfigLayer
@@ -103,6 +105,7 @@ class TestPreDispatch(unittest.TestCase):
results_msg.get_payload())
+@public
class TestPreSubscription(unittest.TestCase):
layer = PGPConfigLayer
@@ -111,6 +114,9 @@ class TestPreSubscription(unittest.TestCase):
self.pgp_list = PGPMailingList.for_list(self.mlist)
self.pgp_list.key = load_key('ecc_p256.priv.asc')
+ self.bart_key = load_key('rsa_1024.priv.asc')
+ self.anne_key = load_key('ecc_p256.priv.asc')
+
def test_set(self):
self.mlist.subscription_policy = OpenSubscriptionPolicy
bart = getUtility(IUserManager).create_address('bart@example.com',
@@ -121,12 +127,10 @@ class TestPreSubscription(unittest.TestCase):
get_queue_messages('virgin')
- bart_key = load_key('rsa_1024.priv.asc')
-
set_message = _create_mixed('bart@example.com', 'test@example.com',
'Re: key set {}'.format(token))
wrapped_set_message = MIMEWrapper(set_message)
- set_message = wrapped_set_message.attach_key(bart_key.pubkey)
+ set_message = wrapped_set_message.attach_key(self.bart_key.pubkey)
mm_config.switchboards['command'].enqueue(set_message,
listid='test.example.com')
@@ -134,7 +138,8 @@ class TestPreSubscription(unittest.TestCase):
pgp_address = PGPAddress.for_address(bart)
self.assertIsNotNone(pgp_address)
- self.assertEqual(pgp_address.key.fingerprint, bart_key.fingerprint)
+ self.assertEqual(pgp_address.key.fingerprint,
+ self.bart_key.fingerprint)
self.assertFalse(pgp_address.key_confirmed)
items = get_queue_messages('virgin', expected_count=2)
@@ -146,7 +151,7 @@ class TestPreSubscription(unittest.TestCase):
confirm_request = items[0].msg
self.assertIn('Key succesfully set.', results.get_payload())
- self.assertIn('Key fingerprint: {}'.format(bart_key.fingerprint),
+ self.assertIn('Key fingerprint: {}'.format(self.bart_key.fingerprint),
results.get_payload())
confirm_wrapped = PGPWrapper(confirm_request)
@@ -178,15 +183,12 @@ class TestPreSubscription(unittest.TestCase):
results_msg.get_payload())
def test_set_multiple_keys(self):
- bart_key = load_key('rsa_1024.priv.asc')
- anne_key = load_key('ecc_p256.priv.asc')
-
set_message = _create_mixed('bart@example.com', 'test@example.com',
'Re: key set token')
wrapped_set_message = MIMEWrapper(set_message)
- set_message = wrapped_set_message.attach_key(bart_key.pubkey)
+ set_message = wrapped_set_message.attach_key(self.bart_key.pubkey)
wrapped_set_message = MIMEWrapper(set_message)
- set_message = wrapped_set_message.attach_key(anne_key.pubkey)
+ set_message = wrapped_set_message.attach_key(self.anne_key.pubkey)
mm_config.switchboards['command'].enqueue(set_message,
listid='test.example.com')
@@ -198,11 +200,9 @@ class TestPreSubscription(unittest.TestCase):
results_msg.get_payload())
def test_set_no_email(self):
- bart_key = load_key('rsa_1024.priv.asc')
-
message = _create_mixed('', 'test@example.com', 'key set token')
wrapped_message = MIMEWrapper(message)
- message = wrapped_message.attach_key(bart_key.pubkey)
+ message = wrapped_message.attach_key(self.bart_key.pubkey)
mm_config.switchboards['command'].enqueue(message,
listid='test.example.com')
@@ -213,12 +213,10 @@ class TestPreSubscription(unittest.TestCase):
self.assertIn('No email to subscribe with.', results_msg.get_payload())
def test_set_no_address(self):
- bart_key = load_key('rsa_1024.priv.asc')
-
set_message = _create_mixed('bart@example.com', 'test@example.com',
'key set token')
wrapped_set_message = MIMEWrapper(set_message)
- set_message = wrapped_set_message.attach_key(bart_key.pubkey)
+ set_message = wrapped_set_message.attach_key(self.bart_key.pubkey)
mm_config.switchboards['command'].enqueue(set_message,
listid='test.example.com')
@@ -235,19 +233,18 @@ class TestPreSubscription(unittest.TestCase):
'Bart Person')
bart.verified_on = now()
- bart_key = load_key('rsa_1024.priv.asc')
-
token, token_owner, member = ISubscriptionManager(self.mlist).register(
- bart, pubkey=bart_key.pubkey)
+ bart, pubkey=self.bart_key.pubkey)
get_queue_messages('virgin')
message = _create_plain('bart@example.com', 'test@example.com',
'Re: key confirm {}'.format(token),
- CONFIRM_REQUEST.format(bart_key.fingerprint,
- token))
+ CONFIRM_REQUEST.format(
+ self.bart_key.fingerprint,
+ token))
wrapped_message = MIMEWrapper(message)
- message = wrapped_message.sign(bart_key)
+ message = wrapped_message.sign(self.bart_key)
mm_config.switchboards['command'].enqueue(message,
listid='test.example.com')
@@ -263,20 +260,20 @@ class TestPreSubscription(unittest.TestCase):
'Bart Person')
bart.verified_on = now()
- bart_key = load_key('rsa_1024.priv.asc')
-
token, token_owner, member = ISubscriptionManager(self.mlist).register(
- bart, pubkey=bart_key.pubkey)
+ bart, pubkey=self.bart_key.pubkey)
get_queue_messages('virgin')
message = _create_plain('bart@example.com', 'test@example.com',
'Re: key confirm {}'.format(token),
- CONFIRM_REQUEST.format(bart_key.fingerprint,
- token))
+ CONFIRM_REQUEST.format(
+ self.bart_key.fingerprint,
+ token))
wrapped_message = MIMEWrapper(message)
- message = wrapped_message.sign_encrypt(bart_key, self.pgp_list.pubkey,
- bart_key.pubkey)
+ message = wrapped_message.sign_encrypt(self.bart_key,
+ self.pgp_list.pubkey,
+ self.bart_key.pubkey)
mm_config.switchboards['command'].enqueue(message,
listid='test.example.com')
@@ -330,17 +327,16 @@ class TestPreSubscription(unittest.TestCase):
'Bart Person')
bart.verified_on = now()
- bart_key = load_key('rsa_1024.priv.asc')
-
token, token_owner, member = ISubscriptionManager(self.mlist).register(
- bart, pubkey=bart_key.pubkey)
+ bart, pubkey=self.bart_key.pubkey)
get_queue_messages('virgin')
message = _create_plain('bart@example.com', 'test@example.com',
'Re: key confirm {}'.format(token),
- CONFIRM_REQUEST.format(bart_key.fingerprint,
- token))
+ CONFIRM_REQUEST.format(
+ self.bart_key.fingerprint,
+ token))
mm_config.switchboards['command'].enqueue(message,
listid='test.example.com')
@@ -353,6 +349,7 @@ class TestPreSubscription(unittest.TestCase):
results_msg.get_payload())
+@public
class TestAfterSubscription(unittest.TestCase):
layer = PGPConfigLayer
diff --git a/src/mailman_pgp/workflows/base.py b/src/mailman_pgp/workflows/base.py
index d05781d..014f2dd 100644
--- a/src/mailman_pgp/workflows/base.py
+++ b/src/mailman_pgp/workflows/base.py
@@ -55,9 +55,8 @@ class PGPMixin:
class SetPubkeyMixin:
- def __init__(self, pubkey=None, pre_confirmed=False):
+ def __init__(self, pubkey=None):
self.pubkey = pubkey
- self.pubkey_confirmed = pre_confirmed
@property
def pubkey_key(self):
diff --git a/src/mailman_pgp/workflows/tests/test_base.py b/src/mailman_pgp/workflows/tests/test_base.py
index af00d01..c59cfe0 100644
--- a/src/mailman_pgp/workflows/tests/test_base.py
+++ b/src/mailman_pgp/workflows/tests/test_base.py
@@ -24,8 +24,12 @@ from unittest.mock import patch
from mailman.app.lifecycle import create_list
from mailman.interfaces.pending import IPendings
from mailman.interfaces.usermanager import IUserManager
+from mailman.interfaces.workflows import IWorkflow
from mailman.testing.helpers import get_queue_messages
+from mailman.workflows.common import SubscriptionBase
+from public import public
from zope.component import getUtility
+from zope.interface import implementer
from mailman_pgp.database import mm_transaction, transaction
from mailman_pgp.model.address import PGPAddress
@@ -33,11 +37,11 @@ from mailman_pgp.model.list import PGPMailingList
from mailman_pgp.pgp.tests.base import load_key
from mailman_pgp.pgp.wrapper import PGPWrapper
from mailman_pgp.testing.layers import PGPConfigLayer
-from mailman_pgp.workflows.base import KEY_REQUEST
-from mailman_pgp.workflows.subscription import ConfirmSubscriptionPolicy
+from mailman_pgp.workflows.base import (KEY_REQUEST, PGPMixin, SetPubkeyMixin,
+ ConfirmPubkeyMixin)
-class PubkeyMixinSetup():
+class PubkeyMixinTestSetup():
def setUp(self):
with mm_transaction():
self.mlist = create_list('test@example.com',
@@ -54,34 +58,62 @@ class PubkeyMixinSetup():
self.sender = self.um.create_address('rsa-1024b@example.org')
-class TestPGPMixin(PubkeyMixinSetup, unittest.TestCase):
+@implementer(IWorkflow)
+class PGPTestWorkflow(SubscriptionBase, PGPMixin, SetPubkeyMixin,
+ ConfirmPubkeyMixin):
+ name = 'test-workflow'
+ description = ''
+ initial_state = 'prepare'
+ save_attributes = (
+ 'pubkey_key',
+ 'pubkey_confirmed',
+ 'address_key',
+ 'subscriber_key',
+ 'user_key',
+ 'token_owner_key'
+ )
+
+ def __init__(self, mlist, subscriber=None, *, pubkey=None,
+ pubkey_pre_confirmed=False):
+ SubscriptionBase.__init__(self, mlist, subscriber)
+ SetPubkeyMixin.__init__(self, pubkey=pubkey)
+ ConfirmPubkeyMixin.__init__(self, pre_confirmed=pubkey_pre_confirmed)
+ PGPMixin.__init__(self)
+
+ def _step_prepare(self):
+ self.push('do_subscription')
+ self.push('pubkey_confirmation')
+ self.push('pubkey_checks')
+ self.push('pgp_prepare')
+ self.push('sanity_checks')
+
+
+@public
+class TestPGPMixin(PubkeyMixinTestSetup, unittest.TestCase):
layer = PGPConfigLayer
def test_create_address(self):
- workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender,
- pre_verified=True,
- pre_confirmed=True)
+ workflow = PGPTestWorkflow(self.mlist, self.sender)
workflow.run_thru('pgp_prepare')
pgp_address = PGPAddress.for_address(self.sender)
self.assertIsNotNone(pgp_address)
def test_address_existing(self):
- workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender,
- pre_verified=True,
- pre_confirmed=True)
+ workflow = PGPTestWorkflow(self.mlist, self.sender)
with transaction() as t:
pgp_address = PGPAddress(self.sender)
t.add(pgp_address)
workflow.run_thru('pgp_prepare')
+ still = PGPAddress.for_address(self.sender)
+ self.assertIsNotNone(still)
-class TestSetPubkeyMixin(PubkeyMixinSetup, unittest.TestCase):
+@public
+class TestSetPubkeyMixin(PubkeyMixinTestSetup, unittest.TestCase):
layer = PGPConfigLayer
def test_key_request_sent(self):
- workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender,
- pre_verified=True,
- pre_confirmed=True)
+ workflow = PGPTestWorkflow(self.mlist, self.sender)
list(workflow)
items = get_queue_messages('virgin', expected_count=1)
message = items[0].msg
@@ -91,24 +123,20 @@ class TestSetPubkeyMixin(PubkeyMixinSetup, unittest.TestCase):
self.assertEqual(message.get_payload(), KEY_REQUEST)
def test_receive_key(self):
- workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender,
- pre_verified=True,
- pre_confirmed=True)
+ workflow = PGPTestWorkflow(self.mlist, self.sender)
list(workflow)
with transaction():
pgp_address = PGPAddress.for_address(self.sender)
pgp_address.key = self.sender_key.pubkey
- receive_workflow = ConfirmSubscriptionPolicy(self.mlist)
+ receive_workflow = PGPTestWorkflow(self.mlist)
receive_workflow.token = workflow.token
receive_workflow.restore()
receive_workflow.run_thru('receive_key')
def test_set_pubkey(self):
- workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender,
- pre_verified=True,
- pre_confirmed=True,
- pubkey=self.sender_key.pubkey)
+ workflow = PGPTestWorkflow(self.mlist, self.sender,
+ pubkey=self.sender_key.pubkey)
workflow.run_thru('pubkey_checks')
pgp_address = PGPAddress.for_address(self.sender)
self.assertIsNotNone(pgp_address)
@@ -117,9 +145,7 @@ class TestSetPubkeyMixin(PubkeyMixinSetup, unittest.TestCase):
self.sender_key.fingerprint)
def test_pubkey_set(self):
- workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender,
- pre_verified=True,
- pre_confirmed=True)
+ workflow = PGPTestWorkflow(self.mlist, self.sender)
with transaction() as t:
pgp_address = PGPAddress(self.sender)
pgp_address.key = self.sender_key.pubkey
@@ -129,26 +155,23 @@ class TestSetPubkeyMixin(PubkeyMixinSetup, unittest.TestCase):
self.sender_key.fingerprint)
-class TestConfirmPubkeyMixin(PubkeyMixinSetup, unittest.TestCase):
+@public
+class TestConfirmPubkeyMixin(PubkeyMixinTestSetup, unittest.TestCase):
layer = PGPConfigLayer
def test_key_request_pubkey_set(self):
- workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender,
- pre_verified=True,
- pre_confirmed=True,
- pubkey=self.sender_key.pubkey,
- pubkey_pre_confirmed=True)
+ workflow = PGPTestWorkflow(self.mlist, self.sender,
+ pubkey=self.sender_key.pubkey,
+ pubkey_pre_confirmed=True)
workflow.run_thru('pubkey_confirmation')
with patch.object(workflow, '_step_do_subscription') as step:
next(workflow)
step.assert_called_once_with()
def test_send_key_confirm_request(self):
- workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender,
- pre_verified=True,
- pre_confirmed=True,
- pubkey=self.sender_key.pubkey,
- pubkey_pre_confirmed=False)
+ workflow = PGPTestWorkflow(self.mlist, self.sender,
+ pubkey=self.sender_key.pubkey,
+ pubkey_pre_confirmed=False)
list(workflow)
items = get_queue_messages('virgin', expected_count=1)
message = items[0].msg
@@ -159,14 +182,12 @@ class TestConfirmPubkeyMixin(PubkeyMixinSetup, unittest.TestCase):
self.assertTrue(wrapped.is_encrypted())
def test_receive_confirmation(self):
- workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender,
- pre_verified=True,
- pre_confirmed=True,
- pubkey=self.sender_key.pubkey,
- pubkey_pre_confirmed=False)
+ workflow = PGPTestWorkflow(self.mlist, self.sender,
+ pubkey=self.sender_key.pubkey,
+ pubkey_pre_confirmed=False)
list(workflow)
- receive_workflow = ConfirmSubscriptionPolicy(self.mlist)
+ receive_workflow = PGPTestWorkflow(self.mlist)
receive_workflow.token = workflow.token
receive_workflow.restore()
receive_workflow.run_thru('receive_key_confirmation')
@@ -175,13 +196,12 @@ class TestConfirmPubkeyMixin(PubkeyMixinSetup, unittest.TestCase):
step.assert_called_once_with()
-class TestBothPubkeyMixins(PubkeyMixinSetup, unittest.TestCase):
+@public
+class TestBothPubkeyMixins(PubkeyMixinTestSetup, unittest.TestCase):
layer = PGPConfigLayer
def test_pended_data_key_request(self):
- workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender,
- pre_verified=True,
- pre_confirmed=True)
+ workflow = PGPTestWorkflow(self.mlist, self.sender)
with suppress(StopIteration):
workflow.run_thru('send_key_request')
self.assertIsNotNone(workflow.token)
@@ -193,10 +213,8 @@ class TestBothPubkeyMixins(PubkeyMixinSetup, unittest.TestCase):
self.assertEqual(pendable['token_owner'], 'subscriber')
def test_pended_data_key_confirmation(self):
- workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender,
- pre_verified=True,
- pre_confirmed=True,
- pubkey=self.sender_key.pubkey)
+ workflow = PGPTestWorkflow(self.mlist, self.sender,
+ pubkey=self.sender_key.pubkey)
with suppress(StopIteration):
workflow.run_thru('send_key_confirm_request')
self.assertIsNotNone(workflow.token)
@@ -208,9 +226,7 @@ class TestBothPubkeyMixins(PubkeyMixinSetup, unittest.TestCase):
self.assertEqual(pendable['token_owner'], 'subscriber')
def test_exisitng_pgp_address(self):
- workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender,
- pre_verified=True,
- pre_confirmed=True)
+ workflow = PGPTestWorkflow(self.mlist, self.sender)
with transaction() as t:
pgp_address = PGPAddress(self.sender)
@@ -224,9 +240,7 @@ class TestBothPubkeyMixins(PubkeyMixinSetup, unittest.TestCase):
step.assert_called_once_with()
def test_exisitng_pgp_address_not_confirmed(self):
- workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender,
- pre_verified=True,
- pre_confirmed=True)
+ workflow = PGPTestWorkflow(self.mlist, self.sender)
with transaction() as t:
pgp_address = PGPAddress(self.sender)
@@ -239,9 +253,7 @@ class TestBothPubkeyMixins(PubkeyMixinSetup, unittest.TestCase):
step.assert_called_once_with()
def test_exisitng_pgp_address_no_key(self):
- workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender,
- pre_verified=True,
- pre_confirmed=True)
+ workflow = PGPTestWorkflow(self.mlist, self.sender)
with transaction() as t:
pgp_address = PGPAddress(self.sender)
diff --git a/src/mailman_pgp/workflows/tests/test_subscription.py b/src/mailman_pgp/workflows/tests/test_subscription.py
index 9464a83..f9fa1e1 100644
--- a/src/mailman_pgp/workflows/tests/test_subscription.py
+++ b/src/mailman_pgp/workflows/tests/test_subscription.py
@@ -20,6 +20,7 @@ import unittest
from mailman.app.lifecycle import create_list
from mailman.interfaces.usermanager import IUserManager
+from public import public
from zope.component import getUtility
from mailman_pgp.database import mm_transaction
@@ -29,6 +30,7 @@ from mailman_pgp.workflows.subscription import (
ModerationSubscriptionPolicy, OpenSubscriptionPolicy)
+@public
class TestSubscriptionWorkflows(unittest.TestCase):
layer = PGPConfigLayer