aboutsummaryrefslogtreecommitdiff
path: root/src/mailman_pgp/workflows
diff options
context:
space:
mode:
Diffstat (limited to 'src/mailman_pgp/workflows')
-rw-r--r--src/mailman_pgp/workflows/key_change.py26
-rw-r--r--src/mailman_pgp/workflows/mod_approval.py18
-rw-r--r--src/mailman_pgp/workflows/tests/test_base.py11
-rw-r--r--src/mailman_pgp/workflows/tests/test_key_change.py31
-rw-r--r--src/mailman_pgp/workflows/tests/test_mod_approval.py81
5 files changed, 138 insertions, 29 deletions
diff --git a/src/mailman_pgp/workflows/key_change.py b/src/mailman_pgp/workflows/key_change.py
index 2ef82a8..8536304 100644
--- a/src/mailman_pgp/workflows/key_change.py
+++ b/src/mailman_pgp/workflows/key_change.py
@@ -77,23 +77,27 @@ class KeyChangeBase(Workflow):
def pubkey_key(self, value):
self.pubkey, _ = PGPKey.from_blob(value)
- def _step_change_key(self):
- if self.pgp_address is None or self.pubkey is None:
- raise ValueError
-
- self.push('send_key_confirm_request')
-
- def _step_send_key_confirm_request(self):
+ def _pend(self, token_owner, lifetime=None):
pendings = getUtility(IPendings)
- pendable = KeyChangeWorkflow.pendable_class()(
+ pendable = self.pendable_class()(
email=self.pgp_address.email,
pubkey=str(self.pubkey),
fingerprint=self.pubkey.fingerprint
)
- lifetime = config.get_value('misc', 'change_request_lifetime')
+
self.token = pendings.add(pendable, lifetime=lifetime)
- self.token_owner = TokenOwner.subscriber
+ self.token_owner = token_owner
+ def _step_change_key(self):
+ if self.pgp_address is None or self.pubkey is None:
+ raise ValueError
+
+ self.push('send_key_confirm_request')
+
+ def _step_send_key_confirm_request(self):
+ self._pend(TokenOwner.subscriber,
+ lifetime=config.get_value('misc',
+ 'change_request_lifetime'))
self.push('receive_confirmation')
self.save()
request_address = self.mlist.request_address
@@ -127,7 +131,7 @@ class KeyChangeBase(Workflow):
def pendable_class(cls):
@implementer(IPendable)
class Pendable(dict):
- PEND_TYPE = KeyChangeWorkflow.name
+ PEND_TYPE = cls.name
return Pendable
diff --git a/src/mailman_pgp/workflows/mod_approval.py b/src/mailman_pgp/workflows/mod_approval.py
index e7ff061..90edf4c 100644
--- a/src/mailman_pgp/workflows/mod_approval.py
+++ b/src/mailman_pgp/workflows/mod_approval.py
@@ -16,12 +16,14 @@
# this program. If not, see <http://www.gnu.org/licenses/>.
""""""
+import copy
from mailman.email.message import UserNotification
from mailman.interfaces.subscriptions import TokenOwner
from public import public
from mailman_pgp.pgp.mime import MIMEWrapper
+from mailman_pgp.utils.email import overwrite_message
MOD_APPROVAL_REQUEST = """\
----------
@@ -37,26 +39,24 @@ Fingerprint: {}
@public
class ModeratorApprovalMixin:
def _step_mod_approval(self):
- self.push('restore')
self.push('get_approval')
def _step_get_approval(self):
- self._set_token(TokenOwner.moderator)
- self.push('restore')
+ self._pend(TokenOwner.moderator)
+ self.push('receive_confirmation')
self.save()
if self.mlist.admin_immed_notify:
- subject = 'New key change request to {} from {}'.format(
- self.mlist.display_name, self.pgp_address.email)
+ subject = 'New key change request from {}'.format(
+ self.pgp_address.email)
text = MOD_APPROVAL_REQUEST.format(self.pgp_address.email,
self.pubkey.fingerprint)
msg = UserNotification(
self.mlist.owner_address, self.mlist.owner_address,
subject, text, self.mlist.preferred_language)
+ out = copy.deepcopy(msg)
wrapped = MIMEWrapper(msg)
msg = wrapped.attach_keys(self.pubkey)
- msg.send(self.mlist)
+ overwrite_message(msg, out)
+ out.send(self.mlist)
raise StopIteration
-
- def _step_restore(self):
- self._set_token(TokenOwner.no_one)
diff --git a/src/mailman_pgp/workflows/tests/test_base.py b/src/mailman_pgp/workflows/tests/test_base.py
index 31b3d05..5273939 100644
--- a/src/mailman_pgp/workflows/tests/test_base.py
+++ b/src/mailman_pgp/workflows/tests/test_base.py
@@ -49,13 +49,14 @@ class PubkeyMixinTestSetup():
self.list_key = load_key('ecc_p256.priv.asc')
- self.pgp_list = PGPMailingList.for_list(self.mlist)
- self.pgp_list.key = self.list_key
+ with transaction():
+ self.pgp_list = PGPMailingList.for_list(self.mlist)
+ self.pgp_list.key = self.list_key
self.um = getUtility(IUserManager)
self.sender_key = load_key('rsa_1024.priv.asc')
- self.sender = self.um.create_address('rsa-1024b@example.org')
+ self.sender = self.um.create_address('anne@example.org')
@implementer(IWorkflow)
@@ -203,7 +204,7 @@ class TestBothPubkeyMixins(PubkeyMixinTestSetup, unittest.TestCase):
self.assertIsNotNone(workflow.token)
pendable = getUtility(IPendings).confirm(workflow.token, expunge=False)
self.assertEqual(pendable['list_id'], 'test.example.com')
- self.assertEqual(pendable['email'], 'rsa-1024b@example.org')
+ self.assertEqual(pendable['email'], 'anne@example.org')
self.assertEqual(pendable['display_name'], '')
self.assertEqual(pendable['when'], '2005-08-01T07:49:23')
self.assertEqual(pendable['token_owner'], 'subscriber')
@@ -216,7 +217,7 @@ class TestBothPubkeyMixins(PubkeyMixinTestSetup, unittest.TestCase):
self.assertIsNotNone(workflow.token)
pendable = getUtility(IPendings).confirm(workflow.token, expunge=False)
self.assertEqual(pendable['list_id'], 'test.example.com')
- self.assertEqual(pendable['email'], 'rsa-1024b@example.org')
+ self.assertEqual(pendable['email'], 'anne@example.org')
self.assertEqual(pendable['display_name'], '')
self.assertEqual(pendable['when'], '2005-08-01T07:49:23')
self.assertEqual(pendable['token_owner'], 'subscriber')
diff --git a/src/mailman_pgp/workflows/tests/test_key_change.py b/src/mailman_pgp/workflows/tests/test_key_change.py
index e469d51..5d4926a 100644
--- a/src/mailman_pgp/workflows/tests/test_key_change.py
+++ b/src/mailman_pgp/workflows/tests/test_key_change.py
@@ -25,13 +25,15 @@ from mailman.interfaces.usermanager import IUserManager
from mailman.testing.helpers import get_queue_messages
from zope.component import getUtility
+from mailman_pgp.config import mm_config
from mailman_pgp.database import mm_transaction, transaction
from mailman_pgp.model.address import PGPAddress
from mailman_pgp.model.list import PGPMailingList
from mailman_pgp.pgp.wrapper import PGPWrapper
from mailman_pgp.testing.layers import PGPConfigLayer
from mailman_pgp.testing.pgp import load_key
-from mailman_pgp.workflows.key_change import KeyChangeWorkflow
+from mailman_pgp.workflows.key_change import (KeyChangeModWorkflow,
+ KeyChangeWorkflow)
class TestKeyChangeWorkflow(unittest.TestCase):
@@ -41,13 +43,18 @@ class TestKeyChangeWorkflow(unittest.TestCase):
with mm_transaction():
self.mlist = create_list('test@example.com',
style_name='pgp-default')
- self.pgp_list = PGPMailingList.for_list(self.mlist)
- self.pgp_list.key = load_key('ecc_p256.priv.asc')
+ with transaction():
+ self.pgp_list = PGPMailingList.for_list(self.mlist)
+ self.pgp_list.key = load_key('ecc_p256.priv.asc')
self.sender_key = load_key('rsa_1024.priv.asc')
self.sender_new_key = load_key('ecc_p256.priv.asc')
self.sender = getUtility(IUserManager).create_address(
- 'rsa-1024b@example.org')
+ 'anne@example.org')
+
+ def test_has_workflows(self):
+ self.assertTrue(KeyChangeWorkflow.name, mm_config.workflows)
+ self.assertTrue(KeyChangeModWorkflow.name, mm_config.workflows)
def test_pgp_address_none(self):
workflow = KeyChangeWorkflow(self.mlist)
@@ -101,3 +108,19 @@ class TestKeyChangeWorkflow(unittest.TestCase):
self.assertEqual(pgp_address.key_fingerprint,
self.sender_new_key.fingerprint)
self.assertTrue(pgp_address.key_confirmed)
+
+ def test_confirm_mod(self):
+ with transaction() as t:
+ pgp_address = PGPAddress(self.sender)
+ pgp_address.key = self.sender_key.pubkey
+ pgp_address.key_confirmed = True
+ t.add(pgp_address)
+
+ workflow = KeyChangeModWorkflow(self.mlist, pgp_address,
+ self.sender_new_key.pubkey)
+ list(workflow)
+
+ token, token_owner, member = ISubscriptionManager(self.mlist).confirm(
+ workflow.token)
+ self.assertIsNotNone(token)
+ self.assertEqual(token_owner, TokenOwner.moderator)
diff --git a/src/mailman_pgp/workflows/tests/test_mod_approval.py b/src/mailman_pgp/workflows/tests/test_mod_approval.py
index 8b6b4d1..49e4204 100644
--- a/src/mailman_pgp/workflows/tests/test_mod_approval.py
+++ b/src/mailman_pgp/workflows/tests/test_mod_approval.py
@@ -14,3 +14,84 @@
#
# You should have received a copy of the GNU General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.
+
+""""""
+from unittest import TestCase
+
+from mailman.app.lifecycle import create_list
+from mailman.interfaces.subscriptions import TokenOwner
+from mailman.interfaces.usermanager import IUserManager
+from mailman.interfaces.workflows import IWorkflow
+from mailman.testing.helpers import get_queue_messages
+from zope.component import getUtility
+from zope.interface import implementer
+
+from mailman_pgp.database import mm_transaction, transaction
+from mailman_pgp.model.address import PGPAddress
+from mailman_pgp.model.list import PGPMailingList
+from mailman_pgp.pgp.wrapper import PGPWrapper
+from mailman_pgp.testing.layers import PGPConfigLayer
+from mailman_pgp.testing.pgp import load_key
+from mailman_pgp.workflows.key_change import KeyChangeBase
+from mailman_pgp.workflows.mod_approval import ModeratorApprovalMixin
+
+
+@implementer(IWorkflow)
+class PGPTestWorkflow(KeyChangeBase, ModeratorApprovalMixin):
+ name = 'test-workflow'
+ description = ''
+ initial_state = 'mod_approval'
+
+
+class TestModeratorApprovalMixin(TestCase):
+ layer = PGPConfigLayer
+
+ def setUp(self):
+ with mm_transaction():
+ self.mlist = create_list('test@example.com',
+ style_name='pgp-default')
+ with transaction():
+ self.pgp_list = PGPMailingList.for_list(self.mlist)
+ self.pgp_list.key = load_key('ecc_p256.priv.asc')
+
+ self.sender_key = load_key('rsa_1024.priv.asc')
+ self.sender_new_key = load_key('ecc_p256.priv.asc')
+ self.sender = getUtility(IUserManager).create_address(
+ 'anne@example.org')
+
+ def test_get_approval(self):
+ with transaction() as t:
+ pgp_address = PGPAddress(self.sender)
+ pgp_address.key = self.sender_key.pubkey
+ pgp_address.key_confirmed = True
+ t.add(pgp_address)
+
+ workflow = PGPTestWorkflow(self.mlist, pgp_address,
+ self.sender_new_key.pubkey)
+ list(workflow)
+ items = get_queue_messages('virgin', expected_count=1)
+ message = items[0].msg
+
+ self.assertEqual(message['Subject'],
+ 'New key change request from {}'.format(
+ pgp_address.email))
+ wrapped = PGPWrapper(message)
+ self.assertTrue(wrapped.has_keys())
+ keys = list(wrapped.keys())
+ self.assertEqual(len(keys), 1)
+ key = keys.pop()
+ self.assertEqual(key.fingerprint, self.sender_new_key.fingerprint)
+
+ def test_receive_approval(self):
+ with transaction() as t:
+ pgp_address = PGPAddress(self.sender)
+ pgp_address.key = self.sender_key.pubkey
+ pgp_address.key_confirmed = True
+ t.add(pgp_address)
+
+ workflow = PGPTestWorkflow(self.mlist, pgp_address,
+ self.sender_new_key.pubkey)
+ list(workflow)
+ get_queue_messages('virgin', expected_count=1)
+ list(workflow)
+ self.assertEqual(workflow.token_owner, TokenOwner.no_one)