diff options
Diffstat (limited to 'src/mailman_pgp')
| -rw-r--r-- | src/mailman_pgp/commands/tests/test_key.py | 2 | ||||
| -rw-r--r-- | src/mailman_pgp/workflows/key_change.py | 26 | ||||
| -rw-r--r-- | src/mailman_pgp/workflows/mod_approval.py | 18 | ||||
| -rw-r--r-- | src/mailman_pgp/workflows/tests/test_base.py | 11 | ||||
| -rw-r--r-- | src/mailman_pgp/workflows/tests/test_key_change.py | 31 | ||||
| -rw-r--r-- | src/mailman_pgp/workflows/tests/test_mod_approval.py | 81 |
6 files changed, 139 insertions, 30 deletions
diff --git a/src/mailman_pgp/commands/tests/test_key.py b/src/mailman_pgp/commands/tests/test_key.py index 674438d..ec535ec 100644 --- a/src/mailman_pgp/commands/tests/test_key.py +++ b/src/mailman_pgp/commands/tests/test_key.py @@ -33,7 +33,7 @@ from pgpy.constants import ( from zope.component import getUtility from mailman_pgp.config import mm_config -from mailman_pgp.database import transaction, mm_transaction +from mailman_pgp.database import mm_transaction, transaction from mailman_pgp.model.address import PGPAddress from mailman_pgp.model.list import PGPMailingList from mailman_pgp.pgp.mime import MIMEWrapper diff --git a/src/mailman_pgp/workflows/key_change.py b/src/mailman_pgp/workflows/key_change.py index 2ef82a8..8536304 100644 --- a/src/mailman_pgp/workflows/key_change.py +++ b/src/mailman_pgp/workflows/key_change.py @@ -77,23 +77,27 @@ class KeyChangeBase(Workflow): def pubkey_key(self, value): self.pubkey, _ = PGPKey.from_blob(value) - def _step_change_key(self): - if self.pgp_address is None or self.pubkey is None: - raise ValueError - - self.push('send_key_confirm_request') - - def _step_send_key_confirm_request(self): + def _pend(self, token_owner, lifetime=None): pendings = getUtility(IPendings) - pendable = KeyChangeWorkflow.pendable_class()( + pendable = self.pendable_class()( email=self.pgp_address.email, pubkey=str(self.pubkey), fingerprint=self.pubkey.fingerprint ) - lifetime = config.get_value('misc', 'change_request_lifetime') + self.token = pendings.add(pendable, lifetime=lifetime) - self.token_owner = TokenOwner.subscriber + self.token_owner = token_owner + def _step_change_key(self): + if self.pgp_address is None or self.pubkey is None: + raise ValueError + + self.push('send_key_confirm_request') + + def _step_send_key_confirm_request(self): + self._pend(TokenOwner.subscriber, + lifetime=config.get_value('misc', + 'change_request_lifetime')) self.push('receive_confirmation') self.save() request_address = self.mlist.request_address @@ -127,7 +131,7 @@ class KeyChangeBase(Workflow): def pendable_class(cls): @implementer(IPendable) class Pendable(dict): - PEND_TYPE = KeyChangeWorkflow.name + PEND_TYPE = cls.name return Pendable diff --git a/src/mailman_pgp/workflows/mod_approval.py b/src/mailman_pgp/workflows/mod_approval.py index e7ff061..90edf4c 100644 --- a/src/mailman_pgp/workflows/mod_approval.py +++ b/src/mailman_pgp/workflows/mod_approval.py @@ -16,12 +16,14 @@ # this program. If not, see <http://www.gnu.org/licenses/>. """""" +import copy from mailman.email.message import UserNotification from mailman.interfaces.subscriptions import TokenOwner from public import public from mailman_pgp.pgp.mime import MIMEWrapper +from mailman_pgp.utils.email import overwrite_message MOD_APPROVAL_REQUEST = """\ ---------- @@ -37,26 +39,24 @@ Fingerprint: {} @public class ModeratorApprovalMixin: def _step_mod_approval(self): - self.push('restore') self.push('get_approval') def _step_get_approval(self): - self._set_token(TokenOwner.moderator) - self.push('restore') + self._pend(TokenOwner.moderator) + self.push('receive_confirmation') self.save() if self.mlist.admin_immed_notify: - subject = 'New key change request to {} from {}'.format( - self.mlist.display_name, self.pgp_address.email) + subject = 'New key change request from {}'.format( + self.pgp_address.email) text = MOD_APPROVAL_REQUEST.format(self.pgp_address.email, self.pubkey.fingerprint) msg = UserNotification( self.mlist.owner_address, self.mlist.owner_address, subject, text, self.mlist.preferred_language) + out = copy.deepcopy(msg) wrapped = MIMEWrapper(msg) msg = wrapped.attach_keys(self.pubkey) - msg.send(self.mlist) + overwrite_message(msg, out) + out.send(self.mlist) raise StopIteration - - def _step_restore(self): - self._set_token(TokenOwner.no_one) diff --git a/src/mailman_pgp/workflows/tests/test_base.py b/src/mailman_pgp/workflows/tests/test_base.py index 31b3d05..5273939 100644 --- a/src/mailman_pgp/workflows/tests/test_base.py +++ b/src/mailman_pgp/workflows/tests/test_base.py @@ -49,13 +49,14 @@ class PubkeyMixinTestSetup(): self.list_key = load_key('ecc_p256.priv.asc') - self.pgp_list = PGPMailingList.for_list(self.mlist) - self.pgp_list.key = self.list_key + with transaction(): + self.pgp_list = PGPMailingList.for_list(self.mlist) + self.pgp_list.key = self.list_key self.um = getUtility(IUserManager) self.sender_key = load_key('rsa_1024.priv.asc') - self.sender = self.um.create_address('rsa-1024b@example.org') + self.sender = self.um.create_address('anne@example.org') @implementer(IWorkflow) @@ -203,7 +204,7 @@ class TestBothPubkeyMixins(PubkeyMixinTestSetup, unittest.TestCase): self.assertIsNotNone(workflow.token) pendable = getUtility(IPendings).confirm(workflow.token, expunge=False) self.assertEqual(pendable['list_id'], 'test.example.com') - self.assertEqual(pendable['email'], 'rsa-1024b@example.org') + self.assertEqual(pendable['email'], 'anne@example.org') self.assertEqual(pendable['display_name'], '') self.assertEqual(pendable['when'], '2005-08-01T07:49:23') self.assertEqual(pendable['token_owner'], 'subscriber') @@ -216,7 +217,7 @@ class TestBothPubkeyMixins(PubkeyMixinTestSetup, unittest.TestCase): self.assertIsNotNone(workflow.token) pendable = getUtility(IPendings).confirm(workflow.token, expunge=False) self.assertEqual(pendable['list_id'], 'test.example.com') - self.assertEqual(pendable['email'], 'rsa-1024b@example.org') + self.assertEqual(pendable['email'], 'anne@example.org') self.assertEqual(pendable['display_name'], '') self.assertEqual(pendable['when'], '2005-08-01T07:49:23') self.assertEqual(pendable['token_owner'], 'subscriber') diff --git a/src/mailman_pgp/workflows/tests/test_key_change.py b/src/mailman_pgp/workflows/tests/test_key_change.py index e469d51..5d4926a 100644 --- a/src/mailman_pgp/workflows/tests/test_key_change.py +++ b/src/mailman_pgp/workflows/tests/test_key_change.py @@ -25,13 +25,15 @@ from mailman.interfaces.usermanager import IUserManager from mailman.testing.helpers import get_queue_messages from zope.component import getUtility +from mailman_pgp.config import mm_config from mailman_pgp.database import mm_transaction, transaction from mailman_pgp.model.address import PGPAddress from mailman_pgp.model.list import PGPMailingList from mailman_pgp.pgp.wrapper import PGPWrapper from mailman_pgp.testing.layers import PGPConfigLayer from mailman_pgp.testing.pgp import load_key -from mailman_pgp.workflows.key_change import KeyChangeWorkflow +from mailman_pgp.workflows.key_change import (KeyChangeModWorkflow, + KeyChangeWorkflow) class TestKeyChangeWorkflow(unittest.TestCase): @@ -41,13 +43,18 @@ class TestKeyChangeWorkflow(unittest.TestCase): with mm_transaction(): self.mlist = create_list('test@example.com', style_name='pgp-default') - self.pgp_list = PGPMailingList.for_list(self.mlist) - self.pgp_list.key = load_key('ecc_p256.priv.asc') + with transaction(): + self.pgp_list = PGPMailingList.for_list(self.mlist) + self.pgp_list.key = load_key('ecc_p256.priv.asc') self.sender_key = load_key('rsa_1024.priv.asc') self.sender_new_key = load_key('ecc_p256.priv.asc') self.sender = getUtility(IUserManager).create_address( - 'rsa-1024b@example.org') + 'anne@example.org') + + def test_has_workflows(self): + self.assertTrue(KeyChangeWorkflow.name, mm_config.workflows) + self.assertTrue(KeyChangeModWorkflow.name, mm_config.workflows) def test_pgp_address_none(self): workflow = KeyChangeWorkflow(self.mlist) @@ -101,3 +108,19 @@ class TestKeyChangeWorkflow(unittest.TestCase): self.assertEqual(pgp_address.key_fingerprint, self.sender_new_key.fingerprint) self.assertTrue(pgp_address.key_confirmed) + + def test_confirm_mod(self): + with transaction() as t: + pgp_address = PGPAddress(self.sender) + pgp_address.key = self.sender_key.pubkey + pgp_address.key_confirmed = True + t.add(pgp_address) + + workflow = KeyChangeModWorkflow(self.mlist, pgp_address, + self.sender_new_key.pubkey) + list(workflow) + + token, token_owner, member = ISubscriptionManager(self.mlist).confirm( + workflow.token) + self.assertIsNotNone(token) + self.assertEqual(token_owner, TokenOwner.moderator) diff --git a/src/mailman_pgp/workflows/tests/test_mod_approval.py b/src/mailman_pgp/workflows/tests/test_mod_approval.py index 8b6b4d1..49e4204 100644 --- a/src/mailman_pgp/workflows/tests/test_mod_approval.py +++ b/src/mailman_pgp/workflows/tests/test_mod_approval.py @@ -14,3 +14,84 @@ # # You should have received a copy of the GNU General Public License along with # this program. If not, see <http://www.gnu.org/licenses/>. + +"""""" +from unittest import TestCase + +from mailman.app.lifecycle import create_list +from mailman.interfaces.subscriptions import TokenOwner +from mailman.interfaces.usermanager import IUserManager +from mailman.interfaces.workflows import IWorkflow +from mailman.testing.helpers import get_queue_messages +from zope.component import getUtility +from zope.interface import implementer + +from mailman_pgp.database import mm_transaction, transaction +from mailman_pgp.model.address import PGPAddress +from mailman_pgp.model.list import PGPMailingList +from mailman_pgp.pgp.wrapper import PGPWrapper +from mailman_pgp.testing.layers import PGPConfigLayer +from mailman_pgp.testing.pgp import load_key +from mailman_pgp.workflows.key_change import KeyChangeBase +from mailman_pgp.workflows.mod_approval import ModeratorApprovalMixin + + +@implementer(IWorkflow) +class PGPTestWorkflow(KeyChangeBase, ModeratorApprovalMixin): + name = 'test-workflow' + description = '' + initial_state = 'mod_approval' + + +class TestModeratorApprovalMixin(TestCase): + layer = PGPConfigLayer + + def setUp(self): + with mm_transaction(): + self.mlist = create_list('test@example.com', + style_name='pgp-default') + with transaction(): + self.pgp_list = PGPMailingList.for_list(self.mlist) + self.pgp_list.key = load_key('ecc_p256.priv.asc') + + self.sender_key = load_key('rsa_1024.priv.asc') + self.sender_new_key = load_key('ecc_p256.priv.asc') + self.sender = getUtility(IUserManager).create_address( + 'anne@example.org') + + def test_get_approval(self): + with transaction() as t: + pgp_address = PGPAddress(self.sender) + pgp_address.key = self.sender_key.pubkey + pgp_address.key_confirmed = True + t.add(pgp_address) + + workflow = PGPTestWorkflow(self.mlist, pgp_address, + self.sender_new_key.pubkey) + list(workflow) + items = get_queue_messages('virgin', expected_count=1) + message = items[0].msg + + self.assertEqual(message['Subject'], + 'New key change request from {}'.format( + pgp_address.email)) + wrapped = PGPWrapper(message) + self.assertTrue(wrapped.has_keys()) + keys = list(wrapped.keys()) + self.assertEqual(len(keys), 1) + key = keys.pop() + self.assertEqual(key.fingerprint, self.sender_new_key.fingerprint) + + def test_receive_approval(self): + with transaction() as t: + pgp_address = PGPAddress(self.sender) + pgp_address.key = self.sender_key.pubkey + pgp_address.key_confirmed = True + t.add(pgp_address) + + workflow = PGPTestWorkflow(self.mlist, pgp_address, + self.sender_new_key.pubkey) + list(workflow) + get_queue_messages('virgin', expected_count=1) + list(workflow) + self.assertEqual(workflow.token_owner, TokenOwner.no_one) |
