aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJ08nY2017-08-07 01:30:15 +0200
committerJ08nY2017-08-07 01:30:15 +0200
commitbe8e21927d063ee5ddd5fc7376669164f9914ad0 (patch)
tree590b3c7a582507869670635270ecdac876280176
parent21b504db4f63efc5d2fa58c646c82d5d8659eca1 (diff)
parent59ec076d04340245101de98633705d312374d9fe (diff)
downloadmailman-pgp-be8e21927d063ee5ddd5fc7376669164f9914ad0.tar.gz
mailman-pgp-be8e21927d063ee5ddd5fc7376669164f9914ad0.tar.zst
mailman-pgp-be8e21927d063ee5ddd5fc7376669164f9914ad0.zip
-rw-r--r--src/mailman_pgp/commands/eml_key.py128
-rw-r--r--src/mailman_pgp/commands/tests/test_key.py231
-rw-r--r--src/mailman_pgp/model/list.py4
-rw-r--r--src/mailman_pgp/pgp/inline.py53
-rw-r--r--src/mailman_pgp/pgp/mime.py89
-rw-r--r--src/mailman_pgp/pgp/tests/data/messages/inline_revoc.eml19
-rw-r--r--src/mailman_pgp/pgp/tests/data/messages/inline_revoc_multipart.eml30
-rw-r--r--src/mailman_pgp/pgp/tests/data/messages/mime_revoc.eml35
-rw-r--r--src/mailman_pgp/pgp/tests/data/revocs/dsa_elgamal_1024.revoc.asc8
-rw-r--r--src/mailman_pgp/pgp/tests/data/revocs/ecc_curve25519.revoc.asc8
-rw-r--r--src/mailman_pgp/pgp/tests/data/revocs/ecc_p256.revoc.asc8
-rw-r--r--src/mailman_pgp/pgp/tests/data/revocs/ecc_secp256k1.revoc.asc8
-rw-r--r--src/mailman_pgp/pgp/tests/data/revocs/rsa_1024.revoc.asc9
-rw-r--r--src/mailman_pgp/pgp/tests/test_inline.py43
-rw-r--r--src/mailman_pgp/pgp/tests/test_mime.py48
-rw-r--r--src/mailman_pgp/pgp/tests/test_wrapper.py38
-rw-r--r--src/mailman_pgp/pgp/wrapper.py31
-rw-r--r--src/mailman_pgp/testing/pgp.py71
-rw-r--r--src/mailman_pgp/utils/email.py21
-rw-r--r--src/mailman_pgp/utils/pgp.py75
-rw-r--r--src/mailman_pgp/utils/tests/__init__.py0
-rw-r--r--src/mailman_pgp/utils/tests/test_pgp.py86
-rw-r--r--src/mailman_pgp/workflows/base.py70
-rw-r--r--src/mailman_pgp/workflows/key_change.py99
-rw-r--r--src/mailman_pgp/workflows/key_confirm.py77
-rw-r--r--src/mailman_pgp/workflows/key_revoke.py70
-rw-r--r--src/mailman_pgp/workflows/key_set.py76
-rw-r--r--src/mailman_pgp/workflows/mod_approval.py153
-rw-r--r--src/mailman_pgp/workflows/pubkey.py118
-rw-r--r--src/mailman_pgp/workflows/subscription.py68
-rw-r--r--src/mailman_pgp/workflows/tests/test_base.py23
-rw-r--r--src/mailman_pgp/workflows/tests/test_key_change.py31
-rw-r--r--src/mailman_pgp/workflows/tests/test_mod_approval.py106
33 files changed, 1647 insertions, 287 deletions
diff --git a/src/mailman_pgp/commands/eml_key.py b/src/mailman_pgp/commands/eml_key.py
index 0775bed..5799b8f 100644
--- a/src/mailman_pgp/commands/eml_key.py
+++ b/src/mailman_pgp/commands/eml_key.py
@@ -17,6 +17,7 @@
"""The key email command."""
import logging
+import copy
from email.mime.text import MIMEText
from mailman.email.message import UserNotification
@@ -25,19 +26,24 @@ from mailman.interfaces.pending import IPendings
from mailman.interfaces.subscriptions import ISubscriptionManager
from mailman.interfaces.usermanager import IUserManager
from pgpy.constants import KeyFlags
+from pgpy.errors import PGPError
from public import public
from zope.component import getUtility
from zope.interface import implementer
+from mailman_pgp.config import mm_config
from mailman_pgp.database import transaction
from mailman_pgp.model.address import PGPAddress
from mailman_pgp.model.list import PGPMailingList
from mailman_pgp.pgp.mime import MIMEWrapper
from mailman_pgp.pgp.wrapper import PGPWrapper
from mailman_pgp.utils.email import get_email
+from mailman_pgp.utils.pgp import key_usable
from mailman_pgp.workflows.key_change import (CHANGE_CONFIRM_REQUEST,
+ KeyChangeModWorkflow,
KeyChangeWorkflow)
-from mailman_pgp.workflows.pubkey import CONFIRM_REQUEST
+from mailman_pgp.workflows.key_confirm import CONFIRM_REQUEST
+from mailman_pgp.workflows.key_revoke import KeyRevokeWorkflow
log = logging.getLogger('mailman.plugin.pgp.commands')
@@ -78,10 +84,7 @@ def _cmd_set(pgp_list, mlist, msg, msgdata, arguments, results):
file=results)
return ContinueProcessing.no
- usage_flags = key.usage_flags()
- for subkey in key.subkeys.values():
- usage_flags |= subkey.usage_flags()
- if KeyFlags.EncryptCommunications not in usage_flags:
+ if not key_usable(key, {KeyFlags.EncryptCommunications, KeyFlags.Sign}):
print('Need a key which can be used to encrypt communications.',
file=results)
return ContinueProcessing.no
@@ -170,7 +173,8 @@ def _cmd_confirm(pgp_list, mlist, msg, msgdata, arguments, results):
print('Wrong token.', file=results)
return ContinueProcessing.no
- if pendable.get('type') == KeyChangeWorkflow.pendable_class().PEND_TYPE:
+ if pendable.get('type') in (KeyChangeWorkflow.name,
+ KeyChangeModWorkflow.name):
expecting = CHANGE_CONFIRM_REQUEST.format(pendable.get('fingerprint'),
token)
else:
@@ -178,6 +182,8 @@ def _cmd_confirm(pgp_list, mlist, msg, msgdata, arguments, results):
for sig_subject in wrapped.get_signed():
if expecting in sig_subject:
+ with transaction():
+ pgp_address.key_confirmed = True
ISubscriptionManager(mlist).confirm(token)
break
else:
@@ -211,6 +217,14 @@ def _cmd_change(pgp_list, mlist, msg, msgdata, arguments, results):
print('A pgp enabled address not found.', file=results)
return ContinueProcessing.no
+ if pgp_address.key is None:
+ print("You currently don't have a key set.", file=results)
+ return ContinueProcessing.no
+
+ if not pgp_address.key_confirmed:
+ print('Your key is currently not confirmed.', file=results)
+ return ContinueProcessing.no
+
wrapped = PGPWrapper(msg)
if wrapped.is_encrypted():
decrypted = wrapped.try_decrypt(pgp_list.key)
@@ -231,24 +245,108 @@ def _cmd_change(pgp_list, mlist, msg, msgdata, arguments, results):
file=results)
return ContinueProcessing.no
- usage_flags = key.usage_flags()
- for subkey in key.subkeys.values():
- usage_flags |= subkey.usage_flags()
- if KeyFlags.EncryptCommunications not in usage_flags:
+ if not key_usable(key, {KeyFlags.EncryptCommunications, KeyFlags.Sign}):
print('Need a key which can be used to encrypt communications.',
file=results)
return ContinueProcessing.no
- workflow = KeyChangeWorkflow(mlist, pgp_address, key)
+ workflow_class = mm_config.workflows[pgp_list.key_change_workflow]
+
+ workflow = workflow_class(mlist, pgp_address, key)
list(workflow)
print('Key change request received.', file=results)
return ContinueProcessing.no
def _cmd_revoke(pgp_list, mlist, msg, msgdata, arguments, results):
- # Current key revocation certificate in attachment, restarts the
- # subscription process, or rather only it's key setup part.
- pass
+ """
+ `key revoke` command.
+
+ Used when a user has to revoke a part of a key used for a `PGPAddress`.
+
+ * This command message CAN be encrypted to the list key, in which case it
+ will be decrypted.
+ * This command message MUST have at least one revocation certificate
+ attached.
+ """
+ if len(arguments) != 1:
+ print('Extraneous argument/s: ' + ','.join(arguments[1:]),
+ file=results)
+ return ContinueProcessing.no
+
+ email = get_email(msg)
+ if not email:
+ print('No email to revoke key of.', file=results)
+ return ContinueProcessing.no
+
+ pgp_address = PGPAddress.for_email(email)
+ if pgp_address is None:
+ print('A pgp enabled address not found.', file=results)
+ return ContinueProcessing.no
+
+ key = pgp_address.key
+ if key is None:
+ print("You currently don't have a key set.", file=results)
+ return ContinueProcessing.no
+
+ if not pgp_address.key_confirmed:
+ print('Your key is currently not confirmed.', file=results)
+ return ContinueProcessing.no
+
+ wrapped = PGPWrapper(msg)
+ if wrapped.is_encrypted():
+ decrypted = wrapped.try_decrypt(pgp_list.key)
+ wrapped = PGPWrapper(decrypted)
+
+ if not wrapped.has_revocs():
+ print('No key revocations attached? Send a key revocation.',
+ file=results)
+ return ContinueProcessing.no
+
+ key_copy = copy.copy(key)
+
+ revocs = list(wrapped.revocs())
+ matches = 0
+ for revoc in revocs:
+ old_matches = matches
+ try:
+ verified = key_copy.verify(key_copy, revoc)
+ if verified:
+ key_copy |= revoc
+ matches += 1
+ continue
+ except PGPError:
+ pass
+
+ for subkey in key_copy.subkeys.values():
+ try:
+ verified = key_copy.verify(subkey, revoc)
+ if verified:
+ subkey |= revoc
+ matches += 1
+ break
+ except PGPError:
+ pass
+ # No match?
+ if matches == old_matches:
+ print('Revocation found for not-found key.', file=results)
+
+ if not key_usable(key_copy,
+ {KeyFlags.EncryptCommunications, KeyFlags.Sign}):
+ # Start reset process.
+ with transaction():
+ pgp_address.key = None
+ pgp_address.key_confirmed = False
+ workflow = KeyRevokeWorkflow(mlist, pgp_address)
+ list(workflow)
+ print('Key needs to be reset.', file=results)
+ else:
+ # Just update key.
+ if matches > 0:
+ with transaction():
+ pgp_address.key = key_copy
+ print('Key succesfully updated.', file=results)
+ return ContinueProcessing.yes
def _cmd_sign(pgp_list, mlist, msg, msgdata, arguments, results):
@@ -273,7 +371,7 @@ def _cmd_receive(pgp_list, mlist, msg, msgdata, arguments, results):
msg['MIME-Version'] = '1.0'
msg.attach(MIMEText('Here is the public key you requested.'))
wrapped = MIMEWrapper(msg)
- msg = wrapped.attach_key(pgp_list.pubkey)
+ msg = wrapped.attach_keys(pgp_list.pubkey)
msg.send(mlist)
return ContinueProcessing.yes
diff --git a/src/mailman_pgp/commands/tests/test_key.py b/src/mailman_pgp/commands/tests/test_key.py
index cfda3e8..1a128a1 100644
--- a/src/mailman_pgp/commands/tests/test_key.py
+++ b/src/mailman_pgp/commands/tests/test_key.py
@@ -27,13 +27,13 @@ from mailman.runners.command import CommandRunner
from mailman.testing.helpers import get_queue_messages, make_testable_runner
from mailman.utilities.datetime import now
from pgpy import PGPKey, PGPUID
-from pgpy.constants import (PubKeyAlgorithm, KeyFlags, EllipticCurveOID,
- HashAlgorithm, SymmetricKeyAlgorithm,
- CompressionAlgorithm)
+from pgpy.constants import (
+ CompressionAlgorithm, EllipticCurveOID, HashAlgorithm, KeyFlags,
+ PubKeyAlgorithm, SymmetricKeyAlgorithm)
from zope.component import getUtility
from mailman_pgp.config import mm_config
-from mailman_pgp.database import transaction
+from mailman_pgp.database import mm_transaction, transaction
from mailman_pgp.model.address import PGPAddress
from mailman_pgp.model.list import PGPMailingList
from mailman_pgp.pgp.mime import MIMEWrapper
@@ -41,7 +41,7 @@ from mailman_pgp.pgp.wrapper import PGPWrapper
from mailman_pgp.testing.layers import PGPConfigLayer
from mailman_pgp.testing.pgp import load_key
from mailman_pgp.workflows.key_change import CHANGE_CONFIRM_REQUEST
-from mailman_pgp.workflows.pubkey import CONFIRM_REQUEST
+from mailman_pgp.workflows.key_confirm import CONFIRM_REQUEST
from mailman_pgp.workflows.subscription import OpenSubscriptionPolicy
@@ -128,8 +128,7 @@ class TestPreSubscription(unittest.TestCase):
hashes=[HashAlgorithm.SHA256,
HashAlgorithm.SHA512],
ciphers=[SymmetricKeyAlgorithm.AES256],
- compression=[CompressionAlgorithm.ZLIB,
- CompressionAlgorithm.Uncompressed]
+ compression=[CompressionAlgorithm.ZLIB]
)
def test_set(self):
@@ -145,7 +144,7 @@ class TestPreSubscription(unittest.TestCase):
set_message = _create_mixed('bart@example.com', 'test@example.com',
'Re: key set {}'.format(token))
wrapped_set_message = MIMEWrapper(set_message)
- set_message = wrapped_set_message.attach_key(self.bart_key.pubkey)
+ set_message = wrapped_set_message.attach_keys(self.bart_key.pubkey)
mm_config.switchboards['command'].enqueue(set_message,
listid='test.example.com')
@@ -186,7 +185,7 @@ class TestPreSubscription(unittest.TestCase):
set_message = _create_mixed('bart@example.com', 'test@example.com',
'Re: key set {}'.format(token))
wrapped_set_message = MIMEWrapper(set_message)
- set_message = wrapped_set_message.attach_key(self.bart_key.pubkey)
+ set_message = wrapped_set_message.attach_keys(self.bart_key.pubkey)
wrapped_set_message = MIMEWrapper(set_message)
set_message = wrapped_set_message.encrypt(self.pgp_list.pubkey,
self.bart_key.pubkey)
@@ -246,9 +245,9 @@ class TestPreSubscription(unittest.TestCase):
set_message = _create_mixed('bart@example.com', 'test@example.com',
'Re: key set token')
wrapped_set_message = MIMEWrapper(set_message)
- set_message = wrapped_set_message.attach_key(self.bart_key.pubkey)
+ set_message = wrapped_set_message.attach_keys(self.bart_key.pubkey)
wrapped_set_message = MIMEWrapper(set_message)
- set_message = wrapped_set_message.attach_key(self.anne_key.pubkey)
+ set_message = wrapped_set_message.attach_keys(self.anne_key.pubkey)
mm_config.switchboards['command'].enqueue(set_message,
listid='test.example.com')
@@ -263,7 +262,7 @@ class TestPreSubscription(unittest.TestCase):
set_message = _create_mixed('bart@example.com', 'test@example.com',
'Re: key set token')
wrapped_set_message = MIMEWrapper(set_message)
- set_message = wrapped_set_message.attach_key(self.bart_key)
+ set_message = wrapped_set_message.attach_keys(self.bart_key)
mm_config.switchboards['command'].enqueue(set_message,
listid='test.example.com')
@@ -278,7 +277,7 @@ class TestPreSubscription(unittest.TestCase):
set_message = _create_mixed('bart@example.com', 'test@example.com',
'Re: key set token')
wrapped_set_message = MIMEWrapper(set_message)
- set_message = wrapped_set_message.attach_key(self.unusable_key.pubkey)
+ set_message = wrapped_set_message.attach_keys(self.unusable_key.pubkey)
mm_config.switchboards['command'].enqueue(set_message,
listid='test.example.com')
@@ -293,7 +292,7 @@ class TestPreSubscription(unittest.TestCase):
def test_set_no_email(self):
message = _create_mixed('', 'test@example.com', 'key set token')
wrapped_message = MIMEWrapper(message)
- message = wrapped_message.attach_key(self.bart_key.pubkey)
+ message = wrapped_message.attach_keys(self.bart_key.pubkey)
mm_config.switchboards['command'].enqueue(message,
listid='test.example.com')
@@ -307,7 +306,7 @@ class TestPreSubscription(unittest.TestCase):
set_message = _create_mixed('bart@example.com', 'test@example.com',
'key set token')
wrapped_set_message = MIMEWrapper(set_message)
- set_message = wrapped_set_message.attach_key(self.bart_key.pubkey)
+ set_message = wrapped_set_message.attach_keys(self.bart_key.pubkey)
mm_config.switchboards['command'].enqueue(set_message,
listid='test.example.com')
@@ -326,7 +325,7 @@ class TestPreSubscription(unittest.TestCase):
set_message = _create_mixed('bart@example.com', 'test@example.com',
'key set token')
wrapped_set_message = MIMEWrapper(set_message)
- set_message = wrapped_set_message.attach_key(self.bart_key.pubkey)
+ set_message = wrapped_set_message.attach_keys(self.bart_key.pubkey)
mm_config.switchboards['command'].enqueue(set_message,
listid='test.example.com')
@@ -349,7 +348,7 @@ class TestPreSubscription(unittest.TestCase):
set_message = _create_mixed('bart@example.com', 'test@example.com',
'key set token')
wrapped_set_message = MIMEWrapper(set_message)
- set_message = wrapped_set_message.attach_key(self.bart_key.pubkey)
+ set_message = wrapped_set_message.attach_keys(self.bart_key.pubkey)
mm_config.switchboards['command'].enqueue(set_message,
listid='test.example.com')
@@ -594,9 +593,13 @@ class TestAfterSubscription(unittest.TestCase):
layer = PGPConfigLayer
def setUp(self):
- self.mlist = create_list('test@example.com', style_name='pgp-default')
- self.pgp_list = PGPMailingList.for_list(self.mlist)
- self.pgp_list.key = load_key('ecc_p256.priv.asc')
+ with mm_transaction():
+ self.mlist = create_list('test@example.com',
+ style_name='pgp-default')
+ with transaction():
+ self.pgp_list = PGPMailingList.for_list(self.mlist)
+ self.pgp_list.key = load_key('ecc_p256.priv.asc')
+ self.pgp_list.key_change_workflow = 'pgp-key-change-workflow'
self.bart_key = load_key('rsa_1024.priv.asc')
self.bart_new_key = load_key('ecc_p256.priv.asc')
@@ -611,8 +614,7 @@ class TestAfterSubscription(unittest.TestCase):
hashes=[HashAlgorithm.SHA256,
HashAlgorithm.SHA512],
ciphers=[SymmetricKeyAlgorithm.AES256],
- compression=[CompressionAlgorithm.ZLIB,
- CompressionAlgorithm.Uncompressed]
+ compression=[CompressionAlgorithm.ZLIB]
)
def test_change(self):
@@ -628,7 +630,7 @@ class TestAfterSubscription(unittest.TestCase):
message = _create_mixed('bart@example.com', 'test@example.com',
'key change')
wrapped_message = MIMEWrapper(message)
- message = wrapped_message.attach_key(self.bart_new_key.pubkey)
+ message = wrapped_message.attach_keys(self.bart_new_key.pubkey)
mm_config.switchboards['command'].enqueue(message,
listid='test.example.com')
@@ -663,7 +665,7 @@ class TestAfterSubscription(unittest.TestCase):
message = _create_mixed('bart@example.com', 'test@example.com',
'key change')
wrapped_message = MIMEWrapper(message)
- message = wrapped_message.attach_key(self.bart_new_key.pubkey)
+ message = wrapped_message.attach_keys(self.bart_new_key.pubkey)
wrapped_message = MIMEWrapper(message)
message = wrapped_message.encrypt(self.pgp_list.pubkey)
@@ -700,7 +702,7 @@ class TestAfterSubscription(unittest.TestCase):
message = _create_mixed('bart@example.com', 'test@example.com',
'key change')
wrapped_message = MIMEWrapper(message)
- message = wrapped_message.attach_key(self.bart_new_key.pubkey)
+ message = wrapped_message.attach_keys(self.bart_new_key.pubkey)
mm_config.switchboards['command'].enqueue(message,
listid='test.example.com')
@@ -751,7 +753,7 @@ class TestAfterSubscription(unittest.TestCase):
def test_change_no_email(self):
message = _create_mixed('', 'test@example.com', 'key change')
wrapped_message = MIMEWrapper(message)
- message = wrapped_message.attach_key(self.bart_key.pubkey)
+ message = wrapped_message.attach_keys(self.bart_key.pubkey)
mm_config.switchboards['command'].enqueue(message,
listid='test.example.com')
@@ -765,7 +767,7 @@ class TestAfterSubscription(unittest.TestCase):
message = _create_mixed('bart@example.com', 'test@example.com',
'key change')
wrapped_message = MIMEWrapper(message)
- message = wrapped_message.attach_key(self.bart_key.pubkey)
+ message = wrapped_message.attach_keys(self.bart_key.pubkey)
mm_config.switchboards['command'].enqueue(message,
listid='test.example.com')
@@ -810,9 +812,9 @@ class TestAfterSubscription(unittest.TestCase):
'key change')
wrapped_set_message = MIMEWrapper(set_message)
- set_message = wrapped_set_message.attach_key(self.bart_key.pubkey)
+ set_message = wrapped_set_message.attach_keys(self.bart_key.pubkey)
wrapped_set_message = MIMEWrapper(set_message)
- set_message = wrapped_set_message.attach_key(self.bart_new_key.pubkey)
+ set_message = wrapped_set_message.attach_keys(self.bart_new_key.pubkey)
mm_config.switchboards['command'].enqueue(set_message,
listid='test.example.com')
@@ -835,7 +837,7 @@ class TestAfterSubscription(unittest.TestCase):
message = _create_mixed('bart@example.com', 'test@example.com',
'key change')
wrapped_message = MIMEWrapper(message)
- message = wrapped_message.attach_key(self.bart_key)
+ message = wrapped_message.attach_keys(self.bart_key)
mm_config.switchboards['command'].enqueue(message,
listid='test.example.com')
@@ -858,7 +860,7 @@ class TestAfterSubscription(unittest.TestCase):
message = _create_mixed('bart@example.com', 'test@example.com',
'key change')
wrapped_message = MIMEWrapper(message)
- message = wrapped_message.attach_key(self.unusable_key.pubkey)
+ message = wrapped_message.attach_keys(self.unusable_key.pubkey)
mm_config.switchboards['command'].enqueue(message,
listid='test.example.com')
@@ -870,6 +872,173 @@ class TestAfterSubscription(unittest.TestCase):
'Need a key which can be used to encrypt communications.',
results_msg.get_payload())
+ def test_revoke_extra_arg(self):
+ message = _create_plain('bart@example.com', 'test@example.com',
+ 'key revoke extra arguments', '')
+ mm_config.switchboards['command'].enqueue(message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+
+ items = get_queue_messages('virgin', expected_count=1)
+ results_msg = items[0].msg
+
+ self.assertIn('Extraneous argument/s: extra,arguments',
+ results_msg.get_payload())
+
+ def test_revoke_no_email(self):
+ message = _create_mixed('', 'test@example.com', 'key revoke')
+
+ mm_config.switchboards['command'].enqueue(message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+ items = get_queue_messages('virgin', expected_count=1)
+ results_msg = items[0].msg
+
+ self.assertIn('No email to revoke key of.', results_msg.get_payload())
+
+ def test_revoke_no_pgp_address(self):
+ message = _create_mixed('bart@example.com', 'test@example.com',
+ 'key revoke')
+
+ mm_config.switchboards['command'].enqueue(message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+ items = get_queue_messages('virgin', expected_count=1)
+ results_msg = items[0].msg
+
+ self.assertIn('A pgp enabled address not found.',
+ results_msg.get_payload())
+
+ def test_revoke_no_key_set(self):
+ bart = getUtility(IUserManager).create_address('bart@example.com',
+ 'Bart Person')
+ with transaction() as t:
+ pgp_address = PGPAddress(bart)
+ t.add(pgp_address)
+
+ message = _create_mixed('bart@example.com', 'test@example.com',
+ 'key revoke')
+
+ mm_config.switchboards['command'].enqueue(message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+ items = get_queue_messages('virgin', expected_count=1)
+ results_msg = items[0].msg
+
+ self.assertIn("You currently don't have a key set.",
+ results_msg.get_payload())
+
+ def test_revoke_key_not_confirmed(self):
+ bart = getUtility(IUserManager).create_address('bart@example.com',
+ 'Bart Person')
+ with transaction() as t:
+ pgp_address = PGPAddress(bart)
+ pgp_address.key = self.bart_key.pubkey
+ t.add(pgp_address)
+
+ message = _create_mixed('bart@example.com', 'test@example.com',
+ 'key revoke')
+
+ mm_config.switchboards['command'].enqueue(message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+ items = get_queue_messages('virgin', expected_count=1)
+ results_msg = items[0].msg
+
+ self.assertIn('Your key is currently not confirmed.',
+ results_msg.get_payload())
+
+ def test_revoke_no_revocs(self):
+ bart = getUtility(IUserManager).create_address('bart@example.com',
+ 'Bart Person')
+ with transaction() as t:
+ pgp_address = PGPAddress(bart)
+ pgp_address.key = self.bart_key.pubkey
+ pgp_address.key_confirmed = True
+ t.add(pgp_address)
+
+ message = _create_plain('bart@example.com', 'test@example.com',
+ 'key revoke', '')
+ mm_config.switchboards['command'].enqueue(message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+ items = get_queue_messages('virgin', expected_count=1)
+ results_msg = items[0].msg
+
+ self.assertIn('No key revocations attached? Send a key revocation.',
+ results_msg.get_payload())
+
+ def test_revoke_resets(self):
+ bart = getUtility(IUserManager).create_address('bart@example.com',
+ 'Bart Person')
+ with transaction() as t:
+ pgp_address = PGPAddress(bart)
+ pgp_address.key = self.bart_key.pubkey
+ pgp_address.key_confirmed = True
+ t.add(pgp_address)
+
+ revoc = self.bart_key.revoke(self.bart_key)
+
+ message = _create_mixed('bart@example.com', 'test@example.com',
+ 'key revoke')
+ wrapped_message = MIMEWrapper(message)
+ message = wrapped_message.attach_revocs(revoc)
+
+ mm_config.switchboards['command'].enqueue(message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+ items = get_queue_messages('virgin', expected_count=2)
+ if (items[0].msg['Subject'] ==
+ 'The results of your email commands'): # pragma: no cover
+ results_msg = items[0].msg
+ else:
+ results_msg = items[1].msg
+ #TODO: finish test
+
+ self.assertIn('Key needs to be reset.', results_msg.get_payload())
+
+ def test_revoke_updates(self):
+ bart = getUtility(IUserManager).create_address('bart@example.com',
+ 'Bart Person')
+
+ test_key = PGPKey.new(PubKeyAlgorithm.RSAEncryptOrSign, 1024)
+ uid = PGPUID.new('Some Name', email='anne@example.org')
+ test_key.add_uid(uid,
+ usage={KeyFlags.Certify,
+ KeyFlags.EncryptCommunications,
+ KeyFlags.Sign},
+ hashes=[HashAlgorithm.SHA256,
+ HashAlgorithm.SHA512],
+ ciphers=[SymmetricKeyAlgorithm.AES256],
+ compression=[CompressionAlgorithm.ZLIB])
+ sub = PGPKey.new(PubKeyAlgorithm.ECDH, EllipticCurveOID.SECP256K1)
+ test_key.add_subkey(sub, usage={KeyFlags.EncryptCommunications})
+
+ with transaction() as t:
+ pgp_address = PGPAddress(bart)
+ pgp_address.key = test_key.pubkey
+ pgp_address.key_confirmed = True
+ t.add(pgp_address)
+
+ revoc = test_key.revoke(sub.pubkey)
+
+ message = _create_mixed('bart@example.com', 'test@example.com',
+ 'key revoke')
+ wrapped_message = MIMEWrapper(message)
+ message = wrapped_message.attach_revocs(revoc)
+
+ mm_config.switchboards['command'].enqueue(message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+ items = get_queue_messages('virgin', expected_count=1)
+ results_msg = items[0].msg
+
+ self.assertIn('Key succesfully updated.', results_msg.get_payload())
+ sub = next(iter(pgp_address.key.subkeys.values()))
+ revocs = list(sub.revocation_signatures)
+ self.assertEqual(len(revocs), 1)
+ self.assertEqual(revoc.hash2, revocs[0].hash2)
+
class TestGeneral(unittest.TestCase):
layer = PGPConfigLayer
diff --git a/src/mailman_pgp/model/list.py b/src/mailman_pgp/model/list.py
index d80f4a7..9c2d317 100644
--- a/src/mailman_pgp/model/list.py
+++ b/src/mailman_pgp/model/list.py
@@ -54,6 +54,10 @@ class PGPMailingList(Base):
nonencrypted_msg_action = Column(Enum(Action), default=Action.reject)
encrypt_outgoing = Column(Boolean, default=True)
+ # Key related properties
+ key_change_workflow = Column(SAUnicode,
+ default='pgp-key-change-mod-workflow')
+
def __init__(self, mlist):
"""
diff --git a/src/mailman_pgp/pgp/inline.py b/src/mailman_pgp/pgp/inline.py
index bb0971d..49f0a6e 100644
--- a/src/mailman_pgp/pgp/inline.py
+++ b/src/mailman_pgp/pgp/inline.py
@@ -18,12 +18,14 @@
"""Strict inline PGP message wrapper."""
import copy
from email.iterators import walk
+from email.mime.text import MIMEText
from pgpy import PGPMessage
from pgpy.constants import SymmetricKeyAlgorithm
from public import public
-from mailman_pgp.utils.pgp import key_from_blob
+from mailman_pgp.utils.email import make_multipart
+from mailman_pgp.utils.pgp import key_from_blob, revoc_from_blob
@public
@@ -95,7 +97,7 @@ class InlineWrapper:
:rtype: typing.Generator[pgpy.PGPMessage]
"""
for part in walk(self.msg):
- if not part.is_multipart() and self._is_signed(part):
+ if not part.is_multipart():
try:
msg = PGPMessage.from_blob(part.get_payload())
except:
@@ -135,7 +137,7 @@ class InlineWrapper:
:rtype: typing.Generator[pgpy.PGPMessage]
"""
for part in walk(self.msg):
- if not part.is_multipart() and self._is_encrypted(part):
+ if not part.is_multipart():
try:
msg = PGPMessage.from_blob(part.get_payload())
except:
@@ -176,13 +178,56 @@ class InlineWrapper:
:rtype: Generator[pgpy.PGPKey]
"""
for part in walk(self.msg):
- if not part.is_multipart() and self._has_keys(part):
+ if not part.is_multipart():
try:
key = key_from_blob(part.get_payload())
except:
continue
yield key
+ def _is_revoc(self, part):
+ try:
+ revoc_from_blob(part.get_payload())
+ except ValueError:
+ return False
+ return True
+
+ def is_revocs(self):
+ for part in walk(self.msg):
+ if (not part.is_multipart() and not self._is_revoc(part)):
+ return False
+ return True
+
+ def has_revocs(self):
+ for part in walk(self.msg):
+ if (not part.is_multipart() and self._is_revoc(part)):
+ return True
+ return False
+
+ def revocs(self):
+ for part in walk(self.msg):
+ if not part.is_multipart():
+ try:
+ revoc = revoc_from_blob(part.get_payload())
+ except:
+ continue
+ yield revoc
+
+ def attach_revocs(self, *key_revocations):
+ """
+ Attach a key revocation signature to the message.
+
+ :param key_revocations: A key revocation signature to attach.
+ :type key_revocations: pgpy.PGPSignature
+ :return: The message with the signature attached.
+ :rtype: mailman.email.message.Message
+ """
+ out = make_multipart(self.msg)
+ for key_revocation in key_revocations:
+ revoc_part = MIMEText(str(key_revocation))
+ out.attach(revoc_part)
+ return out
+
def verify(self, key):
"""
Verify the signatures of this message with key.
diff --git a/src/mailman_pgp/pgp/mime.py b/src/mailman_pgp/pgp/mime.py
index a1303c9..03177ab 100644
--- a/src/mailman_pgp/pgp/mime.py
+++ b/src/mailman_pgp/pgp/mime.py
@@ -28,8 +28,8 @@ from pgpy import PGPDetachedSignature, PGPMessage
from pgpy.constants import HashAlgorithm, SymmetricKeyAlgorithm
from public import public
-from mailman_pgp.utils.email import copy_headers
-from mailman_pgp.utils.pgp import key_from_blob
+from mailman_pgp.utils.email import copy_headers, make_multipart
+from mailman_pgp.utils.pgp import key_from_blob, revoc_from_blob
@public
@@ -63,7 +63,7 @@ class MIMEWrapper:
def _is_mime(self):
is_multipart = self.msg.is_multipart()
- payloads = len(self.msg.get_payload())
+ payloads = len(self.msg.get_payload()) if self.msg.get_payload() else 0
return is_multipart and payloads == 2
@@ -186,26 +186,81 @@ class MIMEWrapper:
continue
yield key
- def attach_key(self, key):
+ def attach_keys(self, *keys):
"""
Attach a key to this message, as per RFC3156 section 7.
- :param key: A key to attach.
- :type key: pgpy.PGPKey
+ :param keys: A key to attach.
+ :type keys: pgpy.PGPKey
:return: The message with the key attached.
:rtype: mailman.email.message.Message
"""
- filename = '0x' + key.fingerprint.keyid + '.asc'
- key_part = MIMEApplication(_data=str(key),
- _subtype=MIMEWrapper._keys_subtype,
- _encoder=encode_7or8bit,
- name=filename)
- key_part.add_header('Content-Description',
- 'OpenPGP key')
- key_part.add_header('Content-Disposition', 'attachment',
- filename=filename)
- out = copy.deepcopy(self.msg)
- out.attach(key_part)
+ out = make_multipart(self.msg)
+ for key in keys:
+ filename = '0x' + key.fingerprint.keyid + '.asc'
+ key_part = MIMEApplication(_data=str(key),
+ _subtype=MIMEWrapper._keys_subtype,
+ _encoder=encode_7or8bit,
+ name=filename)
+ key_part.add_header('Content-Description',
+ 'OpenPGP key')
+ key_part.add_header('Content-Disposition', 'attachment',
+ filename=filename)
+ out.attach(key_part)
+ return out
+
+ def _is_revoc(self, part):
+ if part.get_content_type() != MIMEWrapper._keys_type:
+ return False
+ try:
+ revoc_from_blob(part.get_payload())
+ except ValueError:
+ return False
+ return True
+
+ def is_revocs(self):
+ for part in walk(self.msg):
+ if (not part.is_multipart() and not self._is_revoc(part)):
+ return False
+ return True
+
+ def has_revocs(self):
+ for part in walk(self.msg):
+ if (not part.is_multipart() and self._is_revoc(part)):
+ return True
+ return False
+
+ def revocs(self):
+ for part in walk(self.msg):
+ if (not part.is_multipart() # noqa
+ and part.get_content_type() == MIMEWrapper._keys_type):
+ try:
+ revoc = revoc_from_blob(part.get_payload())
+ except:
+ continue
+ yield revoc
+
+ def attach_revocs(self, *key_revocations):
+ """
+ Attach a key revocation signature to the message, as a key subpart.
+
+ :param key_revocations: A key revocation signature to attach.
+ :type key_revocations: pgpy.PGPSignature
+ :return: The message with the signature attached.
+ :rtype: mailman.email.message.Message
+ """
+ out = make_multipart(self.msg)
+ for key_revocation in key_revocations:
+ filename = '0x' + key_revocation.signer + '.asc'
+ revoc_part = MIMEApplication(_data=str(key_revocation),
+ _subtype=MIMEWrapper._keys_subtype,
+ _encoder=encode_7or8bit,
+ name=filename)
+ revoc_part.add_header('Content-Description',
+ 'OpenPGP key')
+ revoc_part.add_header('Content-Disposition', 'attachment',
+ filename=filename)
+ out.attach(revoc_part)
return out
def verify(self, key):
diff --git a/src/mailman_pgp/pgp/tests/data/messages/inline_revoc.eml b/src/mailman_pgp/pgp/tests/data/messages/inline_revoc.eml
new file mode 100644
index 0000000..f215777
--- /dev/null
+++ b/src/mailman_pgp/pgp/tests/data/messages/inline_revoc.eml
@@ -0,0 +1,19 @@
+To: nobody@example.org
+From: RSA 1024b example <RSA-1024b@example.org>
+Subject: Some subject.
+Message-ID: <76a591ed-bfc4-d08b-73d3-fc2489148fd7@example.org>
+Date: Wed, 21 Jun 2017 13:50:59 +0200
+User-Agent: Mutt/1.7.2 (2016-11-26)
+MIME-Version: 1.0
+Content-Type: text/plain; charset=utf-8
+Content-Transfer-Encoding: 8bit
+
+-----BEGIN PGP PUBLIC KEY BLOCK-----
+Comment: This is a revocation certificate
+
+iLYEIAEIACAWIQTUqUiGscoglqZFjlxH8QwIeIS3WAUCWYClvgIdAgAKCRBH8QwI
+eIS3WLpBBADCUtyYmI2Z8DCOnKUW4nRjHc3ZVMoZJlwceJCWhSybBrnjo6LWzvBy
+eKke4qHlh+jmSk3/Qyio6vYzvicOayDwhr0s/1X26MiYorthfiCQOg2WQ0YiRuMC
+/Ml8rukvBTRGvXikcIuBw5MFqCWsWI7ExPKaaresnHaCn37KF0A6hw==
+=4oku
+-----END PGP PUBLIC KEY BLOCK-----
diff --git a/src/mailman_pgp/pgp/tests/data/messages/inline_revoc_multipart.eml b/src/mailman_pgp/pgp/tests/data/messages/inline_revoc_multipart.eml
new file mode 100644
index 0000000..a1052d6
--- /dev/null
+++ b/src/mailman_pgp/pgp/tests/data/messages/inline_revoc_multipart.eml
@@ -0,0 +1,30 @@
+To: nobody@example.org
+From: RSA 1024b example <RSA-1024b@example.org>
+Subject: Some subject.
+Message-ID: <76a591ed-bfc4-d08b-73d3-fc2489148fd7@example.org>
+Date: Wed, 21 Jun 2017 13:50:59 +0200
+User-Agent: Mutt/1.7.2 (2016-11-26)
+MIME-Version: 1.0
+Content-Type: multipart/mixed; boundary="abjqkjsfwqsfa546qw2wfq6sdq2sqwr56qqs"
+
+--abjqkjsfwqsfa546qw2wfq6sdq2sqwr56qqs
+Content-Type: text/plain; charset=utf-8
+Content-Transfer-Encoding: 8bit
+
+-----BEGIN PGP PUBLIC KEY BLOCK-----
+Comment: This is a revocation certificate
+
+iLYEIAEIACAWIQTUqUiGscoglqZFjlxH8QwIeIS3WAUCWYClvgIdAgAKCRBH8QwI
+eIS3WLpBBADCUtyYmI2Z8DCOnKUW4nRjHc3ZVMoZJlwceJCWhSybBrnjo6LWzvBy
+eKke4qHlh+jmSk3/Qyio6vYzvicOayDwhr0s/1X26MiYorthfiCQOg2WQ0YiRuMC
+/Ml8rukvBTRGvXikcIuBw5MFqCWsWI7ExPKaaresnHaCn37KF0A6hw==
+=4oku
+-----END PGP PUBLIC KEY BLOCK-----
+
+--abjqkjsfwqsfa546qw2wfq6sdq2sqwr56qqs
+Content-Type: text/plain; charset=utf-8
+Content-Transfer-Encoding: 8bit
+
+Some cleartext.
+
+--abjqkjsfwqsfa546qw2wfq6sdq2sqwr56qqs-- \ No newline at end of file
diff --git a/src/mailman_pgp/pgp/tests/data/messages/mime_revoc.eml b/src/mailman_pgp/pgp/tests/data/messages/mime_revoc.eml
new file mode 100644
index 0000000..e1055a3
--- /dev/null
+++ b/src/mailman_pgp/pgp/tests/data/messages/mime_revoc.eml
@@ -0,0 +1,35 @@
+To: nobody@example.org
+From: RSA 1024b example <RSA-1024b@example.org>
+Subject: Some subject.
+Message-ID: <76a591ed-bfc4-d08b-73d3-fc2489148fd7@example.org>
+Date: Wed, 21 Jun 2017 13:50:59 +0200
+User-Agent: Mutt/1.7.2 (2016-11-26)
+MIME-Version: 1.0
+Content-Type: multipart/mixed;
+ boundary="------------A851F166D50529639139DD0B"
+
+This is a multi-part message in MIME format.
+--------------A851F166D50529639139DD0B
+Content-Type: text/plain; charset=utf-8
+Content-Transfer-Encoding: 7bit
+
+Some other text.
+
+--------------A851F166D50529639139DD0B
+Content-Type: application/pgp-keys;
+ name="0x7884B758.asc"
+Content-Transfer-Encoding: 7bit
+Content-Disposition: attachment;
+ filename="0x7884B758.asc"
+
+-----BEGIN PGP PUBLIC KEY BLOCK-----
+Comment: This is a revocation certificate
+
+iLYEIAEIACAWIQTUqUiGscoglqZFjlxH8QwIeIS3WAUCWYClvgIdAgAKCRBH8QwI
+eIS3WLpBBADCUtyYmI2Z8DCOnKUW4nRjHc3ZVMoZJlwceJCWhSybBrnjo6LWzvBy
+eKke4qHlh+jmSk3/Qyio6vYzvicOayDwhr0s/1X26MiYorthfiCQOg2WQ0YiRuMC
+/Ml8rukvBTRGvXikcIuBw5MFqCWsWI7ExPKaaresnHaCn37KF0A6hw==
+=4oku
+-----END PGP PUBLIC KEY BLOCK-----
+
+--------------A851F166D50529639139DD0B--
diff --git a/src/mailman_pgp/pgp/tests/data/revocs/dsa_elgamal_1024.revoc.asc b/src/mailman_pgp/pgp/tests/data/revocs/dsa_elgamal_1024.revoc.asc
new file mode 100644
index 0000000..80b002e
--- /dev/null
+++ b/src/mailman_pgp/pgp/tests/data/revocs/dsa_elgamal_1024.revoc.asc
@@ -0,0 +1,8 @@
+-----BEGIN PGP PUBLIC KEY BLOCK-----
+Comment: This is a revocation certificate
+
+iGAEIBECACAWIQRR5fGEErwNQ8nevZPDzAoGOEHHpwUCWYCmxwIdAgAKCRDDzAoG
+OEHHp+nYAJwM095Qv9qxaMfqx8xSLD5eKeExgQCeNA5z7zZZcgtUTTqW/o4baX6D
+X8E=
+=vVlH
+-----END PGP PUBLIC KEY BLOCK-----
diff --git a/src/mailman_pgp/pgp/tests/data/revocs/ecc_curve25519.revoc.asc b/src/mailman_pgp/pgp/tests/data/revocs/ecc_curve25519.revoc.asc
new file mode 100644
index 0000000..df7d593
--- /dev/null
+++ b/src/mailman_pgp/pgp/tests/data/revocs/ecc_curve25519.revoc.asc
@@ -0,0 +1,8 @@
+-----BEGIN PGP PUBLIC KEY BLOCK-----
+Comment: This is a revocation certificate
+
+iHgEIBYIACAWIQQFsA605escSIQh4V/6me+juNPIugUCWYCmggIdAgAKCRD6me+j
+uNPIuuVxAP4mr8PXnLIzqdysvMo5Mo0QbIck6NgO/rOJC4Kj/tGjyQD/QMo4ON/1
+cxzD068xWtlDpPYjtPBFuNYrLOPDYvqjGAg=
+=0TTd
+-----END PGP PUBLIC KEY BLOCK-----
diff --git a/src/mailman_pgp/pgp/tests/data/revocs/ecc_p256.revoc.asc b/src/mailman_pgp/pgp/tests/data/revocs/ecc_p256.revoc.asc
new file mode 100644
index 0000000..9571cd2
--- /dev/null
+++ b/src/mailman_pgp/pgp/tests/data/revocs/ecc_p256.revoc.asc
@@ -0,0 +1,8 @@
+-----BEGIN PGP PUBLIC KEY BLOCK-----
+Comment: This is a revocation certificate
+
+iHgEIBMIACAWIQSyKLDm+5ZVhlrf0j03k3IFvhWo0AUCWYCmZAIdAgAKCRA3k3IF
+vhWo0HAZAP9qFHrB6d+hNpYDT9jVCDIURh8Ml711hi8zsDhSkRZ4yAEAgzlmzjPZ
+VhgcOemgbYTKO7Kj8q61KpVP9oZ9l7Z4c/I=
+=ZZsy
+-----END PGP PUBLIC KEY BLOCK-----
diff --git a/src/mailman_pgp/pgp/tests/data/revocs/ecc_secp256k1.revoc.asc b/src/mailman_pgp/pgp/tests/data/revocs/ecc_secp256k1.revoc.asc
new file mode 100644
index 0000000..e91a62a
--- /dev/null
+++ b/src/mailman_pgp/pgp/tests/data/revocs/ecc_secp256k1.revoc.asc
@@ -0,0 +1,8 @@
+-----BEGIN PGP PUBLIC KEY BLOCK-----
+Comment: This is a revocation certificate
+
+iHgEIBMIACAWIQRi7dSuUMnX5cDo6nvIw1p/AFOkvwUCWYCmEwIdAgAKCRDIw1p/
+AFOkv5zZAQDiOzuQWpL2cvm+Jz4XyeLWebiOe7zirM9oMP03rmzQxgD/UIzBnqq9
+iy6oREDvbNw1sCB8/90ihlkB3iM8eOW9iSk=
+=lj1I
+-----END PGP PUBLIC KEY BLOCK-----
diff --git a/src/mailman_pgp/pgp/tests/data/revocs/rsa_1024.revoc.asc b/src/mailman_pgp/pgp/tests/data/revocs/rsa_1024.revoc.asc
new file mode 100644
index 0000000..ddb8974
--- /dev/null
+++ b/src/mailman_pgp/pgp/tests/data/revocs/rsa_1024.revoc.asc
@@ -0,0 +1,9 @@
+-----BEGIN PGP PUBLIC KEY BLOCK-----
+Comment: This is a revocation certificate
+
+iLYEIAEIACAWIQTUqUiGscoglqZFjlxH8QwIeIS3WAUCWYClvgIdAgAKCRBH8QwI
+eIS3WLpBBADCUtyYmI2Z8DCOnKUW4nRjHc3ZVMoZJlwceJCWhSybBrnjo6LWzvBy
+eKke4qHlh+jmSk3/Qyio6vYzvicOayDwhr0s/1X26MiYorthfiCQOg2WQ0YiRuMC
+/Ml8rukvBTRGvXikcIuBw5MFqCWsWI7ExPKaaresnHaCn37KF0A6hw==
+=4oku
+-----END PGP PUBLIC KEY BLOCK-----
diff --git a/src/mailman_pgp/pgp/tests/test_inline.py b/src/mailman_pgp/pgp/tests/test_inline.py
index 7f82ab4..d9f8c8a 100644
--- a/src/mailman_pgp/pgp/tests/test_inline.py
+++ b/src/mailman_pgp/pgp/tests/test_inline.py
@@ -20,7 +20,8 @@
from parameterized import parameterized
from mailman_pgp.pgp.inline import InlineWrapper
-from mailman_pgp.testing.pgp import load_key, load_message, WrapperTestCase
+from mailman_pgp.testing.pgp import (
+ load_key, load_message, load_revoc, WrapperTestCase)
class InlineWrapperTestCase(WrapperTestCase):
@@ -219,6 +220,46 @@ class TestKeys(InlineWrapperTestCase):
self.keys(message, keys)
+class TestRevocs(InlineWrapperTestCase):
+ @parameterized.expand([
+ (load_message('inline_revoc.eml'),
+ True),
+ (load_message('inline_revoc_multipart.eml'),
+ True)
+ ])
+ def test_has_revocs(self, message, has_revocs):
+ self.has_revocs(message, has_revocs)
+
+ @parameterized.expand([
+ (load_message('inline_revoc.eml'),
+ True),
+ (load_message('inline_revoc_multipart.eml'),
+ False)
+ ])
+ def test_is_revocs(self, message, is_revocs):
+ self.is_revocs(message, is_revocs)
+
+ @parameterized.expand([
+ (load_message('inline_revoc.eml'),
+ (load_revoc('rsa_1024.revoc.asc'),)),
+ (load_message('inline_revoc_multipart.eml'),
+ (load_revoc('rsa_1024.revoc.asc'),))
+ ])
+ def test_revocs(self, message, revocs):
+ self.revocs(message, revocs)
+
+ @parameterized.expand([
+ (load_message('clear.eml'),
+ [load_revoc('rsa_1024.revoc.asc'),
+ load_revoc('ecc_p256.revoc.asc')]),
+ (load_message('clear_multipart.eml'),
+ [load_revoc('rsa_1024.revoc.asc'),
+ load_revoc('ecc_p256.revoc.asc')])
+ ])
+ def test_attach_revocs(self, message, revocs):
+ self.attach_revocs(message, revocs)
+
+
class TestCombined(InlineWrapperTestCase):
@parameterized.expand([
(load_message('clear.eml'),
diff --git a/src/mailman_pgp/pgp/tests/test_mime.py b/src/mailman_pgp/pgp/tests/test_mime.py
index 78c9e71..e9951cf 100644
--- a/src/mailman_pgp/pgp/tests/test_mime.py
+++ b/src/mailman_pgp/pgp/tests/test_mime.py
@@ -20,7 +20,8 @@
from parameterized import parameterized
from mailman_pgp.pgp.mime import MIMEWrapper
-from mailman_pgp.testing.pgp import load_key, load_message, WrapperTestCase
+from mailman_pgp.testing.pgp import (
+ load_key, load_message, load_revoc, WrapperTestCase)
class MIMEWrapperTestCase(WrapperTestCase):
@@ -178,6 +179,51 @@ class TestKeys(MIMEWrapperTestCase):
def test_keys(self, message, keys):
self.keys(message, keys)
+ @parameterized.expand([
+ (load_message('clear.eml'),
+ [load_key('rsa_1024.priv.asc'),
+ load_key('ecc_p256.priv.asc')]),
+ (load_message('clear_multipart.eml'),
+ [load_key('rsa_1024.priv.asc'),
+ load_key('ecc_p256.priv.asc')])
+ ])
+ def test_attach_keys(self, message, keys):
+ self.attach_keys(message, keys)
+
+
+class TestRevocs(MIMEWrapperTestCase):
+ @parameterized.expand([
+ (load_message('mime_revoc.eml'),
+ True)
+ ])
+ def test_has_revocs(self, message, has_revocs):
+ self.has_revocs(message, has_revocs)
+
+ @parameterized.expand([
+ (load_message('mime_revoc.eml'),
+ False)
+ ])
+ def test_is_revocs(self, message, is_revocs):
+ self.is_revocs(message, is_revocs)
+
+ @parameterized.expand([
+ (load_message('mime_revoc.eml'),
+ (load_revoc('rsa_1024.revoc.asc'),))
+ ])
+ def test_revocs(self, message, revocs):
+ self.revocs(message, revocs)
+
+ @parameterized.expand([
+ (load_message('clear.eml'),
+ [load_revoc('rsa_1024.revoc.asc'),
+ load_revoc('ecc_p256.revoc.asc')]),
+ (load_message('clear_multipart.eml'),
+ [load_revoc('rsa_1024.revoc.asc'),
+ load_revoc('ecc_p256.revoc.asc')])
+ ])
+ def test_attach_revocs(self, message, revocs):
+ self.attach_revocs(message, revocs)
+
class TestCombined(MIMEWrapperTestCase):
@parameterized.expand([
diff --git a/src/mailman_pgp/pgp/tests/test_wrapper.py b/src/mailman_pgp/pgp/tests/test_wrapper.py
index f1f7621..b9c157a 100644
--- a/src/mailman_pgp/pgp/tests/test_wrapper.py
+++ b/src/mailman_pgp/pgp/tests/test_wrapper.py
@@ -22,7 +22,8 @@ from mailman_pgp.pgp.inline import InlineWrapper
from mailman_pgp.pgp.mime import MIMEWrapper
from mailman_pgp.pgp.mime_multisig import MIMEMultiSigWrapper
from mailman_pgp.pgp.wrapper import PGPWrapper
-from mailman_pgp.testing.pgp import load_key, load_message, WrapperTestCase
+from mailman_pgp.testing.pgp import (
+ load_key, load_message, load_revoc, WrapperTestCase)
class PGPWrapperTestCase(WrapperTestCase):
@@ -180,6 +181,41 @@ class TestKeys(PGPWrapperTestCase):
self.keys(message, keys)
+class TestRevocs(PGPWrapperTestCase):
+ @parameterized.expand([
+ (load_message('mime_revoc.eml'),
+ True),
+ (load_message('inline_revoc.eml'),
+ True),
+ (load_message('inline_revoc_multipart.eml'),
+ True)
+ ])
+ def test_has_revocs(self, message, has_revocs):
+ self.has_revocs(message, has_revocs)
+
+ @parameterized.expand([
+ (load_message('mime_revoc.eml'),
+ False),
+ (load_message('inline_revoc.eml'),
+ True),
+ (load_message('inline_revoc_multipart.eml'),
+ False)
+ ])
+ def test_is_revocs(self, message, is_revocs):
+ self.is_revocs(message, is_revocs)
+
+ @parameterized.expand([
+ (load_message('mime_revoc.eml'),
+ (load_revoc('rsa_1024.revoc.asc'),)),
+ (load_message('inline_revoc.eml'),
+ (load_revoc('rsa_1024.revoc.asc'),)),
+ (load_message('inline_revoc_multipart.eml'),
+ (load_revoc('rsa_1024.revoc.asc'),))
+ ])
+ def test_revocs(self, message, revocs):
+ self.revocs(message, revocs)
+
+
class TestCombined(PGPWrapperTestCase):
@parameterized.expand([
(load_message('clear.eml'),
diff --git a/src/mailman_pgp/pgp/wrapper.py b/src/mailman_pgp/pgp/wrapper.py
index 6193b57..f0519cb 100644
--- a/src/mailman_pgp/pgp/wrapper.py
+++ b/src/mailman_pgp/pgp/wrapper.py
@@ -270,7 +270,7 @@ class PGPWrapper():
Get the collection of keys in this message.
:return: A collection of keys.
- :rtype: Generator[pgpy.PGPKey]
+ :rtype: typing.Generator[pgpy.PGPKey]
"""
if self.mime.has_keys():
yield from self.mime.keys()
@@ -278,3 +278,32 @@ class PGPWrapper():
yield from self.multisig.keys()
elif self.inline.has_keys():
yield from self.inline.keys()
+
+ def has_revocs(self):
+ """
+
+ :return:
+ :rtype: bool
+ """
+ return any(wrapper.has_revocs() for wrapper in self.wrappers)
+
+ def is_revocs(self):
+ """
+
+ :return:
+ :rtype: bool
+ """
+ return any(wrapper.is_revocs() for wrapper in self.wrappers)
+
+ def revocs(self):
+ """
+
+ :return:
+ :rtype: typing.Generator[pgpy.PGPSignature]
+ """
+ if self.mime.has_revocs():
+ yield from self.mime.revocs()
+ elif self.multisig.has_revocs():
+ yield from self.multisig.revocs()
+ elif self.inline.has_revocs():
+ yield from self.inline.revocs()
diff --git a/src/mailman_pgp/testing/pgp.py b/src/mailman_pgp/testing/pgp.py
index 844b708..89f7a98 100644
--- a/src/mailman_pgp/testing/pgp.py
+++ b/src/mailman_pgp/testing/pgp.py
@@ -19,6 +19,7 @@
import os
from email import message_from_bytes
+from operator import attrgetter
from unittest import TestCase
from mailman.email.message import Message
@@ -26,21 +27,36 @@ from pgpy import PGPKey
from pkg_resources import resource_string
from mailman_pgp.testing.layers import PGPLayer
+from mailman_pgp.utils.pgp import revoc_from_blob
+
+
+def load_blob(*path):
+ return resource_string('mailman_pgp.pgp.tests',
+ os.path.join('data', *path))
def load_message(path):
- data = resource_string('mailman_pgp.pgp.tests',
- os.path.join('data', 'messages', path))
- return message_from_bytes(data, Message)
+ """
+ :rtype: Message
+ """
+ return message_from_bytes(load_blob('messages', path), Message)
def load_key(path):
- key, _ = PGPKey.from_blob(
- resource_string('mailman_pgp.pgp.tests',
- os.path.join('data', 'keys', path)))
+ """
+ :rtype: pgpy.PGPKey
+ """
+ key, _ = PGPKey.from_blob(load_blob('keys', path))
return key
+def load_revoc(path):
+ """
+ :rtype: pgpy.PGPSignature
+ """
+ return revoc_from_blob(load_blob('revocs', path))
+
+
def payload_equal(one_msg, other_msg):
one_payload = one_msg.get_payload()
other_payload = other_msg.get_payload()
@@ -135,10 +151,49 @@ class WrapperTestCase(TestCase):
loaded = list(wrapped.keys())
self.assertEqual(len(loaded), len(keys))
- loaded_fingerprints = list(map(lambda key: key.fingerprint, loaded))
- fingerprints = list(map(lambda key: key.fingerprint, keys))
+ loaded_fingerprints = list(map(attrgetter('fingerprint'), loaded))
+ fingerprints = list(map(attrgetter('fingerprint'), keys))
+ self.assertListEqual(loaded_fingerprints, fingerprints)
+
+ def attach_keys(self, message, keys):
+ wrapped = self.wrap(message)
+ attached = wrapped.attach_keys(*keys)
+ wrapped = self.wrap(attached)
+ loaded = list(wrapped.keys())
+
+ self.assertTrue(wrapped.has_keys())
+ loaded_fingerprints = list(map(attrgetter('fingerprint'), loaded))
+ fingerprints = list(map(attrgetter('fingerprint'), keys))
self.assertListEqual(loaded_fingerprints, fingerprints)
+ def has_revocs(self, message, has_revocs):
+ wrapped = self.wrap(message)
+ self.assertEqual(wrapped.has_revocs(), has_revocs)
+
+ def is_revocs(self, message, is_revocs):
+ wrapped = self.wrap(message)
+ self.assertEqual(wrapped.is_revocs(), is_revocs)
+
+ def revocs(self, message, revocs):
+ wrapped = self.wrap(message)
+ loaded = list(wrapped.revocs())
+ self.assertEqual(len(loaded), len(revocs))
+
+ loaded_issuers = list(map(attrgetter('signer'), loaded))
+ issuers = list(map(attrgetter('signer'), revocs))
+ self.assertListEqual(loaded_issuers, issuers)
+
+ def attach_revocs(self, message, revocs):
+ wrapped = self.wrap(message)
+ attached = wrapped.attach_revocs(*revocs)
+ wrapped = self.wrap(attached)
+ loaded = list(wrapped.revocs())
+
+ self.assertTrue(wrapped.has_revocs())
+ loaded_issuers = list(map(attrgetter('signer'), loaded))
+ issuers = list(map(attrgetter('signer'), revocs))
+ self.assertListEqual(loaded_issuers, issuers)
+
def sign_encrypt_decrypt_verify(self, message, sign_key, encrypt_key):
wrapped = self.wrap(message)
encrypted = wrapped.sign_encrypt(sign_key, encrypt_key.pubkey)
diff --git a/src/mailman_pgp/utils/email.py b/src/mailman_pgp/utils/email.py
index a936458..ac1ab66 100644
--- a/src/mailman_pgp/utils/email.py
+++ b/src/mailman_pgp/utils/email.py
@@ -16,8 +16,10 @@
# this program. If not, see <http://www.gnu.org/licenses/>.
""""""
+import copy
from email.utils import parseaddr
+from mailman.email.message import MultipartDigestMessage
from public import public
@@ -62,6 +64,25 @@ def overwrite_message(from_msg, to_msg):
@public
+def make_multipart(msg):
+ """
+
+ :param msg:
+ :type msg: email.message.Message
+ :return:
+ :rtype: email.message.MIMEMultipart|
+ mailman.email.message.MultipartDigestMessage
+ """
+ if msg.is_multipart():
+ out = copy.deepcopy(msg)
+ else:
+ out = MultipartDigestMessage()
+ out.attach(msg)
+ copy_headers(msg, out)
+ return out
+
+
+@public
def get_email(msg):
display_name, email = parseaddr(msg['from'])
# Address could be None or the empty string.
diff --git a/src/mailman_pgp/utils/pgp.py b/src/mailman_pgp/utils/pgp.py
index 4251693..a8f06f2 100644
--- a/src/mailman_pgp/utils/pgp.py
+++ b/src/mailman_pgp/utils/pgp.py
@@ -16,7 +16,11 @@
# this program. If not, see <http://www.gnu.org/licenses/>.
"""Miscellaneous PGP utilities."""
-from pgpy import PGPKey
+from pgpy import PGPKey, PGPSignature
+from pgpy.constants import SignatureType
+from pgpy.errors import PGPError
+from pgpy.packet import Packet, Signature
+from pgpy.types import Armorable
from public import public
@@ -72,3 +76,72 @@ def key_from_file(file):
"""
key, _ = PGPKey.from_file(file)
return key
+
+
+@public
+def revoc_from_blob(blob):
+ """
+ Load a key revocation signature from an ASCII-Armored blob.
+
+ :param blob:
+ :return:
+ :rtype: pgpy.PGPSignature
+ """
+ dearm = Armorable.ascii_unarmor(blob)
+ p = Packet(dearm['body'])
+
+ if not isinstance(p, Signature):
+ raise ValueError('Not a key revocation signature.')
+ if p.sigtype not in (SignatureType.KeyRevocation,
+ SignatureType.SubkeyRevocation):
+ raise ValueError('Not a key revocation.')
+
+ sig = PGPSignature()
+ sig |= p
+ return sig
+
+
+@public
+def key_usable(key, flags_required):
+ """
+ Check that the `key` has the `flags_required` set of KeyFlags.
+
+ Checks only non-expired, non-revoked key/subkeys. Validates revocations it
+ can, so not those made with some other designated revocation key.
+
+ :param key: The key to check.
+ :type key: pgpy.PGPKey
+ :param flags_required: The set of flags required.
+ :type flags_required: set
+ :return: Whether the key has the flags_required.
+ :rtype: bool
+ """
+ if key.is_expired:
+ return False
+ for revoc in key.revocation_signatures:
+ try:
+ verified = key.verify(key, revoc)
+ except PGPError:
+ continue
+ if bool(verified):
+ return False
+
+ usage_flags = key.usage_flags()
+ for subkey in key.subkeys.values():
+ if subkey.is_expired:
+ continue
+
+ valid = True
+ for revoc in subkey.revocation_signatures:
+ try:
+ verified = key.verify(subkey, revoc)
+ except PGPError:
+ continue
+ if bool(verified):
+ valid = False
+ break
+
+ if valid:
+ usage_flags |= subkey.usage_flags()
+
+ return flags_required.issubset(usage_flags)
diff --git a/src/mailman_pgp/utils/tests/__init__.py b/src/mailman_pgp/utils/tests/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/src/mailman_pgp/utils/tests/__init__.py
diff --git a/src/mailman_pgp/utils/tests/test_pgp.py b/src/mailman_pgp/utils/tests/test_pgp.py
new file mode 100644
index 0000000..b6433d4
--- /dev/null
+++ b/src/mailman_pgp/utils/tests/test_pgp.py
@@ -0,0 +1,86 @@
+# Copyright (C) 2017 Jan Jancar
+#
+# This file is a part of the Mailman PGP plugin.
+#
+# This program is free software; you can redistribute it and/or modify it under
+# the terms of the GNU General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or (at your option)
+# any later version.
+#
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+# more details.
+#
+# You should have received a copy of the GNU General Public License along with
+# this program. If not, see <http://www.gnu.org/licenses/>.
+
+""""""
+import datetime
+import time
+from unittest import TestCase
+
+from parameterized import parameterized
+from pgpy import PGPKey, PGPUID
+from pgpy.constants import (
+ CompressionAlgorithm, EllipticCurveOID, HashAlgorithm, KeyFlags,
+ PubKeyAlgorithm, SymmetricKeyAlgorithm)
+
+from mailman_pgp.testing.layers import PGPLayer
+from mailman_pgp.testing.pgp import load_blob, load_key
+from mailman_pgp.utils.pgp import key_usable, revoc_from_blob
+
+
+class TestPGPUtils(TestCase):
+ layer = PGPLayer
+
+ @parameterized.expand([
+ (load_blob('revocs', 'rsa_1024.revoc.asc'),
+ load_key('rsa_1024.pub.asc')),
+ (load_blob('revocs', 'ecc_secp256k1.revoc.asc'),
+ load_key('ecc_secp256k1.pub.asc')),
+ (load_blob('revocs', 'ecc_p256.revoc.asc'),
+ load_key('ecc_p256.pub.asc'))
+ ])
+ def test_revoc_from_blob_valid(self, blob, key):
+ revoc = revoc_from_blob(blob)
+ verifies = key.verify(key, revoc)
+ self.assertTrue(bool(verifies))
+
+ @parameterized.expand([
+ ('Not an ASCII-Armored blob',),
+ (load_blob('keys', 'rsa_1024.pub.asc'),),
+ ])
+ def test_revoc_from_blob_invalid(self, blob):
+ self.assertRaises(ValueError, revoc_from_blob, blob)
+
+ def test_key_usable_expired(self):
+ key = PGPKey.new(PubKeyAlgorithm.ECDSA, EllipticCurveOID.SECP256K1)
+ uid = PGPUID.new('Some Name', email='anne@example.org')
+ key.add_uid(uid, key_expiration=datetime.timedelta(seconds=1),
+ usage={KeyFlags.Certify,
+ KeyFlags.Authentication,
+ KeyFlags.Sign},
+ hashes=[HashAlgorithm.SHA256,
+ HashAlgorithm.SHA512],
+ ciphers=[SymmetricKeyAlgorithm.AES256],
+ compression=[CompressionAlgorithm.ZLIB])
+
+ time.sleep(2)
+
+ self.assertFalse(key_usable(key, set()))
+
+ def test_key_usable_revoked(self):
+ key = load_key('ecc_p256.priv.asc')
+ rsig = key.revoke(key)
+ key |= rsig
+
+ self.assertFalse(key_usable(key, set()))
+
+ def test_key_usable_subkey_revoked(self):
+ key = load_key('ecc_p256.priv.asc')
+ sub = next(iter(key.subkeys.values()))
+ rsig = key.revoke(sub)
+ sub |= rsig
+
+ self.assertFalse(key_usable(key, {KeyFlags.EncryptCommunications}))
diff --git a/src/mailman_pgp/workflows/base.py b/src/mailman_pgp/workflows/base.py
index 036dc18..9fc5d58 100644
--- a/src/mailman_pgp/workflows/base.py
+++ b/src/mailman_pgp/workflows/base.py
@@ -16,15 +16,75 @@
# this program. If not, see <http://www.gnu.org/licenses/>.
""""""
+from datetime import timedelta
+
+from mailman.interfaces.pending import IPendings
+from mailman.interfaces.subscriptions import TokenOwner
+from mailman.utilities.datetime import now
+from mailman.utilities.modules import abstract_component
+from mailman.workflows.common import SubscriptionBase
+from public import public
+from zope.component import getUtility
from mailman_pgp.database import transaction
from mailman_pgp.model.address import PGPAddress
+from mailman_pgp.model.list import PGPMailingList
+@public
class PGPMixin:
- def _step_pgp_prepare(self):
- pgp_address = PGPAddress.for_address(self.address)
- if pgp_address is None:
+ def __init__(self, mlist, pgp_address=None):
+ self.mlist = mlist
+ self.pgp_list = PGPMailingList.for_list(mlist)
+ self.pgp_address = pgp_address
+ if self.pgp_address is not None:
+ self.address = self.pgp_address.address
+
+ @property
+ def address_key(self):
+ return self.pgp_address.email
+
+ @address_key.setter
+ def address_key(self, value):
+ self.pgp_address = PGPAddress.for_email(value)
+ self.member = self.mlist.regular_members.get_member(value)
+
+ def _step_create_address(self):
+ self.pgp_address = PGPAddress.for_address(self.address)
+ if self.pgp_address is None:
with transaction() as t:
- pgp_address = PGPAddress(self.address)
- t.add(pgp_address)
+ self.pgp_address = PGPAddress(self.address)
+ t.add(self.pgp_address)
+
+ def _step_restore_address(self):
+ self.pgp_address = PGPAddress.for_address(self.address)
+
+ def _set_token(self, token_owner):
+ assert isinstance(token_owner, TokenOwner)
+ pendings = getUtility(IPendings)
+ if self.token is not None:
+ pendings.confirm(self.token)
+ self.token_owner = token_owner
+ if token_owner is TokenOwner.no_one:
+ self.token = None
+ return
+
+ pendable = self.pendable_class()(
+ list_id=self.mlist.list_id,
+ email=self.address.email,
+ display_name=self.address.display_name,
+ when=now().replace(microsecond=0).isoformat(),
+ token_owner=token_owner.name,
+ )
+ self.token = pendings.add(pendable, timedelta(days=3650))
+
+
+@public
+@abstract_component
+class PGPSubscriptionBase(SubscriptionBase, PGPMixin):
+ def __init__(self, mlist, subscriber=None, *, pgp_address=None):
+ SubscriptionBase.__init__(self, mlist, subscriber)
+ PGPMixin.__init__(self, mlist, pgp_address=pgp_address)
+
+ def _step_restore_subscriber(self):
+ self._restore_subscriber()
diff --git a/src/mailman_pgp/workflows/key_change.py b/src/mailman_pgp/workflows/key_change.py
index 290e504..1d07903 100644
--- a/src/mailman_pgp/workflows/key_change.py
+++ b/src/mailman_pgp/workflows/key_change.py
@@ -28,10 +28,11 @@ from zope.interface import implementer
from mailman_pgp.config import config
from mailman_pgp.database import transaction
-from mailman_pgp.model.address import PGPAddress
-from mailman_pgp.model.list import PGPMailingList
from mailman_pgp.pgp.wrapper import PGPWrapper
from mailman_pgp.utils.email import copy_headers
+from mailman_pgp.workflows.base import PGPMixin
+from mailman_pgp.workflows.mod_approval import (
+ ModeratorKeyChangeApprovalMixin)
CHANGE_CONFIRM_REQUEST = """\
----------
@@ -46,34 +47,18 @@ Token: {}
"""
-@public
-@implementer(IWorkflow)
-class KeyChangeWorkflow(Workflow):
- name = 'pgp-key-change-workflow'
- description = ''
- initial_state = 'change_key'
+class KeyChangeBase(Workflow, PGPMixin):
save_attributes = (
'address_key',
- 'pubkey_key'
+ 'pubkey_key',
)
def __init__(self, mlist, pgp_address=None, pubkey=None):
- super().__init__()
- self.mlist = mlist
- self.pgp_list = PGPMailingList.for_list(mlist)
- self.pgp_address = pgp_address
+ Workflow.__init__(self)
+ PGPMixin.__init__(self, mlist, pgp_address)
self.pubkey = pubkey
@property
- def address_key(self):
- return self.pgp_address.email
-
- @address_key.setter
- def address_key(self, value):
- self.pgp_address = PGPAddress.for_email(value)
- self.member = self.mlist.regular_members.get_member(value)
-
- @property
def pubkey_key(self):
return str(self.pubkey)
@@ -81,23 +66,27 @@ class KeyChangeWorkflow(Workflow):
def pubkey_key(self, value):
self.pubkey, _ = PGPKey.from_blob(value)
- def _step_change_key(self):
- if self.pgp_address is None or self.pubkey is None:
- raise ValueError
-
- self.push('send_key_confirm_request')
-
- def _step_send_key_confirm_request(self):
+ def _pend(self, token_owner, lifetime=None):
pendings = getUtility(IPendings)
- pendable = KeyChangeWorkflow.pendable_class()(
+ pendable = self.pendable_class()(
email=self.pgp_address.email,
pubkey=str(self.pubkey),
fingerprint=self.pubkey.fingerprint
)
- lifetime = config.get_value('misc', 'change_request_lifetime')
+
self.token = pendings.add(pendable, lifetime=lifetime)
- self.token_owner = TokenOwner.subscriber
+ self.token_owner = token_owner
+
+ def _step_change_key(self):
+ if self.pgp_address is None or self.pubkey is None:
+ raise ValueError
+
+ self.push('send_key_confirm_request')
+ def _step_send_key_confirm_request(self):
+ self._pend(TokenOwner.subscriber,
+ lifetime=config.get_value('misc',
+ 'change_request_lifetime'))
self.push('receive_confirmation')
self.save()
request_address = self.mlist.request_address
@@ -116,20 +105,52 @@ class KeyChangeWorkflow(Workflow):
raise StopIteration
def _step_receive_confirmation(self):
+ self._set_token(TokenOwner.no_one)
+
+ def _step_do_change(self):
with transaction():
self.pgp_address.key = self.pubkey
self.pgp_address.key_confirmed = True
- pendings = getUtility(IPendings)
- if self.token is not None:
- pendings.confirm(self.token)
- self.token = None
- self.token_owner = TokenOwner.no_one
-
@classmethod
def pendable_class(cls):
@implementer(IPendable)
class Pendable(dict):
- PEND_TYPE = KeyChangeWorkflow.name
+ PEND_TYPE = cls.name
return Pendable
+
+
+@public
+@implementer(IWorkflow)
+class KeyChangeWorkflow(KeyChangeBase):
+ name = 'pgp-key-change-workflow'
+ description = ''
+ initial_state = 'prepare'
+
+ def _step_prepare(self):
+ self.push('do_change')
+ self.push('change_key')
+
+
+@public
+@implementer(IWorkflow)
+class KeyChangeModWorkflow(KeyChangeBase, ModeratorKeyChangeApprovalMixin):
+ name = 'pgp-key-change-mod-workflow'
+ description = ''
+ initial_state = 'prepare'
+ save_attributes = (
+ 'approved',
+ 'address_key',
+ 'pubkey_key'
+ )
+
+ def __init__(self, mlist, pgp_address=None, pubkey=None,
+ pre_approved=False):
+ KeyChangeBase.__init__(self, mlist, pgp_address, pubkey)
+ ModeratorKeyChangeApprovalMixin.__init__(self, pre_approved)
+
+ def _step_prepare(self):
+ self.push('do_change')
+ self.push('mod_approval')
+ self.push('change_key')
diff --git a/src/mailman_pgp/workflows/key_confirm.py b/src/mailman_pgp/workflows/key_confirm.py
new file mode 100644
index 0000000..0a38551
--- /dev/null
+++ b/src/mailman_pgp/workflows/key_confirm.py
@@ -0,0 +1,77 @@
+# Copyright (C) 2017 Jan Jancar
+#
+# This file is a part of the Mailman PGP plugin.
+#
+# This program is free software; you can redistribute it and/or modify it under
+# the terms of the GNU General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or (at your option)
+# any later version.
+#
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+# more details.
+#
+# You should have received a copy of the GNU General Public License along with
+# this program. If not, see <http://www.gnu.org/licenses/>.
+
+""""""
+from mailman.email.message import UserNotification
+from mailman.interfaces.subscriptions import TokenOwner
+from public import public
+
+from mailman_pgp.database import transaction
+from mailman_pgp.model.list import PGPMailingList
+from mailman_pgp.pgp.wrapper import PGPWrapper
+from mailman_pgp.utils.email import copy_headers
+
+CONFIRM_REQUEST = """\
+----------
+TODO: this is a pgp enabled list.
+Reply to this message with this whole text
+signed with your supplied key, either inline or PGP/MIME.
+
+Fingerprint: {}
+Token: {}
+----------
+"""
+
+
+@public
+class ConfirmPubkeyMixin:
+ def __init__(self, pre_confirmed=False):
+ self.pubkey_confirmed = pre_confirmed
+
+ def _step_pubkey_confirmation(self):
+ assert self.pgp_address is not None
+
+ if self.pubkey_confirmed:
+ with transaction():
+ self.pgp_address.key_confirmed = True
+ else:
+ if not self.pgp_address.key_confirmed:
+ self.push('send_key_confirm_request')
+
+ def _step_send_key_confirm_request(self):
+ self._set_token(TokenOwner.subscriber)
+ self.push('receive_key_confirmation')
+ self.save()
+
+ request_address = self.mlist.request_address
+ email_address = self.address.email
+ msg = UserNotification(email_address, request_address,
+ 'key confirm {}'.format(self.token),
+ CONFIRM_REQUEST.format(
+ self.pgp_address.key_fingerprint,
+ self.token))
+ pgp_list = PGPMailingList.for_list(self.mlist)
+ wrapped = PGPWrapper(msg)
+ encrypted = wrapped.sign_encrypt(pgp_list.key, self.pgp_address.key)
+
+ msg.set_payload(encrypted.get_payload())
+ copy_headers(encrypted, msg, True)
+ msg.send(self.mlist)
+ raise StopIteration
+
+ def _step_receive_key_confirmation(self):
+ self._set_token(TokenOwner.no_one)
diff --git a/src/mailman_pgp/workflows/key_revoke.py b/src/mailman_pgp/workflows/key_revoke.py
new file mode 100644
index 0000000..7a7c071
--- /dev/null
+++ b/src/mailman_pgp/workflows/key_revoke.py
@@ -0,0 +1,70 @@
+# Copyright (C) 2017 Jan Jancar
+#
+# This file is a part of the Mailman PGP plugin.
+#
+# This program is free software; you can redistribute it and/or modify it under
+# the terms of the GNU General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or (at your option)
+# any later version.
+#
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+# more details.
+#
+# You should have received a copy of the GNU General Public License along with
+# this program. If not, see <http://www.gnu.org/licenses/>.
+
+""""""
+from mailman.interfaces.pending import IPendable
+from mailman.interfaces.workflows import IWorkflow
+from mailman.workflows.base import Workflow
+from public import public
+from zope.interface import implementer
+
+from mailman_pgp.workflows.base import PGPMixin
+from mailman_pgp.workflows.key_confirm import ConfirmPubkeyMixin
+from mailman_pgp.workflows.key_set import SetPubkeyMixin
+from mailman_pgp.workflows.mod_approval import ModeratorKeyRevokeApprovalMixin
+
+
+class KeyRevokeBase(Workflow, PGPMixin):
+ def __init__(self, mlist, pgp_address=None):
+ Workflow.__init__(self)
+ PGPMixin.__init__(self, mlist, pgp_address)
+
+ @classmethod
+ def pendable_class(cls):
+ @implementer(IPendable)
+ class Pendable(dict):
+ PEND_TYPE = cls.name
+
+ return Pendable
+
+
+@public
+@implementer(IWorkflow)
+class KeyRevokeWorkflow(KeyRevokeBase, SetPubkeyMixin, ConfirmPubkeyMixin,
+ ModeratorKeyRevokeApprovalMixin):
+ name = 'pgp-key-revoke-workflow'
+ description = ''
+ initial_state = 'prepare'
+ save_attributes = (
+ 'approved',
+ 'address_key',
+ 'pubkey_key',
+ 'pubkey_confirmed'
+ )
+
+ def __init__(self, mlist, pgp_address=None, pubkey=None,
+ pubkey_pre_confirmed=False, pre_approved=False):
+ KeyRevokeBase.__init__(self, mlist, pgp_address=pgp_address)
+ SetPubkeyMixin.__init__(self, pubkey=pubkey)
+ ConfirmPubkeyMixin.__init__(self, pre_confirmed=pubkey_pre_confirmed)
+ ModeratorKeyRevokeApprovalMixin.__init__(self,
+ pre_approved=pre_approved)
+
+ def _step_prepare(self):
+ self.push('mod_approval')
+ self.push('pubkey_confirmation')
+ self.push('pubkey_checks')
diff --git a/src/mailman_pgp/workflows/key_set.py b/src/mailman_pgp/workflows/key_set.py
new file mode 100644
index 0000000..95e6e7c
--- /dev/null
+++ b/src/mailman_pgp/workflows/key_set.py
@@ -0,0 +1,76 @@
+# Copyright (C) 2017 Jan Jancar
+#
+# This file is a part of the Mailman PGP plugin.
+#
+# This program is free software; you can redistribute it and/or modify it under
+# the terms of the GNU General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or (at your option)
+# any later version.
+#
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+# more details.
+#
+# You should have received a copy of the GNU General Public License along with
+# this program. If not, see <http://www.gnu.org/licenses/>.
+
+""""""
+from mailman.email.message import UserNotification
+from mailman.interfaces.subscriptions import TokenOwner
+from pgpy import PGPKey
+from public import public
+
+from mailman_pgp.database import transaction
+
+KEY_REQUEST = """\
+----------
+TODO: this is a pgp enabled list.
+We need your pubkey.
+Reply to this message with it as a PGP/MIME(preferred) or inline.
+----------"""
+
+
+@public
+class SetPubkeyMixin:
+ def __init__(self, pubkey=None):
+ self.pubkey = pubkey
+
+ @property
+ def pubkey_key(self):
+ if self.pubkey is None:
+ return None
+ return str(self.pubkey)
+
+ @pubkey_key.setter
+ def pubkey_key(self, value):
+ if value is not None:
+ self.pubkey, _ = PGPKey.from_blob(value)
+ else:
+ self.pubkey = None
+
+ def _step_pubkey_checks(self):
+ assert self.pgp_address is not None
+
+ if self.pubkey is None:
+ if self.pgp_address.key is None:
+ self.push('send_key_request')
+ else:
+ with transaction():
+ self.pgp_address.key = self.pubkey
+
+ def _step_send_key_request(self):
+ self._set_token(TokenOwner.subscriber)
+ self.push('receive_key')
+ self.save()
+ request_address = self.mlist.request_address
+ email_address = self.address.email
+ msg = UserNotification(email_address, request_address,
+ 'key set {}'.format(self.token),
+ KEY_REQUEST)
+ msg.send(self.mlist, add_precedence=False)
+ # Now we wait for the confirmation.
+ raise StopIteration
+
+ def _step_receive_key(self):
+ self._set_token(TokenOwner.no_one)
diff --git a/src/mailman_pgp/workflows/mod_approval.py b/src/mailman_pgp/workflows/mod_approval.py
new file mode 100644
index 0000000..367f773
--- /dev/null
+++ b/src/mailman_pgp/workflows/mod_approval.py
@@ -0,0 +1,153 @@
+# Copyright (C) 2017 Jan Jancar
+#
+# This file is a part of the Mailman PGP plugin.
+#
+# This program is free software; you can redistribute it and/or modify it under
+# the terms of the GNU General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or (at your option)
+# any later version.
+#
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+# more details.
+#
+# You should have received a copy of the GNU General Public License along with
+# this program. If not, see <http://www.gnu.org/licenses/>.
+
+""""""
+import copy
+
+from mailman.email.message import UserNotification
+from mailman.interfaces.pending import IPendings
+from mailman.interfaces.subscriptions import TokenOwner
+from public import public
+from zope.component import getUtility
+
+from mailman_pgp.pgp.mime import MIMEWrapper
+from mailman_pgp.utils.email import overwrite_message
+
+SUBSCRIPTION_MOD_REQUEST = """\
+----------
+TODO: this is a pgp enabled list.
+A user with address {address} requested subscription.
+The key is attached to this message.
+
+Fingerprint: {fingerprint}
+----------
+"""
+
+KEY_CHANGE_MOD_REQUEST = """\
+----------
+TODO: this is a pgp enabled list.
+A subscriber with address {address} requested a change of his key.
+The new key is attached to this message.
+
+Old key fingerprint: {old_fpr}
+New key fingerprint: {new_fpr}
+----------
+"""
+
+KEY_REVOKE_MOD_REQUEST = """\
+----------
+TODO: this is a pgp enabled list.
+A subscriber with address {address} revoked a part of his key,
+which made it unusable and needs to be reset. The subscriber
+supplied a new key. The new key is attached to this message.
+
+Old key fingerprint: {old_fpr}
+New key fingerprint: {new_fpr}
+----------
+"""
+
+
+class ModeratorApprovalMixin:
+ def __init__(self, pre_approved=False):
+ self.approved = pre_approved
+
+ def _step_mod_approval(self):
+ if not self.approved:
+ self.push('get_approval')
+
+ def _step_get_approval(self):
+ self._pend(TokenOwner.moderator)
+ self.push('receive_mod_confirmation')
+ self.save()
+
+ name = self._request_name
+ body = self._request_body
+
+ if self.mlist.admin_immed_notify:
+ subject = 'New {} request from {}'.format(name,
+ self.pgp_address.email)
+ msg = UserNotification(
+ self.mlist.owner_address, self.mlist.owner_address,
+ subject, body, self.mlist.preferred_language)
+ out = copy.deepcopy(msg)
+ wrapped = MIMEWrapper(msg)
+ msg = wrapped.attach_keys(self.pubkey)
+ overwrite_message(msg, out)
+ out.send(self.mlist)
+ raise StopIteration
+
+ def _step_receive_mod_confirmation(self):
+ pendings = getUtility(IPendings)
+ if self.token is not None:
+ pendings.confirm(self.token)
+ self.token = None
+ self.token_owner = TokenOwner.no_one
+
+
+@public
+class ModeratorSubApprovalMixin(ModeratorApprovalMixin):
+ def __init__(self, pre_approved=False):
+ super().__init__(pre_approved)
+
+ @property
+ def _request_name(self):
+ return 'subscription'
+
+ @property
+ def _request_body(self):
+ params = {'mlist': self.mlist.fqdn_listname,
+ 'address': self.pgp_address.email,
+ 'fingerprint': self.pubkey.fingerprint}
+ return SUBSCRIPTION_MOD_REQUEST.format(**params)
+
+
+@public
+class ModeratorKeyChangeApprovalMixin(ModeratorApprovalMixin):
+ def __init__(self, pre_approved=False):
+ super().__init__(pre_approved)
+
+ @property
+ def _request_name(self):
+ return 'key change'
+
+ @property
+ def _request_body(self):
+ params = {'mlist': self.mlist.fqdn_listname,
+ 'address': self.pgp_address.email,
+ 'fingerprint': self.pubkey.fingerprint,
+ 'old_fpr': self.pgp_address.key_fingerprint,
+ 'new_fpr': self.pubkey.fingerprint}
+ return KEY_CHANGE_MOD_REQUEST.format(**params)
+
+
+@public
+class ModeratorKeyRevokeApprovalMixin(ModeratorApprovalMixin):
+ def __init__(self, pre_approved=False):
+ super().__init__(pre_approved)
+
+ @property
+ def _request_name(self):
+ return 'key reset'
+
+ @property
+ def _request_body(self):
+ params = {'mlist': self.mlist.fqdn_listname,
+ 'address': self.pgp_address.email,
+ 'fingerprint': self.pubkey.fingerprint,
+ 'old_fpr': self.pgp_address.key_fingerprint,
+ 'new_fpr': self.pubkey.fingerprint}
+ return KEY_REVOKE_MOD_REQUEST.format(**params)
diff --git a/src/mailman_pgp/workflows/pubkey.py b/src/mailman_pgp/workflows/pubkey.py
deleted file mode 100644
index a13d491..0000000
--- a/src/mailman_pgp/workflows/pubkey.py
+++ /dev/null
@@ -1,118 +0,0 @@
-from mailman.email.message import UserNotification
-from mailman.interfaces.subscriptions import TokenOwner
-from pgpy import PGPKey
-
-from mailman_pgp.database import transaction
-from mailman_pgp.model.address import PGPAddress
-from mailman_pgp.model.list import PGPMailingList
-from mailman_pgp.pgp.wrapper import PGPWrapper
-from mailman_pgp.utils.email import copy_headers
-
-KEY_REQUEST = """\
-----------
-TODO: this is a pgp enabled list.
-We need your pubkey.
-Reply to this message with it as a PGP/MIME(preferred) or inline.
-----------"""
-
-CONFIRM_REQUEST = """\
-----------
-TODO: this is a pgp enabled list.
-Reply to this message with this whole text
-signed with your supplied key, either inline or PGP/MIME.
-
-Fingerprint: {}
-Token: {}
-----------
-"""
-
-
-class SetPubkeyMixin:
- def __init__(self, pubkey=None):
- self.pubkey = pubkey
-
- @property
- def pubkey_key(self):
- if self.pubkey is None:
- return None
- return str(self.pubkey)
-
- @pubkey_key.setter
- def pubkey_key(self, value):
- if value is not None:
- self.pubkey, _ = PGPKey.from_blob(value)
- else:
- self.pubkey = None
-
- def _step_pubkey_checks(self):
- pgp_address = PGPAddress.for_address(self.address)
- assert pgp_address is not None
-
- if self.pubkey is None:
- if pgp_address.key is None:
- self.push('send_key_request')
- else:
- with transaction():
- pgp_address.key = self.pubkey
-
- def _step_send_key_request(self):
- self._set_token(TokenOwner.subscriber)
- self.push('receive_key')
- self.save()
- request_address = self.mlist.request_address
- email_address = self.address.email
- msg = UserNotification(email_address, request_address,
- 'key set {}'.format(self.token),
- KEY_REQUEST)
- msg.send(self.mlist, add_precedence=False)
- # Now we wait for the confirmation.
- raise StopIteration
-
- def _step_receive_key(self):
- self._restore_subscriber()
- self._set_token(TokenOwner.no_one)
-
-
-class ConfirmPubkeyMixin:
- def __init__(self, pre_confirmed=False):
- self.pubkey_confirmed = pre_confirmed
-
- def _step_pubkey_confirmation(self):
- pgp_address = PGPAddress.for_address(self.address)
- assert pgp_address is not None
-
- if self.pubkey_confirmed:
- with transaction():
- pgp_address.key_confirmed = True
- else:
- if not pgp_address.key_confirmed:
- self.push('send_key_confirm_request')
-
- def _step_send_key_confirm_request(self):
- self._set_token(TokenOwner.subscriber)
- self.push('receive_key_confirmation')
- self.save()
-
- pgp_address = PGPAddress.for_address(self.address)
- request_address = self.mlist.request_address
- email_address = self.address.email
- msg = UserNotification(email_address, request_address,
- 'key confirm {}'.format(self.token),
- CONFIRM_REQUEST.format(
- pgp_address.key_fingerprint,
- self.token))
- pgp_list = PGPMailingList.for_list(self.mlist)
- wrapped = PGPWrapper(msg)
- encrypted = wrapped.sign_encrypt(pgp_list.key, pgp_address.key)
-
- msg.set_payload(encrypted.get_payload())
- copy_headers(encrypted, msg, True)
- msg.send(self.mlist)
- raise StopIteration
-
- def _step_receive_key_confirmation(self):
- self._restore_subscriber()
- self._set_token(TokenOwner.no_one)
- with transaction():
- pgp_address = PGPAddress.for_address(self.address)
- pgp_address.key_confirmed = True
diff --git a/src/mailman_pgp/workflows/subscription.py b/src/mailman_pgp/workflows/subscription.py
index 809b7cb..2546f1e 100644
--- a/src/mailman_pgp/workflows/subscription.py
+++ b/src/mailman_pgp/workflows/subscription.py
@@ -19,18 +19,19 @@
from mailman.core.i18n import _
from mailman.interfaces.workflows import ISubscriptionWorkflow
-from mailman.workflows.common import (ConfirmationMixin, ModerationMixin,
- SubscriptionBase, VerificationMixin)
+from mailman.workflows.common import (ConfirmationMixin, VerificationMixin)
from public import public
from zope.interface import implementer
-from mailman_pgp.workflows.base import PGPMixin
-from mailman_pgp.workflows.pubkey import ConfirmPubkeyMixin, SetPubkeyMixin
+from mailman_pgp.workflows.base import PGPMixin, PGPSubscriptionBase
+from mailman_pgp.workflows.key_confirm import ConfirmPubkeyMixin
+from mailman_pgp.workflows.key_set import SetPubkeyMixin
+from mailman_pgp.workflows.mod_approval import ModeratorSubApprovalMixin
@public
@implementer(ISubscriptionWorkflow)
-class OpenSubscriptionPolicy(SubscriptionBase, VerificationMixin,
+class OpenSubscriptionPolicy(PGPSubscriptionBase, VerificationMixin,
SetPubkeyMixin, ConfirmPubkeyMixin,
PGPMixin):
""""""
@@ -52,26 +53,28 @@ class OpenSubscriptionPolicy(SubscriptionBase, VerificationMixin,
def __init__(self, mlist, subscriber=None, *,
pre_verified=False, pubkey=None,
pubkey_pre_confirmed=False):
- SubscriptionBase.__init__(self, mlist, subscriber)
+ PGPSubscriptionBase.__init__(self, mlist, subscriber)
VerificationMixin.__init__(self, pre_verified=pre_verified)
SetPubkeyMixin.__init__(self, pubkey=pubkey)
ConfirmPubkeyMixin.__init__(self, pre_confirmed=pubkey_pre_confirmed)
- PGPMixin.__init__(self)
+ PGPMixin.__init__(self, mlist)
def _step_prepare(self):
self.push('do_subscription')
+ self.push('restore_subscriber')
self.push('pubkey_confirmation')
+ self.push('restore_address')
self.push('pubkey_checks')
- self.push('pgp_prepare')
+ self.push('create_address')
self.push('verification_checks')
self.push('sanity_checks')
@public
@implementer(ISubscriptionWorkflow)
-class ConfirmSubscriptionPolicy(SubscriptionBase, VerificationMixin,
+class ConfirmSubscriptionPolicy(PGPSubscriptionBase, VerificationMixin,
ConfirmationMixin, SetPubkeyMixin,
- ConfirmPubkeyMixin, PGPMixin):
+ ConfirmPubkeyMixin):
""""""
name = 'pgp-policy-confirm'
@@ -92,18 +95,19 @@ class ConfirmSubscriptionPolicy(SubscriptionBase, VerificationMixin,
def __init__(self, mlist, subscriber=None, *,
pre_verified=False, pre_confirmed=False, pubkey=None,
pubkey_pre_confirmed=False):
- SubscriptionBase.__init__(self, mlist, subscriber)
+ PGPSubscriptionBase.__init__(self, mlist, subscriber)
VerificationMixin.__init__(self, pre_verified=pre_verified)
ConfirmationMixin.__init__(self, pre_confirmed=pre_confirmed)
SetPubkeyMixin.__init__(self, pubkey=pubkey)
ConfirmPubkeyMixin.__init__(self, pre_confirmed=pubkey_pre_confirmed)
- PGPMixin.__init__(self)
def _step_prepare(self):
self.push('do_subscription')
+ self.push('restore_subscriber')
self.push('pubkey_confirmation')
+ self.push('restore_address')
self.push('pubkey_checks')
- self.push('pgp_prepare')
+ self.push('create_address')
self.push('confirmation_checks')
self.push('verification_checks')
self.push('sanity_checks')
@@ -111,9 +115,9 @@ class ConfirmSubscriptionPolicy(SubscriptionBase, VerificationMixin,
@public
@implementer(ISubscriptionWorkflow)
-class ModerationSubscriptionPolicy(SubscriptionBase, VerificationMixin,
- ModerationMixin, SetPubkeyMixin,
- ConfirmPubkeyMixin, PGPMixin):
+class ModerationSubscriptionPolicy(PGPSubscriptionBase, VerificationMixin,
+ ModeratorSubApprovalMixin, SetPubkeyMixin,
+ ConfirmPubkeyMixin):
""""""
name = 'pgp-policy-moderate'
@@ -134,29 +138,31 @@ class ModerationSubscriptionPolicy(SubscriptionBase, VerificationMixin,
def __init__(self, mlist, subscriber=None, *,
pre_verified=False, pre_approved=False, pubkey=None,
pubkey_pre_confirmed=False):
- SubscriptionBase.__init__(self, mlist, subscriber)
+ PGPSubscriptionBase.__init__(self, mlist, subscriber)
VerificationMixin.__init__(self, pre_verified=pre_verified)
- ModerationMixin.__init__(self, pre_approved=pre_approved)
+ ModeratorSubApprovalMixin.__init__(self, pre_approved=pre_approved)
SetPubkeyMixin.__init__(self, pubkey=pubkey)
ConfirmPubkeyMixin.__init__(self, pre_confirmed=pubkey_pre_confirmed)
- PGPMixin.__init__(self)
def _step_prepare(self):
self.push('do_subscription')
- self.push('moderation_checks')
+ self.push('restore_subscriber')
+ self.push('mod_approval')
self.push('pubkey_confirmation')
+ self.push('restore_address')
self.push('pubkey_checks')
- self.push('pgp_prepare')
+ self.push('create_address')
self.push('verification_checks')
self.push('sanity_checks')
@public
@implementer(ISubscriptionWorkflow)
-class ConfirmModerationSubscriptionPolicy(SubscriptionBase, VerificationMixin,
- ConfirmationMixin, ModerationMixin,
- SetPubkeyMixin, ConfirmPubkeyMixin,
- PGPMixin):
+class ConfirmModerationSubscriptionPolicy(PGPSubscriptionBase,
+ VerificationMixin,
+ ConfirmationMixin,
+ ModeratorSubApprovalMixin,
+ SetPubkeyMixin, ConfirmPubkeyMixin):
""""""
name = 'pgp-policy-confirm-moderate'
@@ -178,20 +184,22 @@ class ConfirmModerationSubscriptionPolicy(SubscriptionBase, VerificationMixin,
def __init__(self, mlist, subscriber=None, *,
pre_verified=False, pre_confirmed=False, pre_approved=False,
pubkey=None, pubkey_pre_confirmed=False):
- SubscriptionBase.__init__(self, mlist, subscriber)
+ PGPSubscriptionBase.__init__(self, mlist, subscriber)
VerificationMixin.__init__(self, pre_verified=pre_verified)
ConfirmationMixin.__init__(self, pre_confirmed=pre_confirmed)
- ModerationMixin.__init__(self, pre_approved=pre_approved)
+ ModeratorSubApprovalMixin.__init__(self, pre_approved=pre_approved)
SetPubkeyMixin.__init__(self, pubkey=pubkey)
ConfirmPubkeyMixin.__init__(self, pre_confirmed=pubkey_pre_confirmed)
- PGPMixin.__init__(self)
def _step_prepare(self):
self.push('do_subscription')
- self.push('moderation_checks')
+ self.push('restore_subscriber')
+ self.push('mod_approval')
+ self.push('restore_address')
self.push('pubkey_confirmation')
+ self.push('restore_address')
self.push('pubkey_checks')
- self.push('pgp_prepare')
+ self.push('create_address')
self.push('confirmation_checks')
self.push('verification_checks')
self.push('sanity_checks')
diff --git a/src/mailman_pgp/workflows/tests/test_base.py b/src/mailman_pgp/workflows/tests/test_base.py
index 31b3d05..3904105 100644
--- a/src/mailman_pgp/workflows/tests/test_base.py
+++ b/src/mailman_pgp/workflows/tests/test_base.py
@@ -37,8 +37,8 @@ from mailman_pgp.pgp.wrapper import PGPWrapper
from mailman_pgp.testing.layers import PGPConfigLayer
from mailman_pgp.testing.pgp import load_key
from mailman_pgp.workflows.base import (PGPMixin)
-from mailman_pgp.workflows.pubkey import (ConfirmPubkeyMixin, KEY_REQUEST,
- SetPubkeyMixin)
+from mailman_pgp.workflows.key_confirm import ConfirmPubkeyMixin
+from mailman_pgp.workflows.key_set import KEY_REQUEST, SetPubkeyMixin
class PubkeyMixinTestSetup():
@@ -49,13 +49,14 @@ class PubkeyMixinTestSetup():
self.list_key = load_key('ecc_p256.priv.asc')
- self.pgp_list = PGPMailingList.for_list(self.mlist)
- self.pgp_list.key = self.list_key
+ with transaction():
+ self.pgp_list = PGPMailingList.for_list(self.mlist)
+ self.pgp_list.key = self.list_key
self.um = getUtility(IUserManager)
self.sender_key = load_key('rsa_1024.priv.asc')
- self.sender = self.um.create_address('rsa-1024b@example.org')
+ self.sender = self.um.create_address('anne@example.org')
@implementer(IWorkflow)
@@ -78,13 +79,13 @@ class PGPTestWorkflow(SubscriptionBase, PGPMixin, SetPubkeyMixin,
SubscriptionBase.__init__(self, mlist, subscriber)
SetPubkeyMixin.__init__(self, pubkey=pubkey)
ConfirmPubkeyMixin.__init__(self, pre_confirmed=pubkey_pre_confirmed)
- PGPMixin.__init__(self)
+ PGPMixin.__init__(self, mlist)
def _step_prepare(self):
self.push('do_subscription')
self.push('pubkey_confirmation')
self.push('pubkey_checks')
- self.push('pgp_prepare')
+ self.push('create_address')
self.push('sanity_checks')
@@ -93,7 +94,7 @@ class TestPGPMixin(PubkeyMixinTestSetup, unittest.TestCase):
def test_create_address(self):
workflow = PGPTestWorkflow(self.mlist, self.sender)
- workflow.run_thru('pgp_prepare')
+ workflow.run_thru('create_address')
pgp_address = PGPAddress.for_address(self.sender)
self.assertIsNotNone(pgp_address)
@@ -102,7 +103,7 @@ class TestPGPMixin(PubkeyMixinTestSetup, unittest.TestCase):
with transaction() as t:
pgp_address = PGPAddress(self.sender)
t.add(pgp_address)
- workflow.run_thru('pgp_prepare')
+ workflow.run_thru('create_address')
still = PGPAddress.for_address(self.sender)
self.assertIsNotNone(still)
@@ -203,7 +204,7 @@ class TestBothPubkeyMixins(PubkeyMixinTestSetup, unittest.TestCase):
self.assertIsNotNone(workflow.token)
pendable = getUtility(IPendings).confirm(workflow.token, expunge=False)
self.assertEqual(pendable['list_id'], 'test.example.com')
- self.assertEqual(pendable['email'], 'rsa-1024b@example.org')
+ self.assertEqual(pendable['email'], 'anne@example.org')
self.assertEqual(pendable['display_name'], '')
self.assertEqual(pendable['when'], '2005-08-01T07:49:23')
self.assertEqual(pendable['token_owner'], 'subscriber')
@@ -216,7 +217,7 @@ class TestBothPubkeyMixins(PubkeyMixinTestSetup, unittest.TestCase):
self.assertIsNotNone(workflow.token)
pendable = getUtility(IPendings).confirm(workflow.token, expunge=False)
self.assertEqual(pendable['list_id'], 'test.example.com')
- self.assertEqual(pendable['email'], 'rsa-1024b@example.org')
+ self.assertEqual(pendable['email'], 'anne@example.org')
self.assertEqual(pendable['display_name'], '')
self.assertEqual(pendable['when'], '2005-08-01T07:49:23')
self.assertEqual(pendable['token_owner'], 'subscriber')
diff --git a/src/mailman_pgp/workflows/tests/test_key_change.py b/src/mailman_pgp/workflows/tests/test_key_change.py
index e469d51..5d4926a 100644
--- a/src/mailman_pgp/workflows/tests/test_key_change.py
+++ b/src/mailman_pgp/workflows/tests/test_key_change.py
@@ -25,13 +25,15 @@ from mailman.interfaces.usermanager import IUserManager
from mailman.testing.helpers import get_queue_messages
from zope.component import getUtility
+from mailman_pgp.config import mm_config
from mailman_pgp.database import mm_transaction, transaction
from mailman_pgp.model.address import PGPAddress
from mailman_pgp.model.list import PGPMailingList
from mailman_pgp.pgp.wrapper import PGPWrapper
from mailman_pgp.testing.layers import PGPConfigLayer
from mailman_pgp.testing.pgp import load_key
-from mailman_pgp.workflows.key_change import KeyChangeWorkflow
+from mailman_pgp.workflows.key_change import (KeyChangeModWorkflow,
+ KeyChangeWorkflow)
class TestKeyChangeWorkflow(unittest.TestCase):
@@ -41,13 +43,18 @@ class TestKeyChangeWorkflow(unittest.TestCase):
with mm_transaction():
self.mlist = create_list('test@example.com',
style_name='pgp-default')
- self.pgp_list = PGPMailingList.for_list(self.mlist)
- self.pgp_list.key = load_key('ecc_p256.priv.asc')
+ with transaction():
+ self.pgp_list = PGPMailingList.for_list(self.mlist)
+ self.pgp_list.key = load_key('ecc_p256.priv.asc')
self.sender_key = load_key('rsa_1024.priv.asc')
self.sender_new_key = load_key('ecc_p256.priv.asc')
self.sender = getUtility(IUserManager).create_address(
- 'rsa-1024b@example.org')
+ 'anne@example.org')
+
+ def test_has_workflows(self):
+ self.assertTrue(KeyChangeWorkflow.name, mm_config.workflows)
+ self.assertTrue(KeyChangeModWorkflow.name, mm_config.workflows)
def test_pgp_address_none(self):
workflow = KeyChangeWorkflow(self.mlist)
@@ -101,3 +108,19 @@ class TestKeyChangeWorkflow(unittest.TestCase):
self.assertEqual(pgp_address.key_fingerprint,
self.sender_new_key.fingerprint)
self.assertTrue(pgp_address.key_confirmed)
+
+ def test_confirm_mod(self):
+ with transaction() as t:
+ pgp_address = PGPAddress(self.sender)
+ pgp_address.key = self.sender_key.pubkey
+ pgp_address.key_confirmed = True
+ t.add(pgp_address)
+
+ workflow = KeyChangeModWorkflow(self.mlist, pgp_address,
+ self.sender_new_key.pubkey)
+ list(workflow)
+
+ token, token_owner, member = ISubscriptionManager(self.mlist).confirm(
+ workflow.token)
+ self.assertIsNotNone(token)
+ self.assertEqual(token_owner, TokenOwner.moderator)
diff --git a/src/mailman_pgp/workflows/tests/test_mod_approval.py b/src/mailman_pgp/workflows/tests/test_mod_approval.py
new file mode 100644
index 0000000..7d57a9b
--- /dev/null
+++ b/src/mailman_pgp/workflows/tests/test_mod_approval.py
@@ -0,0 +1,106 @@
+# Copyright (C) 2017 Jan Jancar
+#
+# This file is a part of the Mailman PGP plugin.
+#
+# This program is free software; you can redistribute it and/or modify it under
+# the terms of the GNU General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or (at your option)
+# any later version.
+#
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+# more details.
+#
+# You should have received a copy of the GNU General Public License along with
+# this program. If not, see <http://www.gnu.org/licenses/>.
+
+""""""
+from unittest import TestCase
+
+from mailman.app.lifecycle import create_list
+from mailman.interfaces.subscriptions import TokenOwner
+from mailman.interfaces.usermanager import IUserManager
+from mailman.interfaces.workflows import IWorkflow
+from mailman.testing.helpers import get_queue_messages
+from zope.component import getUtility
+from zope.interface import implementer
+
+from mailman_pgp.database import mm_transaction, transaction
+from mailman_pgp.model.address import PGPAddress
+from mailman_pgp.model.list import PGPMailingList
+from mailman_pgp.pgp.wrapper import PGPWrapper
+from mailman_pgp.testing.layers import PGPConfigLayer
+from mailman_pgp.testing.pgp import load_key
+from mailman_pgp.workflows.key_change import KeyChangeBase
+from mailman_pgp.workflows.mod_approval import (
+ ModeratorKeyChangeApprovalMixin)
+
+
+@implementer(IWorkflow)
+class PGPTestWorkflow(KeyChangeBase, ModeratorKeyChangeApprovalMixin):
+ name = 'test-workflow'
+ description = ''
+ initial_state = 'mod_approval'
+ save_attributes = (
+ 'approved',
+ )
+
+ def __init__(self, mlist, pgp_address=None, pubkey=None,
+ pre_approved=False):
+ KeyChangeBase.__init__(self, mlist, pgp_address, pubkey)
+ ModeratorKeyChangeApprovalMixin.__init__(self, pre_approved)
+
+
+class TestModeratorApprovalMixin(TestCase):
+ layer = PGPConfigLayer
+
+ def setUp(self):
+ with mm_transaction():
+ self.mlist = create_list('test@example.com',
+ style_name='pgp-default')
+ with transaction():
+ self.pgp_list = PGPMailingList.for_list(self.mlist)
+ self.pgp_list.key = load_key('ecc_p256.priv.asc')
+
+ self.sender_key = load_key('rsa_1024.priv.asc')
+ self.sender_new_key = load_key('ecc_p256.priv.asc')
+ self.sender = getUtility(IUserManager).create_address(
+ 'anne@example.org')
+
+ def test_get_approval(self):
+ with transaction() as t:
+ pgp_address = PGPAddress(self.sender)
+ pgp_address.key = self.sender_key.pubkey
+ pgp_address.key_confirmed = True
+ t.add(pgp_address)
+
+ workflow = PGPTestWorkflow(self.mlist, pgp_address,
+ self.sender_new_key.pubkey)
+ list(workflow)
+ items = get_queue_messages('virgin', expected_count=1)
+ message = items[0].msg
+
+ self.assertEqual(message['Subject'],
+ 'New key change request from {}'.format(
+ pgp_address.email))
+ wrapped = PGPWrapper(message)
+ self.assertTrue(wrapped.has_keys())
+ keys = list(wrapped.keys())
+ self.assertEqual(len(keys), 1)
+ key = keys.pop()
+ self.assertEqual(key.fingerprint, self.sender_new_key.fingerprint)
+
+ def test_receive_approval(self):
+ with transaction() as t:
+ pgp_address = PGPAddress(self.sender)
+ pgp_address.key = self.sender_key.pubkey
+ pgp_address.key_confirmed = True
+ t.add(pgp_address)
+
+ workflow = PGPTestWorkflow(self.mlist, pgp_address,
+ self.sender_new_key.pubkey)
+ list(workflow)
+ get_queue_messages('virgin', expected_count=1)
+ list(workflow)
+ self.assertEqual(workflow.token_owner, TokenOwner.no_one)