diff options
| author | J08nY | 2017-08-07 01:30:15 +0200 |
|---|---|---|
| committer | J08nY | 2017-08-07 01:30:15 +0200 |
| commit | be8e21927d063ee5ddd5fc7376669164f9914ad0 (patch) | |
| tree | 590b3c7a582507869670635270ecdac876280176 /src/mailman_pgp | |
| parent | 21b504db4f63efc5d2fa58c646c82d5d8659eca1 (diff) | |
| parent | 59ec076d04340245101de98633705d312374d9fe (diff) | |
| download | mailman-pgp-be8e21927d063ee5ddd5fc7376669164f9914ad0.tar.gz mailman-pgp-be8e21927d063ee5ddd5fc7376669164f9914ad0.tar.zst mailman-pgp-be8e21927d063ee5ddd5fc7376669164f9914ad0.zip | |
Diffstat (limited to 'src/mailman_pgp')
33 files changed, 1647 insertions, 287 deletions
diff --git a/src/mailman_pgp/commands/eml_key.py b/src/mailman_pgp/commands/eml_key.py index 0775bed..5799b8f 100644 --- a/src/mailman_pgp/commands/eml_key.py +++ b/src/mailman_pgp/commands/eml_key.py @@ -17,6 +17,7 @@ """The key email command.""" import logging +import copy from email.mime.text import MIMEText from mailman.email.message import UserNotification @@ -25,19 +26,24 @@ from mailman.interfaces.pending import IPendings from mailman.interfaces.subscriptions import ISubscriptionManager from mailman.interfaces.usermanager import IUserManager from pgpy.constants import KeyFlags +from pgpy.errors import PGPError from public import public from zope.component import getUtility from zope.interface import implementer +from mailman_pgp.config import mm_config from mailman_pgp.database import transaction from mailman_pgp.model.address import PGPAddress from mailman_pgp.model.list import PGPMailingList from mailman_pgp.pgp.mime import MIMEWrapper from mailman_pgp.pgp.wrapper import PGPWrapper from mailman_pgp.utils.email import get_email +from mailman_pgp.utils.pgp import key_usable from mailman_pgp.workflows.key_change import (CHANGE_CONFIRM_REQUEST, + KeyChangeModWorkflow, KeyChangeWorkflow) -from mailman_pgp.workflows.pubkey import CONFIRM_REQUEST +from mailman_pgp.workflows.key_confirm import CONFIRM_REQUEST +from mailman_pgp.workflows.key_revoke import KeyRevokeWorkflow log = logging.getLogger('mailman.plugin.pgp.commands') @@ -78,10 +84,7 @@ def _cmd_set(pgp_list, mlist, msg, msgdata, arguments, results): file=results) return ContinueProcessing.no - usage_flags = key.usage_flags() - for subkey in key.subkeys.values(): - usage_flags |= subkey.usage_flags() - if KeyFlags.EncryptCommunications not in usage_flags: + if not key_usable(key, {KeyFlags.EncryptCommunications, KeyFlags.Sign}): print('Need a key which can be used to encrypt communications.', file=results) return ContinueProcessing.no @@ -170,7 +173,8 @@ def _cmd_confirm(pgp_list, mlist, msg, msgdata, arguments, results): print('Wrong token.', file=results) return ContinueProcessing.no - if pendable.get('type') == KeyChangeWorkflow.pendable_class().PEND_TYPE: + if pendable.get('type') in (KeyChangeWorkflow.name, + KeyChangeModWorkflow.name): expecting = CHANGE_CONFIRM_REQUEST.format(pendable.get('fingerprint'), token) else: @@ -178,6 +182,8 @@ def _cmd_confirm(pgp_list, mlist, msg, msgdata, arguments, results): for sig_subject in wrapped.get_signed(): if expecting in sig_subject: + with transaction(): + pgp_address.key_confirmed = True ISubscriptionManager(mlist).confirm(token) break else: @@ -211,6 +217,14 @@ def _cmd_change(pgp_list, mlist, msg, msgdata, arguments, results): print('A pgp enabled address not found.', file=results) return ContinueProcessing.no + if pgp_address.key is None: + print("You currently don't have a key set.", file=results) + return ContinueProcessing.no + + if not pgp_address.key_confirmed: + print('Your key is currently not confirmed.', file=results) + return ContinueProcessing.no + wrapped = PGPWrapper(msg) if wrapped.is_encrypted(): decrypted = wrapped.try_decrypt(pgp_list.key) @@ -231,24 +245,108 @@ def _cmd_change(pgp_list, mlist, msg, msgdata, arguments, results): file=results) return ContinueProcessing.no - usage_flags = key.usage_flags() - for subkey in key.subkeys.values(): - usage_flags |= subkey.usage_flags() - if KeyFlags.EncryptCommunications not in usage_flags: + if not key_usable(key, {KeyFlags.EncryptCommunications, KeyFlags.Sign}): print('Need a key which can be used to encrypt communications.', file=results) return ContinueProcessing.no - workflow = KeyChangeWorkflow(mlist, pgp_address, key) + workflow_class = mm_config.workflows[pgp_list.key_change_workflow] + + workflow = workflow_class(mlist, pgp_address, key) list(workflow) print('Key change request received.', file=results) return ContinueProcessing.no def _cmd_revoke(pgp_list, mlist, msg, msgdata, arguments, results): - # Current key revocation certificate in attachment, restarts the - # subscription process, or rather only it's key setup part. - pass + """ + `key revoke` command. + + Used when a user has to revoke a part of a key used for a `PGPAddress`. + + * This command message CAN be encrypted to the list key, in which case it + will be decrypted. + * This command message MUST have at least one revocation certificate + attached. + """ + if len(arguments) != 1: + print('Extraneous argument/s: ' + ','.join(arguments[1:]), + file=results) + return ContinueProcessing.no + + email = get_email(msg) + if not email: + print('No email to revoke key of.', file=results) + return ContinueProcessing.no + + pgp_address = PGPAddress.for_email(email) + if pgp_address is None: + print('A pgp enabled address not found.', file=results) + return ContinueProcessing.no + + key = pgp_address.key + if key is None: + print("You currently don't have a key set.", file=results) + return ContinueProcessing.no + + if not pgp_address.key_confirmed: + print('Your key is currently not confirmed.', file=results) + return ContinueProcessing.no + + wrapped = PGPWrapper(msg) + if wrapped.is_encrypted(): + decrypted = wrapped.try_decrypt(pgp_list.key) + wrapped = PGPWrapper(decrypted) + + if not wrapped.has_revocs(): + print('No key revocations attached? Send a key revocation.', + file=results) + return ContinueProcessing.no + + key_copy = copy.copy(key) + + revocs = list(wrapped.revocs()) + matches = 0 + for revoc in revocs: + old_matches = matches + try: + verified = key_copy.verify(key_copy, revoc) + if verified: + key_copy |= revoc + matches += 1 + continue + except PGPError: + pass + + for subkey in key_copy.subkeys.values(): + try: + verified = key_copy.verify(subkey, revoc) + if verified: + subkey |= revoc + matches += 1 + break + except PGPError: + pass + # No match? + if matches == old_matches: + print('Revocation found for not-found key.', file=results) + + if not key_usable(key_copy, + {KeyFlags.EncryptCommunications, KeyFlags.Sign}): + # Start reset process. + with transaction(): + pgp_address.key = None + pgp_address.key_confirmed = False + workflow = KeyRevokeWorkflow(mlist, pgp_address) + list(workflow) + print('Key needs to be reset.', file=results) + else: + # Just update key. + if matches > 0: + with transaction(): + pgp_address.key = key_copy + print('Key succesfully updated.', file=results) + return ContinueProcessing.yes def _cmd_sign(pgp_list, mlist, msg, msgdata, arguments, results): @@ -273,7 +371,7 @@ def _cmd_receive(pgp_list, mlist, msg, msgdata, arguments, results): msg['MIME-Version'] = '1.0' msg.attach(MIMEText('Here is the public key you requested.')) wrapped = MIMEWrapper(msg) - msg = wrapped.attach_key(pgp_list.pubkey) + msg = wrapped.attach_keys(pgp_list.pubkey) msg.send(mlist) return ContinueProcessing.yes diff --git a/src/mailman_pgp/commands/tests/test_key.py b/src/mailman_pgp/commands/tests/test_key.py index cfda3e8..1a128a1 100644 --- a/src/mailman_pgp/commands/tests/test_key.py +++ b/src/mailman_pgp/commands/tests/test_key.py @@ -27,13 +27,13 @@ from mailman.runners.command import CommandRunner from mailman.testing.helpers import get_queue_messages, make_testable_runner from mailman.utilities.datetime import now from pgpy import PGPKey, PGPUID -from pgpy.constants import (PubKeyAlgorithm, KeyFlags, EllipticCurveOID, - HashAlgorithm, SymmetricKeyAlgorithm, - CompressionAlgorithm) +from pgpy.constants import ( + CompressionAlgorithm, EllipticCurveOID, HashAlgorithm, KeyFlags, + PubKeyAlgorithm, SymmetricKeyAlgorithm) from zope.component import getUtility from mailman_pgp.config import mm_config -from mailman_pgp.database import transaction +from mailman_pgp.database import mm_transaction, transaction from mailman_pgp.model.address import PGPAddress from mailman_pgp.model.list import PGPMailingList from mailman_pgp.pgp.mime import MIMEWrapper @@ -41,7 +41,7 @@ from mailman_pgp.pgp.wrapper import PGPWrapper from mailman_pgp.testing.layers import PGPConfigLayer from mailman_pgp.testing.pgp import load_key from mailman_pgp.workflows.key_change import CHANGE_CONFIRM_REQUEST -from mailman_pgp.workflows.pubkey import CONFIRM_REQUEST +from mailman_pgp.workflows.key_confirm import CONFIRM_REQUEST from mailman_pgp.workflows.subscription import OpenSubscriptionPolicy @@ -128,8 +128,7 @@ class TestPreSubscription(unittest.TestCase): hashes=[HashAlgorithm.SHA256, HashAlgorithm.SHA512], ciphers=[SymmetricKeyAlgorithm.AES256], - compression=[CompressionAlgorithm.ZLIB, - CompressionAlgorithm.Uncompressed] + compression=[CompressionAlgorithm.ZLIB] ) def test_set(self): @@ -145,7 +144,7 @@ class TestPreSubscription(unittest.TestCase): set_message = _create_mixed('bart@example.com', 'test@example.com', 'Re: key set {}'.format(token)) wrapped_set_message = MIMEWrapper(set_message) - set_message = wrapped_set_message.attach_key(self.bart_key.pubkey) + set_message = wrapped_set_message.attach_keys(self.bart_key.pubkey) mm_config.switchboards['command'].enqueue(set_message, listid='test.example.com') @@ -186,7 +185,7 @@ class TestPreSubscription(unittest.TestCase): set_message = _create_mixed('bart@example.com', 'test@example.com', 'Re: key set {}'.format(token)) wrapped_set_message = MIMEWrapper(set_message) - set_message = wrapped_set_message.attach_key(self.bart_key.pubkey) + set_message = wrapped_set_message.attach_keys(self.bart_key.pubkey) wrapped_set_message = MIMEWrapper(set_message) set_message = wrapped_set_message.encrypt(self.pgp_list.pubkey, self.bart_key.pubkey) @@ -246,9 +245,9 @@ class TestPreSubscription(unittest.TestCase): set_message = _create_mixed('bart@example.com', 'test@example.com', 'Re: key set token') wrapped_set_message = MIMEWrapper(set_message) - set_message = wrapped_set_message.attach_key(self.bart_key.pubkey) + set_message = wrapped_set_message.attach_keys(self.bart_key.pubkey) wrapped_set_message = MIMEWrapper(set_message) - set_message = wrapped_set_message.attach_key(self.anne_key.pubkey) + set_message = wrapped_set_message.attach_keys(self.anne_key.pubkey) mm_config.switchboards['command'].enqueue(set_message, listid='test.example.com') @@ -263,7 +262,7 @@ class TestPreSubscription(unittest.TestCase): set_message = _create_mixed('bart@example.com', 'test@example.com', 'Re: key set token') wrapped_set_message = MIMEWrapper(set_message) - set_message = wrapped_set_message.attach_key(self.bart_key) + set_message = wrapped_set_message.attach_keys(self.bart_key) mm_config.switchboards['command'].enqueue(set_message, listid='test.example.com') @@ -278,7 +277,7 @@ class TestPreSubscription(unittest.TestCase): set_message = _create_mixed('bart@example.com', 'test@example.com', 'Re: key set token') wrapped_set_message = MIMEWrapper(set_message) - set_message = wrapped_set_message.attach_key(self.unusable_key.pubkey) + set_message = wrapped_set_message.attach_keys(self.unusable_key.pubkey) mm_config.switchboards['command'].enqueue(set_message, listid='test.example.com') @@ -293,7 +292,7 @@ class TestPreSubscription(unittest.TestCase): def test_set_no_email(self): message = _create_mixed('', 'test@example.com', 'key set token') wrapped_message = MIMEWrapper(message) - message = wrapped_message.attach_key(self.bart_key.pubkey) + message = wrapped_message.attach_keys(self.bart_key.pubkey) mm_config.switchboards['command'].enqueue(message, listid='test.example.com') @@ -307,7 +306,7 @@ class TestPreSubscription(unittest.TestCase): set_message = _create_mixed('bart@example.com', 'test@example.com', 'key set token') wrapped_set_message = MIMEWrapper(set_message) - set_message = wrapped_set_message.attach_key(self.bart_key.pubkey) + set_message = wrapped_set_message.attach_keys(self.bart_key.pubkey) mm_config.switchboards['command'].enqueue(set_message, listid='test.example.com') @@ -326,7 +325,7 @@ class TestPreSubscription(unittest.TestCase): set_message = _create_mixed('bart@example.com', 'test@example.com', 'key set token') wrapped_set_message = MIMEWrapper(set_message) - set_message = wrapped_set_message.attach_key(self.bart_key.pubkey) + set_message = wrapped_set_message.attach_keys(self.bart_key.pubkey) mm_config.switchboards['command'].enqueue(set_message, listid='test.example.com') @@ -349,7 +348,7 @@ class TestPreSubscription(unittest.TestCase): set_message = _create_mixed('bart@example.com', 'test@example.com', 'key set token') wrapped_set_message = MIMEWrapper(set_message) - set_message = wrapped_set_message.attach_key(self.bart_key.pubkey) + set_message = wrapped_set_message.attach_keys(self.bart_key.pubkey) mm_config.switchboards['command'].enqueue(set_message, listid='test.example.com') @@ -594,9 +593,13 @@ class TestAfterSubscription(unittest.TestCase): layer = PGPConfigLayer def setUp(self): - self.mlist = create_list('test@example.com', style_name='pgp-default') - self.pgp_list = PGPMailingList.for_list(self.mlist) - self.pgp_list.key = load_key('ecc_p256.priv.asc') + with mm_transaction(): + self.mlist = create_list('test@example.com', + style_name='pgp-default') + with transaction(): + self.pgp_list = PGPMailingList.for_list(self.mlist) + self.pgp_list.key = load_key('ecc_p256.priv.asc') + self.pgp_list.key_change_workflow = 'pgp-key-change-workflow' self.bart_key = load_key('rsa_1024.priv.asc') self.bart_new_key = load_key('ecc_p256.priv.asc') @@ -611,8 +614,7 @@ class TestAfterSubscription(unittest.TestCase): hashes=[HashAlgorithm.SHA256, HashAlgorithm.SHA512], ciphers=[SymmetricKeyAlgorithm.AES256], - compression=[CompressionAlgorithm.ZLIB, - CompressionAlgorithm.Uncompressed] + compression=[CompressionAlgorithm.ZLIB] ) def test_change(self): @@ -628,7 +630,7 @@ class TestAfterSubscription(unittest.TestCase): message = _create_mixed('bart@example.com', 'test@example.com', 'key change') wrapped_message = MIMEWrapper(message) - message = wrapped_message.attach_key(self.bart_new_key.pubkey) + message = wrapped_message.attach_keys(self.bart_new_key.pubkey) mm_config.switchboards['command'].enqueue(message, listid='test.example.com') @@ -663,7 +665,7 @@ class TestAfterSubscription(unittest.TestCase): message = _create_mixed('bart@example.com', 'test@example.com', 'key change') wrapped_message = MIMEWrapper(message) - message = wrapped_message.attach_key(self.bart_new_key.pubkey) + message = wrapped_message.attach_keys(self.bart_new_key.pubkey) wrapped_message = MIMEWrapper(message) message = wrapped_message.encrypt(self.pgp_list.pubkey) @@ -700,7 +702,7 @@ class TestAfterSubscription(unittest.TestCase): message = _create_mixed('bart@example.com', 'test@example.com', 'key change') wrapped_message = MIMEWrapper(message) - message = wrapped_message.attach_key(self.bart_new_key.pubkey) + message = wrapped_message.attach_keys(self.bart_new_key.pubkey) mm_config.switchboards['command'].enqueue(message, listid='test.example.com') @@ -751,7 +753,7 @@ class TestAfterSubscription(unittest.TestCase): def test_change_no_email(self): message = _create_mixed('', 'test@example.com', 'key change') wrapped_message = MIMEWrapper(message) - message = wrapped_message.attach_key(self.bart_key.pubkey) + message = wrapped_message.attach_keys(self.bart_key.pubkey) mm_config.switchboards['command'].enqueue(message, listid='test.example.com') @@ -765,7 +767,7 @@ class TestAfterSubscription(unittest.TestCase): message = _create_mixed('bart@example.com', 'test@example.com', 'key change') wrapped_message = MIMEWrapper(message) - message = wrapped_message.attach_key(self.bart_key.pubkey) + message = wrapped_message.attach_keys(self.bart_key.pubkey) mm_config.switchboards['command'].enqueue(message, listid='test.example.com') @@ -810,9 +812,9 @@ class TestAfterSubscription(unittest.TestCase): 'key change') wrapped_set_message = MIMEWrapper(set_message) - set_message = wrapped_set_message.attach_key(self.bart_key.pubkey) + set_message = wrapped_set_message.attach_keys(self.bart_key.pubkey) wrapped_set_message = MIMEWrapper(set_message) - set_message = wrapped_set_message.attach_key(self.bart_new_key.pubkey) + set_message = wrapped_set_message.attach_keys(self.bart_new_key.pubkey) mm_config.switchboards['command'].enqueue(set_message, listid='test.example.com') @@ -835,7 +837,7 @@ class TestAfterSubscription(unittest.TestCase): message = _create_mixed('bart@example.com', 'test@example.com', 'key change') wrapped_message = MIMEWrapper(message) - message = wrapped_message.attach_key(self.bart_key) + message = wrapped_message.attach_keys(self.bart_key) mm_config.switchboards['command'].enqueue(message, listid='test.example.com') @@ -858,7 +860,7 @@ class TestAfterSubscription(unittest.TestCase): message = _create_mixed('bart@example.com', 'test@example.com', 'key change') wrapped_message = MIMEWrapper(message) - message = wrapped_message.attach_key(self.unusable_key.pubkey) + message = wrapped_message.attach_keys(self.unusable_key.pubkey) mm_config.switchboards['command'].enqueue(message, listid='test.example.com') @@ -870,6 +872,173 @@ class TestAfterSubscription(unittest.TestCase): 'Need a key which can be used to encrypt communications.', results_msg.get_payload()) + def test_revoke_extra_arg(self): + message = _create_plain('bart@example.com', 'test@example.com', + 'key revoke extra arguments', '') + mm_config.switchboards['command'].enqueue(message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + + items = get_queue_messages('virgin', expected_count=1) + results_msg = items[0].msg + + self.assertIn('Extraneous argument/s: extra,arguments', + results_msg.get_payload()) + + def test_revoke_no_email(self): + message = _create_mixed('', 'test@example.com', 'key revoke') + + mm_config.switchboards['command'].enqueue(message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + items = get_queue_messages('virgin', expected_count=1) + results_msg = items[0].msg + + self.assertIn('No email to revoke key of.', results_msg.get_payload()) + + def test_revoke_no_pgp_address(self): + message = _create_mixed('bart@example.com', 'test@example.com', + 'key revoke') + + mm_config.switchboards['command'].enqueue(message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + items = get_queue_messages('virgin', expected_count=1) + results_msg = items[0].msg + + self.assertIn('A pgp enabled address not found.', + results_msg.get_payload()) + + def test_revoke_no_key_set(self): + bart = getUtility(IUserManager).create_address('bart@example.com', + 'Bart Person') + with transaction() as t: + pgp_address = PGPAddress(bart) + t.add(pgp_address) + + message = _create_mixed('bart@example.com', 'test@example.com', + 'key revoke') + + mm_config.switchboards['command'].enqueue(message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + items = get_queue_messages('virgin', expected_count=1) + results_msg = items[0].msg + + self.assertIn("You currently don't have a key set.", + results_msg.get_payload()) + + def test_revoke_key_not_confirmed(self): + bart = getUtility(IUserManager).create_address('bart@example.com', + 'Bart Person') + with transaction() as t: + pgp_address = PGPAddress(bart) + pgp_address.key = self.bart_key.pubkey + t.add(pgp_address) + + message = _create_mixed('bart@example.com', 'test@example.com', + 'key revoke') + + mm_config.switchboards['command'].enqueue(message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + items = get_queue_messages('virgin', expected_count=1) + results_msg = items[0].msg + + self.assertIn('Your key is currently not confirmed.', + results_msg.get_payload()) + + def test_revoke_no_revocs(self): + bart = getUtility(IUserManager).create_address('bart@example.com', + 'Bart Person') + with transaction() as t: + pgp_address = PGPAddress(bart) + pgp_address.key = self.bart_key.pubkey + pgp_address.key_confirmed = True + t.add(pgp_address) + + message = _create_plain('bart@example.com', 'test@example.com', + 'key revoke', '') + mm_config.switchboards['command'].enqueue(message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + items = get_queue_messages('virgin', expected_count=1) + results_msg = items[0].msg + + self.assertIn('No key revocations attached? Send a key revocation.', + results_msg.get_payload()) + + def test_revoke_resets(self): + bart = getUtility(IUserManager).create_address('bart@example.com', + 'Bart Person') + with transaction() as t: + pgp_address = PGPAddress(bart) + pgp_address.key = self.bart_key.pubkey + pgp_address.key_confirmed = True + t.add(pgp_address) + + revoc = self.bart_key.revoke(self.bart_key) + + message = _create_mixed('bart@example.com', 'test@example.com', + 'key revoke') + wrapped_message = MIMEWrapper(message) + message = wrapped_message.attach_revocs(revoc) + + mm_config.switchboards['command'].enqueue(message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + items = get_queue_messages('virgin', expected_count=2) + if (items[0].msg['Subject'] == + 'The results of your email commands'): # pragma: no cover + results_msg = items[0].msg + else: + results_msg = items[1].msg + #TODO: finish test + + self.assertIn('Key needs to be reset.', results_msg.get_payload()) + + def test_revoke_updates(self): + bart = getUtility(IUserManager).create_address('bart@example.com', + 'Bart Person') + + test_key = PGPKey.new(PubKeyAlgorithm.RSAEncryptOrSign, 1024) + uid = PGPUID.new('Some Name', email='anne@example.org') + test_key.add_uid(uid, + usage={KeyFlags.Certify, + KeyFlags.EncryptCommunications, + KeyFlags.Sign}, + hashes=[HashAlgorithm.SHA256, + HashAlgorithm.SHA512], + ciphers=[SymmetricKeyAlgorithm.AES256], + compression=[CompressionAlgorithm.ZLIB]) + sub = PGPKey.new(PubKeyAlgorithm.ECDH, EllipticCurveOID.SECP256K1) + test_key.add_subkey(sub, usage={KeyFlags.EncryptCommunications}) + + with transaction() as t: + pgp_address = PGPAddress(bart) + pgp_address.key = test_key.pubkey + pgp_address.key_confirmed = True + t.add(pgp_address) + + revoc = test_key.revoke(sub.pubkey) + + message = _create_mixed('bart@example.com', 'test@example.com', + 'key revoke') + wrapped_message = MIMEWrapper(message) + message = wrapped_message.attach_revocs(revoc) + + mm_config.switchboards['command'].enqueue(message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + items = get_queue_messages('virgin', expected_count=1) + results_msg = items[0].msg + + self.assertIn('Key succesfully updated.', results_msg.get_payload()) + sub = next(iter(pgp_address.key.subkeys.values())) + revocs = list(sub.revocation_signatures) + self.assertEqual(len(revocs), 1) + self.assertEqual(revoc.hash2, revocs[0].hash2) + class TestGeneral(unittest.TestCase): layer = PGPConfigLayer diff --git a/src/mailman_pgp/model/list.py b/src/mailman_pgp/model/list.py index d80f4a7..9c2d317 100644 --- a/src/mailman_pgp/model/list.py +++ b/src/mailman_pgp/model/list.py @@ -54,6 +54,10 @@ class PGPMailingList(Base): nonencrypted_msg_action = Column(Enum(Action), default=Action.reject) encrypt_outgoing = Column(Boolean, default=True) + # Key related properties + key_change_workflow = Column(SAUnicode, + default='pgp-key-change-mod-workflow') + def __init__(self, mlist): """ diff --git a/src/mailman_pgp/pgp/inline.py b/src/mailman_pgp/pgp/inline.py index bb0971d..49f0a6e 100644 --- a/src/mailman_pgp/pgp/inline.py +++ b/src/mailman_pgp/pgp/inline.py @@ -18,12 +18,14 @@ """Strict inline PGP message wrapper.""" import copy from email.iterators import walk +from email.mime.text import MIMEText from pgpy import PGPMessage from pgpy.constants import SymmetricKeyAlgorithm from public import public -from mailman_pgp.utils.pgp import key_from_blob +from mailman_pgp.utils.email import make_multipart +from mailman_pgp.utils.pgp import key_from_blob, revoc_from_blob @public @@ -95,7 +97,7 @@ class InlineWrapper: :rtype: typing.Generator[pgpy.PGPMessage] """ for part in walk(self.msg): - if not part.is_multipart() and self._is_signed(part): + if not part.is_multipart(): try: msg = PGPMessage.from_blob(part.get_payload()) except: @@ -135,7 +137,7 @@ class InlineWrapper: :rtype: typing.Generator[pgpy.PGPMessage] """ for part in walk(self.msg): - if not part.is_multipart() and self._is_encrypted(part): + if not part.is_multipart(): try: msg = PGPMessage.from_blob(part.get_payload()) except: @@ -176,13 +178,56 @@ class InlineWrapper: :rtype: Generator[pgpy.PGPKey] """ for part in walk(self.msg): - if not part.is_multipart() and self._has_keys(part): + if not part.is_multipart(): try: key = key_from_blob(part.get_payload()) except: continue yield key + def _is_revoc(self, part): + try: + revoc_from_blob(part.get_payload()) + except ValueError: + return False + return True + + def is_revocs(self): + for part in walk(self.msg): + if (not part.is_multipart() and not self._is_revoc(part)): + return False + return True + + def has_revocs(self): + for part in walk(self.msg): + if (not part.is_multipart() and self._is_revoc(part)): + return True + return False + + def revocs(self): + for part in walk(self.msg): + if not part.is_multipart(): + try: + revoc = revoc_from_blob(part.get_payload()) + except: + continue + yield revoc + + def attach_revocs(self, *key_revocations): + """ + Attach a key revocation signature to the message. + + :param key_revocations: A key revocation signature to attach. + :type key_revocations: pgpy.PGPSignature + :return: The message with the signature attached. + :rtype: mailman.email.message.Message + """ + out = make_multipart(self.msg) + for key_revocation in key_revocations: + revoc_part = MIMEText(str(key_revocation)) + out.attach(revoc_part) + return out + def verify(self, key): """ Verify the signatures of this message with key. diff --git a/src/mailman_pgp/pgp/mime.py b/src/mailman_pgp/pgp/mime.py index a1303c9..03177ab 100644 --- a/src/mailman_pgp/pgp/mime.py +++ b/src/mailman_pgp/pgp/mime.py @@ -28,8 +28,8 @@ from pgpy import PGPDetachedSignature, PGPMessage from pgpy.constants import HashAlgorithm, SymmetricKeyAlgorithm from public import public -from mailman_pgp.utils.email import copy_headers -from mailman_pgp.utils.pgp import key_from_blob +from mailman_pgp.utils.email import copy_headers, make_multipart +from mailman_pgp.utils.pgp import key_from_blob, revoc_from_blob @public @@ -63,7 +63,7 @@ class MIMEWrapper: def _is_mime(self): is_multipart = self.msg.is_multipart() - payloads = len(self.msg.get_payload()) + payloads = len(self.msg.get_payload()) if self.msg.get_payload() else 0 return is_multipart and payloads == 2 @@ -186,26 +186,81 @@ class MIMEWrapper: continue yield key - def attach_key(self, key): + def attach_keys(self, *keys): """ Attach a key to this message, as per RFC3156 section 7. - :param key: A key to attach. - :type key: pgpy.PGPKey + :param keys: A key to attach. + :type keys: pgpy.PGPKey :return: The message with the key attached. :rtype: mailman.email.message.Message """ - filename = '0x' + key.fingerprint.keyid + '.asc' - key_part = MIMEApplication(_data=str(key), - _subtype=MIMEWrapper._keys_subtype, - _encoder=encode_7or8bit, - name=filename) - key_part.add_header('Content-Description', - 'OpenPGP key') - key_part.add_header('Content-Disposition', 'attachment', - filename=filename) - out = copy.deepcopy(self.msg) - out.attach(key_part) + out = make_multipart(self.msg) + for key in keys: + filename = '0x' + key.fingerprint.keyid + '.asc' + key_part = MIMEApplication(_data=str(key), + _subtype=MIMEWrapper._keys_subtype, + _encoder=encode_7or8bit, + name=filename) + key_part.add_header('Content-Description', + 'OpenPGP key') + key_part.add_header('Content-Disposition', 'attachment', + filename=filename) + out.attach(key_part) + return out + + def _is_revoc(self, part): + if part.get_content_type() != MIMEWrapper._keys_type: + return False + try: + revoc_from_blob(part.get_payload()) + except ValueError: + return False + return True + + def is_revocs(self): + for part in walk(self.msg): + if (not part.is_multipart() and not self._is_revoc(part)): + return False + return True + + def has_revocs(self): + for part in walk(self.msg): + if (not part.is_multipart() and self._is_revoc(part)): + return True + return False + + def revocs(self): + for part in walk(self.msg): + if (not part.is_multipart() # noqa + and part.get_content_type() == MIMEWrapper._keys_type): + try: + revoc = revoc_from_blob(part.get_payload()) + except: + continue + yield revoc + + def attach_revocs(self, *key_revocations): + """ + Attach a key revocation signature to the message, as a key subpart. + + :param key_revocations: A key revocation signature to attach. + :type key_revocations: pgpy.PGPSignature + :return: The message with the signature attached. + :rtype: mailman.email.message.Message + """ + out = make_multipart(self.msg) + for key_revocation in key_revocations: + filename = '0x' + key_revocation.signer + '.asc' + revoc_part = MIMEApplication(_data=str(key_revocation), + _subtype=MIMEWrapper._keys_subtype, + _encoder=encode_7or8bit, + name=filename) + revoc_part.add_header('Content-Description', + 'OpenPGP key') + revoc_part.add_header('Content-Disposition', 'attachment', + filename=filename) + out.attach(revoc_part) return out def verify(self, key): diff --git a/src/mailman_pgp/pgp/tests/data/messages/inline_revoc.eml b/src/mailman_pgp/pgp/tests/data/messages/inline_revoc.eml new file mode 100644 index 0000000..f215777 --- /dev/null +++ b/src/mailman_pgp/pgp/tests/data/messages/inline_revoc.eml @@ -0,0 +1,19 @@ +To: nobody@example.org +From: RSA 1024b example <RSA-1024b@example.org> +Subject: Some subject. +Message-ID: <76a591ed-bfc4-d08b-73d3-fc2489148fd7@example.org> +Date: Wed, 21 Jun 2017 13:50:59 +0200 +User-Agent: Mutt/1.7.2 (2016-11-26) +MIME-Version: 1.0 +Content-Type: text/plain; charset=utf-8 +Content-Transfer-Encoding: 8bit + +-----BEGIN PGP PUBLIC KEY BLOCK----- +Comment: This is a revocation certificate + +iLYEIAEIACAWIQTUqUiGscoglqZFjlxH8QwIeIS3WAUCWYClvgIdAgAKCRBH8QwI +eIS3WLpBBADCUtyYmI2Z8DCOnKUW4nRjHc3ZVMoZJlwceJCWhSybBrnjo6LWzvBy +eKke4qHlh+jmSk3/Qyio6vYzvicOayDwhr0s/1X26MiYorthfiCQOg2WQ0YiRuMC +/Ml8rukvBTRGvXikcIuBw5MFqCWsWI7ExPKaaresnHaCn37KF0A6hw== +=4oku +-----END PGP PUBLIC KEY BLOCK----- diff --git a/src/mailman_pgp/pgp/tests/data/messages/inline_revoc_multipart.eml b/src/mailman_pgp/pgp/tests/data/messages/inline_revoc_multipart.eml new file mode 100644 index 0000000..a1052d6 --- /dev/null +++ b/src/mailman_pgp/pgp/tests/data/messages/inline_revoc_multipart.eml @@ -0,0 +1,30 @@ +To: nobody@example.org +From: RSA 1024b example <RSA-1024b@example.org> +Subject: Some subject. +Message-ID: <76a591ed-bfc4-d08b-73d3-fc2489148fd7@example.org> +Date: Wed, 21 Jun 2017 13:50:59 +0200 +User-Agent: Mutt/1.7.2 (2016-11-26) +MIME-Version: 1.0 +Content-Type: multipart/mixed; boundary="abjqkjsfwqsfa546qw2wfq6sdq2sqwr56qqs" + +--abjqkjsfwqsfa546qw2wfq6sdq2sqwr56qqs +Content-Type: text/plain; charset=utf-8 +Content-Transfer-Encoding: 8bit + +-----BEGIN PGP PUBLIC KEY BLOCK----- +Comment: This is a revocation certificate + +iLYEIAEIACAWIQTUqUiGscoglqZFjlxH8QwIeIS3WAUCWYClvgIdAgAKCRBH8QwI +eIS3WLpBBADCUtyYmI2Z8DCOnKUW4nRjHc3ZVMoZJlwceJCWhSybBrnjo6LWzvBy +eKke4qHlh+jmSk3/Qyio6vYzvicOayDwhr0s/1X26MiYorthfiCQOg2WQ0YiRuMC +/Ml8rukvBTRGvXikcIuBw5MFqCWsWI7ExPKaaresnHaCn37KF0A6hw== +=4oku +-----END PGP PUBLIC KEY BLOCK----- + +--abjqkjsfwqsfa546qw2wfq6sdq2sqwr56qqs +Content-Type: text/plain; charset=utf-8 +Content-Transfer-Encoding: 8bit + +Some cleartext. + +--abjqkjsfwqsfa546qw2wfq6sdq2sqwr56qqs--
\ No newline at end of file diff --git a/src/mailman_pgp/pgp/tests/data/messages/mime_revoc.eml b/src/mailman_pgp/pgp/tests/data/messages/mime_revoc.eml new file mode 100644 index 0000000..e1055a3 --- /dev/null +++ b/src/mailman_pgp/pgp/tests/data/messages/mime_revoc.eml @@ -0,0 +1,35 @@ +To: nobody@example.org +From: RSA 1024b example <RSA-1024b@example.org> +Subject: Some subject. +Message-ID: <76a591ed-bfc4-d08b-73d3-fc2489148fd7@example.org> +Date: Wed, 21 Jun 2017 13:50:59 +0200 +User-Agent: Mutt/1.7.2 (2016-11-26) +MIME-Version: 1.0 +Content-Type: multipart/mixed; + boundary="------------A851F166D50529639139DD0B" + +This is a multi-part message in MIME format. +--------------A851F166D50529639139DD0B +Content-Type: text/plain; charset=utf-8 +Content-Transfer-Encoding: 7bit + +Some other text. + +--------------A851F166D50529639139DD0B +Content-Type: application/pgp-keys; + name="0x7884B758.asc" +Content-Transfer-Encoding: 7bit +Content-Disposition: attachment; + filename="0x7884B758.asc" + +-----BEGIN PGP PUBLIC KEY BLOCK----- +Comment: This is a revocation certificate + +iLYEIAEIACAWIQTUqUiGscoglqZFjlxH8QwIeIS3WAUCWYClvgIdAgAKCRBH8QwI +eIS3WLpBBADCUtyYmI2Z8DCOnKUW4nRjHc3ZVMoZJlwceJCWhSybBrnjo6LWzvBy +eKke4qHlh+jmSk3/Qyio6vYzvicOayDwhr0s/1X26MiYorthfiCQOg2WQ0YiRuMC +/Ml8rukvBTRGvXikcIuBw5MFqCWsWI7ExPKaaresnHaCn37KF0A6hw== +=4oku +-----END PGP PUBLIC KEY BLOCK----- + +--------------A851F166D50529639139DD0B-- diff --git a/src/mailman_pgp/pgp/tests/data/revocs/dsa_elgamal_1024.revoc.asc b/src/mailman_pgp/pgp/tests/data/revocs/dsa_elgamal_1024.revoc.asc new file mode 100644 index 0000000..80b002e --- /dev/null +++ b/src/mailman_pgp/pgp/tests/data/revocs/dsa_elgamal_1024.revoc.asc @@ -0,0 +1,8 @@ +-----BEGIN PGP PUBLIC KEY BLOCK----- +Comment: This is a revocation certificate + +iGAEIBECACAWIQRR5fGEErwNQ8nevZPDzAoGOEHHpwUCWYCmxwIdAgAKCRDDzAoG +OEHHp+nYAJwM095Qv9qxaMfqx8xSLD5eKeExgQCeNA5z7zZZcgtUTTqW/o4baX6D +X8E= +=vVlH +-----END PGP PUBLIC KEY BLOCK----- diff --git a/src/mailman_pgp/pgp/tests/data/revocs/ecc_curve25519.revoc.asc b/src/mailman_pgp/pgp/tests/data/revocs/ecc_curve25519.revoc.asc new file mode 100644 index 0000000..df7d593 --- /dev/null +++ b/src/mailman_pgp/pgp/tests/data/revocs/ecc_curve25519.revoc.asc @@ -0,0 +1,8 @@ +-----BEGIN PGP PUBLIC KEY BLOCK----- +Comment: This is a revocation certificate + +iHgEIBYIACAWIQQFsA605escSIQh4V/6me+juNPIugUCWYCmggIdAgAKCRD6me+j +uNPIuuVxAP4mr8PXnLIzqdysvMo5Mo0QbIck6NgO/rOJC4Kj/tGjyQD/QMo4ON/1 +cxzD068xWtlDpPYjtPBFuNYrLOPDYvqjGAg= +=0TTd +-----END PGP PUBLIC KEY BLOCK----- diff --git a/src/mailman_pgp/pgp/tests/data/revocs/ecc_p256.revoc.asc b/src/mailman_pgp/pgp/tests/data/revocs/ecc_p256.revoc.asc new file mode 100644 index 0000000..9571cd2 --- /dev/null +++ b/src/mailman_pgp/pgp/tests/data/revocs/ecc_p256.revoc.asc @@ -0,0 +1,8 @@ +-----BEGIN PGP PUBLIC KEY BLOCK----- +Comment: This is a revocation certificate + +iHgEIBMIACAWIQSyKLDm+5ZVhlrf0j03k3IFvhWo0AUCWYCmZAIdAgAKCRA3k3IF +vhWo0HAZAP9qFHrB6d+hNpYDT9jVCDIURh8Ml711hi8zsDhSkRZ4yAEAgzlmzjPZ +VhgcOemgbYTKO7Kj8q61KpVP9oZ9l7Z4c/I= +=ZZsy +-----END PGP PUBLIC KEY BLOCK----- diff --git a/src/mailman_pgp/pgp/tests/data/revocs/ecc_secp256k1.revoc.asc b/src/mailman_pgp/pgp/tests/data/revocs/ecc_secp256k1.revoc.asc new file mode 100644 index 0000000..e91a62a --- /dev/null +++ b/src/mailman_pgp/pgp/tests/data/revocs/ecc_secp256k1.revoc.asc @@ -0,0 +1,8 @@ +-----BEGIN PGP PUBLIC KEY BLOCK----- +Comment: This is a revocation certificate + +iHgEIBMIACAWIQRi7dSuUMnX5cDo6nvIw1p/AFOkvwUCWYCmEwIdAgAKCRDIw1p/ +AFOkv5zZAQDiOzuQWpL2cvm+Jz4XyeLWebiOe7zirM9oMP03rmzQxgD/UIzBnqq9 +iy6oREDvbNw1sCB8/90ihlkB3iM8eOW9iSk= +=lj1I +-----END PGP PUBLIC KEY BLOCK----- diff --git a/src/mailman_pgp/pgp/tests/data/revocs/rsa_1024.revoc.asc b/src/mailman_pgp/pgp/tests/data/revocs/rsa_1024.revoc.asc new file mode 100644 index 0000000..ddb8974 --- /dev/null +++ b/src/mailman_pgp/pgp/tests/data/revocs/rsa_1024.revoc.asc @@ -0,0 +1,9 @@ +-----BEGIN PGP PUBLIC KEY BLOCK----- +Comment: This is a revocation certificate + +iLYEIAEIACAWIQTUqUiGscoglqZFjlxH8QwIeIS3WAUCWYClvgIdAgAKCRBH8QwI +eIS3WLpBBADCUtyYmI2Z8DCOnKUW4nRjHc3ZVMoZJlwceJCWhSybBrnjo6LWzvBy +eKke4qHlh+jmSk3/Qyio6vYzvicOayDwhr0s/1X26MiYorthfiCQOg2WQ0YiRuMC +/Ml8rukvBTRGvXikcIuBw5MFqCWsWI7ExPKaaresnHaCn37KF0A6hw== +=4oku +-----END PGP PUBLIC KEY BLOCK----- diff --git a/src/mailman_pgp/pgp/tests/test_inline.py b/src/mailman_pgp/pgp/tests/test_inline.py index 7f82ab4..d9f8c8a 100644 --- a/src/mailman_pgp/pgp/tests/test_inline.py +++ b/src/mailman_pgp/pgp/tests/test_inline.py @@ -20,7 +20,8 @@ from parameterized import parameterized from mailman_pgp.pgp.inline import InlineWrapper -from mailman_pgp.testing.pgp import load_key, load_message, WrapperTestCase +from mailman_pgp.testing.pgp import ( + load_key, load_message, load_revoc, WrapperTestCase) class InlineWrapperTestCase(WrapperTestCase): @@ -219,6 +220,46 @@ class TestKeys(InlineWrapperTestCase): self.keys(message, keys) +class TestRevocs(InlineWrapperTestCase): + @parameterized.expand([ + (load_message('inline_revoc.eml'), + True), + (load_message('inline_revoc_multipart.eml'), + True) + ]) + def test_has_revocs(self, message, has_revocs): + self.has_revocs(message, has_revocs) + + @parameterized.expand([ + (load_message('inline_revoc.eml'), + True), + (load_message('inline_revoc_multipart.eml'), + False) + ]) + def test_is_revocs(self, message, is_revocs): + self.is_revocs(message, is_revocs) + + @parameterized.expand([ + (load_message('inline_revoc.eml'), + (load_revoc('rsa_1024.revoc.asc'),)), + (load_message('inline_revoc_multipart.eml'), + (load_revoc('rsa_1024.revoc.asc'),)) + ]) + def test_revocs(self, message, revocs): + self.revocs(message, revocs) + + @parameterized.expand([ + (load_message('clear.eml'), + [load_revoc('rsa_1024.revoc.asc'), + load_revoc('ecc_p256.revoc.asc')]), + (load_message('clear_multipart.eml'), + [load_revoc('rsa_1024.revoc.asc'), + load_revoc('ecc_p256.revoc.asc')]) + ]) + def test_attach_revocs(self, message, revocs): + self.attach_revocs(message, revocs) + + class TestCombined(InlineWrapperTestCase): @parameterized.expand([ (load_message('clear.eml'), diff --git a/src/mailman_pgp/pgp/tests/test_mime.py b/src/mailman_pgp/pgp/tests/test_mime.py index 78c9e71..e9951cf 100644 --- a/src/mailman_pgp/pgp/tests/test_mime.py +++ b/src/mailman_pgp/pgp/tests/test_mime.py @@ -20,7 +20,8 @@ from parameterized import parameterized from mailman_pgp.pgp.mime import MIMEWrapper -from mailman_pgp.testing.pgp import load_key, load_message, WrapperTestCase +from mailman_pgp.testing.pgp import ( + load_key, load_message, load_revoc, WrapperTestCase) class MIMEWrapperTestCase(WrapperTestCase): @@ -178,6 +179,51 @@ class TestKeys(MIMEWrapperTestCase): def test_keys(self, message, keys): self.keys(message, keys) + @parameterized.expand([ + (load_message('clear.eml'), + [load_key('rsa_1024.priv.asc'), + load_key('ecc_p256.priv.asc')]), + (load_message('clear_multipart.eml'), + [load_key('rsa_1024.priv.asc'), + load_key('ecc_p256.priv.asc')]) + ]) + def test_attach_keys(self, message, keys): + self.attach_keys(message, keys) + + +class TestRevocs(MIMEWrapperTestCase): + @parameterized.expand([ + (load_message('mime_revoc.eml'), + True) + ]) + def test_has_revocs(self, message, has_revocs): + self.has_revocs(message, has_revocs) + + @parameterized.expand([ + (load_message('mime_revoc.eml'), + False) + ]) + def test_is_revocs(self, message, is_revocs): + self.is_revocs(message, is_revocs) + + @parameterized.expand([ + (load_message('mime_revoc.eml'), + (load_revoc('rsa_1024.revoc.asc'),)) + ]) + def test_revocs(self, message, revocs): + self.revocs(message, revocs) + + @parameterized.expand([ + (load_message('clear.eml'), + [load_revoc('rsa_1024.revoc.asc'), + load_revoc('ecc_p256.revoc.asc')]), + (load_message('clear_multipart.eml'), + [load_revoc('rsa_1024.revoc.asc'), + load_revoc('ecc_p256.revoc.asc')]) + ]) + def test_attach_revocs(self, message, revocs): + self.attach_revocs(message, revocs) + class TestCombined(MIMEWrapperTestCase): @parameterized.expand([ diff --git a/src/mailman_pgp/pgp/tests/test_wrapper.py b/src/mailman_pgp/pgp/tests/test_wrapper.py index f1f7621..b9c157a 100644 --- a/src/mailman_pgp/pgp/tests/test_wrapper.py +++ b/src/mailman_pgp/pgp/tests/test_wrapper.py @@ -22,7 +22,8 @@ from mailman_pgp.pgp.inline import InlineWrapper from mailman_pgp.pgp.mime import MIMEWrapper from mailman_pgp.pgp.mime_multisig import MIMEMultiSigWrapper from mailman_pgp.pgp.wrapper import PGPWrapper -from mailman_pgp.testing.pgp import load_key, load_message, WrapperTestCase +from mailman_pgp.testing.pgp import ( + load_key, load_message, load_revoc, WrapperTestCase) class PGPWrapperTestCase(WrapperTestCase): @@ -180,6 +181,41 @@ class TestKeys(PGPWrapperTestCase): self.keys(message, keys) +class TestRevocs(PGPWrapperTestCase): + @parameterized.expand([ + (load_message('mime_revoc.eml'), + True), + (load_message('inline_revoc.eml'), + True), + (load_message('inline_revoc_multipart.eml'), + True) + ]) + def test_has_revocs(self, message, has_revocs): + self.has_revocs(message, has_revocs) + + @parameterized.expand([ + (load_message('mime_revoc.eml'), + False), + (load_message('inline_revoc.eml'), + True), + (load_message('inline_revoc_multipart.eml'), + False) + ]) + def test_is_revocs(self, message, is_revocs): + self.is_revocs(message, is_revocs) + + @parameterized.expand([ + (load_message('mime_revoc.eml'), + (load_revoc('rsa_1024.revoc.asc'),)), + (load_message('inline_revoc.eml'), + (load_revoc('rsa_1024.revoc.asc'),)), + (load_message('inline_revoc_multipart.eml'), + (load_revoc('rsa_1024.revoc.asc'),)) + ]) + def test_revocs(self, message, revocs): + self.revocs(message, revocs) + + class TestCombined(PGPWrapperTestCase): @parameterized.expand([ (load_message('clear.eml'), diff --git a/src/mailman_pgp/pgp/wrapper.py b/src/mailman_pgp/pgp/wrapper.py index 6193b57..f0519cb 100644 --- a/src/mailman_pgp/pgp/wrapper.py +++ b/src/mailman_pgp/pgp/wrapper.py @@ -270,7 +270,7 @@ class PGPWrapper(): Get the collection of keys in this message. :return: A collection of keys. - :rtype: Generator[pgpy.PGPKey] + :rtype: typing.Generator[pgpy.PGPKey] """ if self.mime.has_keys(): yield from self.mime.keys() @@ -278,3 +278,32 @@ class PGPWrapper(): yield from self.multisig.keys() elif self.inline.has_keys(): yield from self.inline.keys() + + def has_revocs(self): + """ + + :return: + :rtype: bool + """ + return any(wrapper.has_revocs() for wrapper in self.wrappers) + + def is_revocs(self): + """ + + :return: + :rtype: bool + """ + return any(wrapper.is_revocs() for wrapper in self.wrappers) + + def revocs(self): + """ + + :return: + :rtype: typing.Generator[pgpy.PGPSignature] + """ + if self.mime.has_revocs(): + yield from self.mime.revocs() + elif self.multisig.has_revocs(): + yield from self.multisig.revocs() + elif self.inline.has_revocs(): + yield from self.inline.revocs() diff --git a/src/mailman_pgp/testing/pgp.py b/src/mailman_pgp/testing/pgp.py index 844b708..89f7a98 100644 --- a/src/mailman_pgp/testing/pgp.py +++ b/src/mailman_pgp/testing/pgp.py @@ -19,6 +19,7 @@ import os from email import message_from_bytes +from operator import attrgetter from unittest import TestCase from mailman.email.message import Message @@ -26,21 +27,36 @@ from pgpy import PGPKey from pkg_resources import resource_string from mailman_pgp.testing.layers import PGPLayer +from mailman_pgp.utils.pgp import revoc_from_blob + + +def load_blob(*path): + return resource_string('mailman_pgp.pgp.tests', + os.path.join('data', *path)) def load_message(path): - data = resource_string('mailman_pgp.pgp.tests', - os.path.join('data', 'messages', path)) - return message_from_bytes(data, Message) + """ + :rtype: Message + """ + return message_from_bytes(load_blob('messages', path), Message) def load_key(path): - key, _ = PGPKey.from_blob( - resource_string('mailman_pgp.pgp.tests', - os.path.join('data', 'keys', path))) + """ + :rtype: pgpy.PGPKey + """ + key, _ = PGPKey.from_blob(load_blob('keys', path)) return key +def load_revoc(path): + """ + :rtype: pgpy.PGPSignature + """ + return revoc_from_blob(load_blob('revocs', path)) + + def payload_equal(one_msg, other_msg): one_payload = one_msg.get_payload() other_payload = other_msg.get_payload() @@ -135,10 +151,49 @@ class WrapperTestCase(TestCase): loaded = list(wrapped.keys()) self.assertEqual(len(loaded), len(keys)) - loaded_fingerprints = list(map(lambda key: key.fingerprint, loaded)) - fingerprints = list(map(lambda key: key.fingerprint, keys)) + loaded_fingerprints = list(map(attrgetter('fingerprint'), loaded)) + fingerprints = list(map(attrgetter('fingerprint'), keys)) + self.assertListEqual(loaded_fingerprints, fingerprints) + + def attach_keys(self, message, keys): + wrapped = self.wrap(message) + attached = wrapped.attach_keys(*keys) + wrapped = self.wrap(attached) + loaded = list(wrapped.keys()) + + self.assertTrue(wrapped.has_keys()) + loaded_fingerprints = list(map(attrgetter('fingerprint'), loaded)) + fingerprints = list(map(attrgetter('fingerprint'), keys)) self.assertListEqual(loaded_fingerprints, fingerprints) + def has_revocs(self, message, has_revocs): + wrapped = self.wrap(message) + self.assertEqual(wrapped.has_revocs(), has_revocs) + + def is_revocs(self, message, is_revocs): + wrapped = self.wrap(message) + self.assertEqual(wrapped.is_revocs(), is_revocs) + + def revocs(self, message, revocs): + wrapped = self.wrap(message) + loaded = list(wrapped.revocs()) + self.assertEqual(len(loaded), len(revocs)) + + loaded_issuers = list(map(attrgetter('signer'), loaded)) + issuers = list(map(attrgetter('signer'), revocs)) + self.assertListEqual(loaded_issuers, issuers) + + def attach_revocs(self, message, revocs): + wrapped = self.wrap(message) + attached = wrapped.attach_revocs(*revocs) + wrapped = self.wrap(attached) + loaded = list(wrapped.revocs()) + + self.assertTrue(wrapped.has_revocs()) + loaded_issuers = list(map(attrgetter('signer'), loaded)) + issuers = list(map(attrgetter('signer'), revocs)) + self.assertListEqual(loaded_issuers, issuers) + def sign_encrypt_decrypt_verify(self, message, sign_key, encrypt_key): wrapped = self.wrap(message) encrypted = wrapped.sign_encrypt(sign_key, encrypt_key.pubkey) diff --git a/src/mailman_pgp/utils/email.py b/src/mailman_pgp/utils/email.py index a936458..ac1ab66 100644 --- a/src/mailman_pgp/utils/email.py +++ b/src/mailman_pgp/utils/email.py @@ -16,8 +16,10 @@ # this program. If not, see <http://www.gnu.org/licenses/>. """""" +import copy from email.utils import parseaddr +from mailman.email.message import MultipartDigestMessage from public import public @@ -62,6 +64,25 @@ def overwrite_message(from_msg, to_msg): @public +def make_multipart(msg): + """ + + :param msg: + :type msg: email.message.Message + :return: + :rtype: email.message.MIMEMultipart| + mailman.email.message.MultipartDigestMessage + """ + if msg.is_multipart(): + out = copy.deepcopy(msg) + else: + out = MultipartDigestMessage() + out.attach(msg) + copy_headers(msg, out) + return out + + +@public def get_email(msg): display_name, email = parseaddr(msg['from']) # Address could be None or the empty string. diff --git a/src/mailman_pgp/utils/pgp.py b/src/mailman_pgp/utils/pgp.py index 4251693..a8f06f2 100644 --- a/src/mailman_pgp/utils/pgp.py +++ b/src/mailman_pgp/utils/pgp.py @@ -16,7 +16,11 @@ # this program. If not, see <http://www.gnu.org/licenses/>. """Miscellaneous PGP utilities.""" -from pgpy import PGPKey +from pgpy import PGPKey, PGPSignature +from pgpy.constants import SignatureType +from pgpy.errors import PGPError +from pgpy.packet import Packet, Signature +from pgpy.types import Armorable from public import public @@ -72,3 +76,72 @@ def key_from_file(file): """ key, _ = PGPKey.from_file(file) return key + + +@public +def revoc_from_blob(blob): + """ + Load a key revocation signature from an ASCII-Armored blob. + + :param blob: + :return: + :rtype: pgpy.PGPSignature + """ + dearm = Armorable.ascii_unarmor(blob) + p = Packet(dearm['body']) + + if not isinstance(p, Signature): + raise ValueError('Not a key revocation signature.') + if p.sigtype not in (SignatureType.KeyRevocation, + SignatureType.SubkeyRevocation): + raise ValueError('Not a key revocation.') + + sig = PGPSignature() + sig |= p + return sig + + +@public +def key_usable(key, flags_required): + """ + Check that the `key` has the `flags_required` set of KeyFlags. + + Checks only non-expired, non-revoked key/subkeys. Validates revocations it + can, so not those made with some other designated revocation key. + + :param key: The key to check. + :type key: pgpy.PGPKey + :param flags_required: The set of flags required. + :type flags_required: set + :return: Whether the key has the flags_required. + :rtype: bool + """ + if key.is_expired: + return False + for revoc in key.revocation_signatures: + try: + verified = key.verify(key, revoc) + except PGPError: + continue + if bool(verified): + return False + + usage_flags = key.usage_flags() + for subkey in key.subkeys.values(): + if subkey.is_expired: + continue + + valid = True + for revoc in subkey.revocation_signatures: + try: + verified = key.verify(subkey, revoc) + except PGPError: + continue + if bool(verified): + valid = False + break + + if valid: + usage_flags |= subkey.usage_flags() + + return flags_required.issubset(usage_flags) diff --git a/src/mailman_pgp/utils/tests/__init__.py b/src/mailman_pgp/utils/tests/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/src/mailman_pgp/utils/tests/__init__.py diff --git a/src/mailman_pgp/utils/tests/test_pgp.py b/src/mailman_pgp/utils/tests/test_pgp.py new file mode 100644 index 0000000..b6433d4 --- /dev/null +++ b/src/mailman_pgp/utils/tests/test_pgp.py @@ -0,0 +1,86 @@ +# Copyright (C) 2017 Jan Jancar +# +# This file is a part of the Mailman PGP plugin. +# +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) +# any later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. +# +# You should have received a copy of the GNU General Public License along with +# this program. If not, see <http://www.gnu.org/licenses/>. + +"""""" +import datetime +import time +from unittest import TestCase + +from parameterized import parameterized +from pgpy import PGPKey, PGPUID +from pgpy.constants import ( + CompressionAlgorithm, EllipticCurveOID, HashAlgorithm, KeyFlags, + PubKeyAlgorithm, SymmetricKeyAlgorithm) + +from mailman_pgp.testing.layers import PGPLayer +from mailman_pgp.testing.pgp import load_blob, load_key +from mailman_pgp.utils.pgp import key_usable, revoc_from_blob + + +class TestPGPUtils(TestCase): + layer = PGPLayer + + @parameterized.expand([ + (load_blob('revocs', 'rsa_1024.revoc.asc'), + load_key('rsa_1024.pub.asc')), + (load_blob('revocs', 'ecc_secp256k1.revoc.asc'), + load_key('ecc_secp256k1.pub.asc')), + (load_blob('revocs', 'ecc_p256.revoc.asc'), + load_key('ecc_p256.pub.asc')) + ]) + def test_revoc_from_blob_valid(self, blob, key): + revoc = revoc_from_blob(blob) + verifies = key.verify(key, revoc) + self.assertTrue(bool(verifies)) + + @parameterized.expand([ + ('Not an ASCII-Armored blob',), + (load_blob('keys', 'rsa_1024.pub.asc'),), + ]) + def test_revoc_from_blob_invalid(self, blob): + self.assertRaises(ValueError, revoc_from_blob, blob) + + def test_key_usable_expired(self): + key = PGPKey.new(PubKeyAlgorithm.ECDSA, EllipticCurveOID.SECP256K1) + uid = PGPUID.new('Some Name', email='anne@example.org') + key.add_uid(uid, key_expiration=datetime.timedelta(seconds=1), + usage={KeyFlags.Certify, + KeyFlags.Authentication, + KeyFlags.Sign}, + hashes=[HashAlgorithm.SHA256, + HashAlgorithm.SHA512], + ciphers=[SymmetricKeyAlgorithm.AES256], + compression=[CompressionAlgorithm.ZLIB]) + + time.sleep(2) + + self.assertFalse(key_usable(key, set())) + + def test_key_usable_revoked(self): + key = load_key('ecc_p256.priv.asc') + rsig = key.revoke(key) + key |= rsig + + self.assertFalse(key_usable(key, set())) + + def test_key_usable_subkey_revoked(self): + key = load_key('ecc_p256.priv.asc') + sub = next(iter(key.subkeys.values())) + rsig = key.revoke(sub) + sub |= rsig + + self.assertFalse(key_usable(key, {KeyFlags.EncryptCommunications})) diff --git a/src/mailman_pgp/workflows/base.py b/src/mailman_pgp/workflows/base.py index 036dc18..9fc5d58 100644 --- a/src/mailman_pgp/workflows/base.py +++ b/src/mailman_pgp/workflows/base.py @@ -16,15 +16,75 @@ # this program. If not, see <http://www.gnu.org/licenses/>. """""" +from datetime import timedelta + +from mailman.interfaces.pending import IPendings +from mailman.interfaces.subscriptions import TokenOwner +from mailman.utilities.datetime import now +from mailman.utilities.modules import abstract_component +from mailman.workflows.common import SubscriptionBase +from public import public +from zope.component import getUtility from mailman_pgp.database import transaction from mailman_pgp.model.address import PGPAddress +from mailman_pgp.model.list import PGPMailingList +@public class PGPMixin: - def _step_pgp_prepare(self): - pgp_address = PGPAddress.for_address(self.address) - if pgp_address is None: + def __init__(self, mlist, pgp_address=None): + self.mlist = mlist + self.pgp_list = PGPMailingList.for_list(mlist) + self.pgp_address = pgp_address + if self.pgp_address is not None: + self.address = self.pgp_address.address + + @property + def address_key(self): + return self.pgp_address.email + + @address_key.setter + def address_key(self, value): + self.pgp_address = PGPAddress.for_email(value) + self.member = self.mlist.regular_members.get_member(value) + + def _step_create_address(self): + self.pgp_address = PGPAddress.for_address(self.address) + if self.pgp_address is None: with transaction() as t: - pgp_address = PGPAddress(self.address) - t.add(pgp_address) + self.pgp_address = PGPAddress(self.address) + t.add(self.pgp_address) + + def _step_restore_address(self): + self.pgp_address = PGPAddress.for_address(self.address) + + def _set_token(self, token_owner): + assert isinstance(token_owner, TokenOwner) + pendings = getUtility(IPendings) + if self.token is not None: + pendings.confirm(self.token) + self.token_owner = token_owner + if token_owner is TokenOwner.no_one: + self.token = None + return + + pendable = self.pendable_class()( + list_id=self.mlist.list_id, + email=self.address.email, + display_name=self.address.display_name, + when=now().replace(microsecond=0).isoformat(), + token_owner=token_owner.name, + ) + self.token = pendings.add(pendable, timedelta(days=3650)) + + +@public +@abstract_component +class PGPSubscriptionBase(SubscriptionBase, PGPMixin): + def __init__(self, mlist, subscriber=None, *, pgp_address=None): + SubscriptionBase.__init__(self, mlist, subscriber) + PGPMixin.__init__(self, mlist, pgp_address=pgp_address) + + def _step_restore_subscriber(self): + self._restore_subscriber() diff --git a/src/mailman_pgp/workflows/key_change.py b/src/mailman_pgp/workflows/key_change.py index 290e504..1d07903 100644 --- a/src/mailman_pgp/workflows/key_change.py +++ b/src/mailman_pgp/workflows/key_change.py @@ -28,10 +28,11 @@ from zope.interface import implementer from mailman_pgp.config import config from mailman_pgp.database import transaction -from mailman_pgp.model.address import PGPAddress -from mailman_pgp.model.list import PGPMailingList from mailman_pgp.pgp.wrapper import PGPWrapper from mailman_pgp.utils.email import copy_headers +from mailman_pgp.workflows.base import PGPMixin +from mailman_pgp.workflows.mod_approval import ( + ModeratorKeyChangeApprovalMixin) CHANGE_CONFIRM_REQUEST = """\ ---------- @@ -46,34 +47,18 @@ Token: {} """ -@public -@implementer(IWorkflow) -class KeyChangeWorkflow(Workflow): - name = 'pgp-key-change-workflow' - description = '' - initial_state = 'change_key' +class KeyChangeBase(Workflow, PGPMixin): save_attributes = ( 'address_key', - 'pubkey_key' + 'pubkey_key', ) def __init__(self, mlist, pgp_address=None, pubkey=None): - super().__init__() - self.mlist = mlist - self.pgp_list = PGPMailingList.for_list(mlist) - self.pgp_address = pgp_address + Workflow.__init__(self) + PGPMixin.__init__(self, mlist, pgp_address) self.pubkey = pubkey @property - def address_key(self): - return self.pgp_address.email - - @address_key.setter - def address_key(self, value): - self.pgp_address = PGPAddress.for_email(value) - self.member = self.mlist.regular_members.get_member(value) - - @property def pubkey_key(self): return str(self.pubkey) @@ -81,23 +66,27 @@ class KeyChangeWorkflow(Workflow): def pubkey_key(self, value): self.pubkey, _ = PGPKey.from_blob(value) - def _step_change_key(self): - if self.pgp_address is None or self.pubkey is None: - raise ValueError - - self.push('send_key_confirm_request') - - def _step_send_key_confirm_request(self): + def _pend(self, token_owner, lifetime=None): pendings = getUtility(IPendings) - pendable = KeyChangeWorkflow.pendable_class()( + pendable = self.pendable_class()( email=self.pgp_address.email, pubkey=str(self.pubkey), fingerprint=self.pubkey.fingerprint ) - lifetime = config.get_value('misc', 'change_request_lifetime') + self.token = pendings.add(pendable, lifetime=lifetime) - self.token_owner = TokenOwner.subscriber + self.token_owner = token_owner + + def _step_change_key(self): + if self.pgp_address is None or self.pubkey is None: + raise ValueError + + self.push('send_key_confirm_request') + def _step_send_key_confirm_request(self): + self._pend(TokenOwner.subscriber, + lifetime=config.get_value('misc', + 'change_request_lifetime')) self.push('receive_confirmation') self.save() request_address = self.mlist.request_address @@ -116,20 +105,52 @@ class KeyChangeWorkflow(Workflow): raise StopIteration def _step_receive_confirmation(self): + self._set_token(TokenOwner.no_one) + + def _step_do_change(self): with transaction(): self.pgp_address.key = self.pubkey self.pgp_address.key_confirmed = True - pendings = getUtility(IPendings) - if self.token is not None: - pendings.confirm(self.token) - self.token = None - self.token_owner = TokenOwner.no_one - @classmethod def pendable_class(cls): @implementer(IPendable) class Pendable(dict): - PEND_TYPE = KeyChangeWorkflow.name + PEND_TYPE = cls.name return Pendable + + +@public +@implementer(IWorkflow) +class KeyChangeWorkflow(KeyChangeBase): + name = 'pgp-key-change-workflow' + description = '' + initial_state = 'prepare' + + def _step_prepare(self): + self.push('do_change') + self.push('change_key') + + +@public +@implementer(IWorkflow) +class KeyChangeModWorkflow(KeyChangeBase, ModeratorKeyChangeApprovalMixin): + name = 'pgp-key-change-mod-workflow' + description = '' + initial_state = 'prepare' + save_attributes = ( + 'approved', + 'address_key', + 'pubkey_key' + ) + + def __init__(self, mlist, pgp_address=None, pubkey=None, + pre_approved=False): + KeyChangeBase.__init__(self, mlist, pgp_address, pubkey) + ModeratorKeyChangeApprovalMixin.__init__(self, pre_approved) + + def _step_prepare(self): + self.push('do_change') + self.push('mod_approval') + self.push('change_key') diff --git a/src/mailman_pgp/workflows/key_confirm.py b/src/mailman_pgp/workflows/key_confirm.py new file mode 100644 index 0000000..0a38551 --- /dev/null +++ b/src/mailman_pgp/workflows/key_confirm.py @@ -0,0 +1,77 @@ +# Copyright (C) 2017 Jan Jancar +# +# This file is a part of the Mailman PGP plugin. +# +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) +# any later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. +# +# You should have received a copy of the GNU General Public License along with +# this program. If not, see <http://www.gnu.org/licenses/>. + +"""""" +from mailman.email.message import UserNotification +from mailman.interfaces.subscriptions import TokenOwner +from public import public + +from mailman_pgp.database import transaction +from mailman_pgp.model.list import PGPMailingList +from mailman_pgp.pgp.wrapper import PGPWrapper +from mailman_pgp.utils.email import copy_headers + +CONFIRM_REQUEST = """\ +---------- +TODO: this is a pgp enabled list. +Reply to this message with this whole text +signed with your supplied key, either inline or PGP/MIME. + +Fingerprint: {} +Token: {} +---------- +""" + + +@public +class ConfirmPubkeyMixin: + def __init__(self, pre_confirmed=False): + self.pubkey_confirmed = pre_confirmed + + def _step_pubkey_confirmation(self): + assert self.pgp_address is not None + + if self.pubkey_confirmed: + with transaction(): + self.pgp_address.key_confirmed = True + else: + if not self.pgp_address.key_confirmed: + self.push('send_key_confirm_request') + + def _step_send_key_confirm_request(self): + self._set_token(TokenOwner.subscriber) + self.push('receive_key_confirmation') + self.save() + + request_address = self.mlist.request_address + email_address = self.address.email + msg = UserNotification(email_address, request_address, + 'key confirm {}'.format(self.token), + CONFIRM_REQUEST.format( + self.pgp_address.key_fingerprint, + self.token)) + pgp_list = PGPMailingList.for_list(self.mlist) + wrapped = PGPWrapper(msg) + encrypted = wrapped.sign_encrypt(pgp_list.key, self.pgp_address.key) + + msg.set_payload(encrypted.get_payload()) + copy_headers(encrypted, msg, True) + msg.send(self.mlist) + raise StopIteration + + def _step_receive_key_confirmation(self): + self._set_token(TokenOwner.no_one) diff --git a/src/mailman_pgp/workflows/key_revoke.py b/src/mailman_pgp/workflows/key_revoke.py new file mode 100644 index 0000000..7a7c071 --- /dev/null +++ b/src/mailman_pgp/workflows/key_revoke.py @@ -0,0 +1,70 @@ +# Copyright (C) 2017 Jan Jancar +# +# This file is a part of the Mailman PGP plugin. +# +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) +# any later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. +# +# You should have received a copy of the GNU General Public License along with +# this program. If not, see <http://www.gnu.org/licenses/>. + +"""""" +from mailman.interfaces.pending import IPendable +from mailman.interfaces.workflows import IWorkflow +from mailman.workflows.base import Workflow +from public import public +from zope.interface import implementer + +from mailman_pgp.workflows.base import PGPMixin +from mailman_pgp.workflows.key_confirm import ConfirmPubkeyMixin +from mailman_pgp.workflows.key_set import SetPubkeyMixin +from mailman_pgp.workflows.mod_approval import ModeratorKeyRevokeApprovalMixin + + +class KeyRevokeBase(Workflow, PGPMixin): + def __init__(self, mlist, pgp_address=None): + Workflow.__init__(self) + PGPMixin.__init__(self, mlist, pgp_address) + + @classmethod + def pendable_class(cls): + @implementer(IPendable) + class Pendable(dict): + PEND_TYPE = cls.name + + return Pendable + + +@public +@implementer(IWorkflow) +class KeyRevokeWorkflow(KeyRevokeBase, SetPubkeyMixin, ConfirmPubkeyMixin, + ModeratorKeyRevokeApprovalMixin): + name = 'pgp-key-revoke-workflow' + description = '' + initial_state = 'prepare' + save_attributes = ( + 'approved', + 'address_key', + 'pubkey_key', + 'pubkey_confirmed' + ) + + def __init__(self, mlist, pgp_address=None, pubkey=None, + pubkey_pre_confirmed=False, pre_approved=False): + KeyRevokeBase.__init__(self, mlist, pgp_address=pgp_address) + SetPubkeyMixin.__init__(self, pubkey=pubkey) + ConfirmPubkeyMixin.__init__(self, pre_confirmed=pubkey_pre_confirmed) + ModeratorKeyRevokeApprovalMixin.__init__(self, + pre_approved=pre_approved) + + def _step_prepare(self): + self.push('mod_approval') + self.push('pubkey_confirmation') + self.push('pubkey_checks') diff --git a/src/mailman_pgp/workflows/key_set.py b/src/mailman_pgp/workflows/key_set.py new file mode 100644 index 0000000..95e6e7c --- /dev/null +++ b/src/mailman_pgp/workflows/key_set.py @@ -0,0 +1,76 @@ +# Copyright (C) 2017 Jan Jancar +# +# This file is a part of the Mailman PGP plugin. +# +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) +# any later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. +# +# You should have received a copy of the GNU General Public License along with +# this program. If not, see <http://www.gnu.org/licenses/>. + +"""""" +from mailman.email.message import UserNotification +from mailman.interfaces.subscriptions import TokenOwner +from pgpy import PGPKey +from public import public + +from mailman_pgp.database import transaction + +KEY_REQUEST = """\ +---------- +TODO: this is a pgp enabled list. +We need your pubkey. +Reply to this message with it as a PGP/MIME(preferred) or inline. +----------""" + + +@public +class SetPubkeyMixin: + def __init__(self, pubkey=None): + self.pubkey = pubkey + + @property + def pubkey_key(self): + if self.pubkey is None: + return None + return str(self.pubkey) + + @pubkey_key.setter + def pubkey_key(self, value): + if value is not None: + self.pubkey, _ = PGPKey.from_blob(value) + else: + self.pubkey = None + + def _step_pubkey_checks(self): + assert self.pgp_address is not None + + if self.pubkey is None: + if self.pgp_address.key is None: + self.push('send_key_request') + else: + with transaction(): + self.pgp_address.key = self.pubkey + + def _step_send_key_request(self): + self._set_token(TokenOwner.subscriber) + self.push('receive_key') + self.save() + request_address = self.mlist.request_address + email_address = self.address.email + msg = UserNotification(email_address, request_address, + 'key set {}'.format(self.token), + KEY_REQUEST) + msg.send(self.mlist, add_precedence=False) + # Now we wait for the confirmation. + raise StopIteration + + def _step_receive_key(self): + self._set_token(TokenOwner.no_one) diff --git a/src/mailman_pgp/workflows/mod_approval.py b/src/mailman_pgp/workflows/mod_approval.py new file mode 100644 index 0000000..367f773 --- /dev/null +++ b/src/mailman_pgp/workflows/mod_approval.py @@ -0,0 +1,153 @@ +# Copyright (C) 2017 Jan Jancar +# +# This file is a part of the Mailman PGP plugin. +# +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) +# any later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. +# +# You should have received a copy of the GNU General Public License along with +# this program. If not, see <http://www.gnu.org/licenses/>. + +"""""" +import copy + +from mailman.email.message import UserNotification +from mailman.interfaces.pending import IPendings +from mailman.interfaces.subscriptions import TokenOwner +from public import public +from zope.component import getUtility + +from mailman_pgp.pgp.mime import MIMEWrapper +from mailman_pgp.utils.email import overwrite_message + +SUBSCRIPTION_MOD_REQUEST = """\ +---------- +TODO: this is a pgp enabled list. +A user with address {address} requested subscription. +The key is attached to this message. + +Fingerprint: {fingerprint} +---------- +""" + +KEY_CHANGE_MOD_REQUEST = """\ +---------- +TODO: this is a pgp enabled list. +A subscriber with address {address} requested a change of his key. +The new key is attached to this message. + +Old key fingerprint: {old_fpr} +New key fingerprint: {new_fpr} +---------- +""" + +KEY_REVOKE_MOD_REQUEST = """\ +---------- +TODO: this is a pgp enabled list. +A subscriber with address {address} revoked a part of his key, +which made it unusable and needs to be reset. The subscriber +supplied a new key. The new key is attached to this message. + +Old key fingerprint: {old_fpr} +New key fingerprint: {new_fpr} +---------- +""" + + +class ModeratorApprovalMixin: + def __init__(self, pre_approved=False): + self.approved = pre_approved + + def _step_mod_approval(self): + if not self.approved: + self.push('get_approval') + + def _step_get_approval(self): + self._pend(TokenOwner.moderator) + self.push('receive_mod_confirmation') + self.save() + + name = self._request_name + body = self._request_body + + if self.mlist.admin_immed_notify: + subject = 'New {} request from {}'.format(name, + self.pgp_address.email) + msg = UserNotification( + self.mlist.owner_address, self.mlist.owner_address, + subject, body, self.mlist.preferred_language) + out = copy.deepcopy(msg) + wrapped = MIMEWrapper(msg) + msg = wrapped.attach_keys(self.pubkey) + overwrite_message(msg, out) + out.send(self.mlist) + raise StopIteration + + def _step_receive_mod_confirmation(self): + pendings = getUtility(IPendings) + if self.token is not None: + pendings.confirm(self.token) + self.token = None + self.token_owner = TokenOwner.no_one + + +@public +class ModeratorSubApprovalMixin(ModeratorApprovalMixin): + def __init__(self, pre_approved=False): + super().__init__(pre_approved) + + @property + def _request_name(self): + return 'subscription' + + @property + def _request_body(self): + params = {'mlist': self.mlist.fqdn_listname, + 'address': self.pgp_address.email, + 'fingerprint': self.pubkey.fingerprint} + return SUBSCRIPTION_MOD_REQUEST.format(**params) + + +@public +class ModeratorKeyChangeApprovalMixin(ModeratorApprovalMixin): + def __init__(self, pre_approved=False): + super().__init__(pre_approved) + + @property + def _request_name(self): + return 'key change' + + @property + def _request_body(self): + params = {'mlist': self.mlist.fqdn_listname, + 'address': self.pgp_address.email, + 'fingerprint': self.pubkey.fingerprint, + 'old_fpr': self.pgp_address.key_fingerprint, + 'new_fpr': self.pubkey.fingerprint} + return KEY_CHANGE_MOD_REQUEST.format(**params) + + +@public +class ModeratorKeyRevokeApprovalMixin(ModeratorApprovalMixin): + def __init__(self, pre_approved=False): + super().__init__(pre_approved) + + @property + def _request_name(self): + return 'key reset' + + @property + def _request_body(self): + params = {'mlist': self.mlist.fqdn_listname, + 'address': self.pgp_address.email, + 'fingerprint': self.pubkey.fingerprint, + 'old_fpr': self.pgp_address.key_fingerprint, + 'new_fpr': self.pubkey.fingerprint} + return KEY_REVOKE_MOD_REQUEST.format(**params) diff --git a/src/mailman_pgp/workflows/pubkey.py b/src/mailman_pgp/workflows/pubkey.py deleted file mode 100644 index a13d491..0000000 --- a/src/mailman_pgp/workflows/pubkey.py +++ /dev/null @@ -1,118 +0,0 @@ -from mailman.email.message import UserNotification -from mailman.interfaces.subscriptions import TokenOwner -from pgpy import PGPKey - -from mailman_pgp.database import transaction -from mailman_pgp.model.address import PGPAddress -from mailman_pgp.model.list import PGPMailingList -from mailman_pgp.pgp.wrapper import PGPWrapper -from mailman_pgp.utils.email import copy_headers - -KEY_REQUEST = """\ ----------- -TODO: this is a pgp enabled list. -We need your pubkey. -Reply to this message with it as a PGP/MIME(preferred) or inline. -----------""" - -CONFIRM_REQUEST = """\ ----------- -TODO: this is a pgp enabled list. -Reply to this message with this whole text -signed with your supplied key, either inline or PGP/MIME. - -Fingerprint: {} -Token: {} ----------- -""" - - -class SetPubkeyMixin: - def __init__(self, pubkey=None): - self.pubkey = pubkey - - @property - def pubkey_key(self): - if self.pubkey is None: - return None - return str(self.pubkey) - - @pubkey_key.setter - def pubkey_key(self, value): - if value is not None: - self.pubkey, _ = PGPKey.from_blob(value) - else: - self.pubkey = None - - def _step_pubkey_checks(self): - pgp_address = PGPAddress.for_address(self.address) - assert pgp_address is not None - - if self.pubkey is None: - if pgp_address.key is None: - self.push('send_key_request') - else: - with transaction(): - pgp_address.key = self.pubkey - - def _step_send_key_request(self): - self._set_token(TokenOwner.subscriber) - self.push('receive_key') - self.save() - request_address = self.mlist.request_address - email_address = self.address.email - msg = UserNotification(email_address, request_address, - 'key set {}'.format(self.token), - KEY_REQUEST) - msg.send(self.mlist, add_precedence=False) - # Now we wait for the confirmation. - raise StopIteration - - def _step_receive_key(self): - self._restore_subscriber() - self._set_token(TokenOwner.no_one) - - -class ConfirmPubkeyMixin: - def __init__(self, pre_confirmed=False): - self.pubkey_confirmed = pre_confirmed - - def _step_pubkey_confirmation(self): - pgp_address = PGPAddress.for_address(self.address) - assert pgp_address is not None - - if self.pubkey_confirmed: - with transaction(): - pgp_address.key_confirmed = True - else: - if not pgp_address.key_confirmed: - self.push('send_key_confirm_request') - - def _step_send_key_confirm_request(self): - self._set_token(TokenOwner.subscriber) - self.push('receive_key_confirmation') - self.save() - - pgp_address = PGPAddress.for_address(self.address) - request_address = self.mlist.request_address - email_address = self.address.email - msg = UserNotification(email_address, request_address, - 'key confirm {}'.format(self.token), - CONFIRM_REQUEST.format( - pgp_address.key_fingerprint, - self.token)) - pgp_list = PGPMailingList.for_list(self.mlist) - wrapped = PGPWrapper(msg) - encrypted = wrapped.sign_encrypt(pgp_list.key, pgp_address.key) - - msg.set_payload(encrypted.get_payload()) - copy_headers(encrypted, msg, True) - msg.send(self.mlist) - raise StopIteration - - def _step_receive_key_confirmation(self): - self._restore_subscriber() - self._set_token(TokenOwner.no_one) - with transaction(): - pgp_address = PGPAddress.for_address(self.address) - pgp_address.key_confirmed = True diff --git a/src/mailman_pgp/workflows/subscription.py b/src/mailman_pgp/workflows/subscription.py index 809b7cb..2546f1e 100644 --- a/src/mailman_pgp/workflows/subscription.py +++ b/src/mailman_pgp/workflows/subscription.py @@ -19,18 +19,19 @@ from mailman.core.i18n import _ from mailman.interfaces.workflows import ISubscriptionWorkflow -from mailman.workflows.common import (ConfirmationMixin, ModerationMixin, - SubscriptionBase, VerificationMixin) +from mailman.workflows.common import (ConfirmationMixin, VerificationMixin) from public import public from zope.interface import implementer -from mailman_pgp.workflows.base import PGPMixin -from mailman_pgp.workflows.pubkey import ConfirmPubkeyMixin, SetPubkeyMixin +from mailman_pgp.workflows.base import PGPMixin, PGPSubscriptionBase +from mailman_pgp.workflows.key_confirm import ConfirmPubkeyMixin +from mailman_pgp.workflows.key_set import SetPubkeyMixin +from mailman_pgp.workflows.mod_approval import ModeratorSubApprovalMixin @public @implementer(ISubscriptionWorkflow) -class OpenSubscriptionPolicy(SubscriptionBase, VerificationMixin, +class OpenSubscriptionPolicy(PGPSubscriptionBase, VerificationMixin, SetPubkeyMixin, ConfirmPubkeyMixin, PGPMixin): """""" @@ -52,26 +53,28 @@ class OpenSubscriptionPolicy(SubscriptionBase, VerificationMixin, def __init__(self, mlist, subscriber=None, *, pre_verified=False, pubkey=None, pubkey_pre_confirmed=False): - SubscriptionBase.__init__(self, mlist, subscriber) + PGPSubscriptionBase.__init__(self, mlist, subscriber) VerificationMixin.__init__(self, pre_verified=pre_verified) SetPubkeyMixin.__init__(self, pubkey=pubkey) ConfirmPubkeyMixin.__init__(self, pre_confirmed=pubkey_pre_confirmed) - PGPMixin.__init__(self) + PGPMixin.__init__(self, mlist) def _step_prepare(self): self.push('do_subscription') + self.push('restore_subscriber') self.push('pubkey_confirmation') + self.push('restore_address') self.push('pubkey_checks') - self.push('pgp_prepare') + self.push('create_address') self.push('verification_checks') self.push('sanity_checks') @public @implementer(ISubscriptionWorkflow) -class ConfirmSubscriptionPolicy(SubscriptionBase, VerificationMixin, +class ConfirmSubscriptionPolicy(PGPSubscriptionBase, VerificationMixin, ConfirmationMixin, SetPubkeyMixin, - ConfirmPubkeyMixin, PGPMixin): + ConfirmPubkeyMixin): """""" name = 'pgp-policy-confirm' @@ -92,18 +95,19 @@ class ConfirmSubscriptionPolicy(SubscriptionBase, VerificationMixin, def __init__(self, mlist, subscriber=None, *, pre_verified=False, pre_confirmed=False, pubkey=None, pubkey_pre_confirmed=False): - SubscriptionBase.__init__(self, mlist, subscriber) + PGPSubscriptionBase.__init__(self, mlist, subscriber) VerificationMixin.__init__(self, pre_verified=pre_verified) ConfirmationMixin.__init__(self, pre_confirmed=pre_confirmed) SetPubkeyMixin.__init__(self, pubkey=pubkey) ConfirmPubkeyMixin.__init__(self, pre_confirmed=pubkey_pre_confirmed) - PGPMixin.__init__(self) def _step_prepare(self): self.push('do_subscription') + self.push('restore_subscriber') self.push('pubkey_confirmation') + self.push('restore_address') self.push('pubkey_checks') - self.push('pgp_prepare') + self.push('create_address') self.push('confirmation_checks') self.push('verification_checks') self.push('sanity_checks') @@ -111,9 +115,9 @@ class ConfirmSubscriptionPolicy(SubscriptionBase, VerificationMixin, @public @implementer(ISubscriptionWorkflow) -class ModerationSubscriptionPolicy(SubscriptionBase, VerificationMixin, - ModerationMixin, SetPubkeyMixin, - ConfirmPubkeyMixin, PGPMixin): +class ModerationSubscriptionPolicy(PGPSubscriptionBase, VerificationMixin, + ModeratorSubApprovalMixin, SetPubkeyMixin, + ConfirmPubkeyMixin): """""" name = 'pgp-policy-moderate' @@ -134,29 +138,31 @@ class ModerationSubscriptionPolicy(SubscriptionBase, VerificationMixin, def __init__(self, mlist, subscriber=None, *, pre_verified=False, pre_approved=False, pubkey=None, pubkey_pre_confirmed=False): - SubscriptionBase.__init__(self, mlist, subscriber) + PGPSubscriptionBase.__init__(self, mlist, subscriber) VerificationMixin.__init__(self, pre_verified=pre_verified) - ModerationMixin.__init__(self, pre_approved=pre_approved) + ModeratorSubApprovalMixin.__init__(self, pre_approved=pre_approved) SetPubkeyMixin.__init__(self, pubkey=pubkey) ConfirmPubkeyMixin.__init__(self, pre_confirmed=pubkey_pre_confirmed) - PGPMixin.__init__(self) def _step_prepare(self): self.push('do_subscription') - self.push('moderation_checks') + self.push('restore_subscriber') + self.push('mod_approval') self.push('pubkey_confirmation') + self.push('restore_address') self.push('pubkey_checks') - self.push('pgp_prepare') + self.push('create_address') self.push('verification_checks') self.push('sanity_checks') @public @implementer(ISubscriptionWorkflow) -class ConfirmModerationSubscriptionPolicy(SubscriptionBase, VerificationMixin, - ConfirmationMixin, ModerationMixin, - SetPubkeyMixin, ConfirmPubkeyMixin, - PGPMixin): +class ConfirmModerationSubscriptionPolicy(PGPSubscriptionBase, + VerificationMixin, + ConfirmationMixin, + ModeratorSubApprovalMixin, + SetPubkeyMixin, ConfirmPubkeyMixin): """""" name = 'pgp-policy-confirm-moderate' @@ -178,20 +184,22 @@ class ConfirmModerationSubscriptionPolicy(SubscriptionBase, VerificationMixin, def __init__(self, mlist, subscriber=None, *, pre_verified=False, pre_confirmed=False, pre_approved=False, pubkey=None, pubkey_pre_confirmed=False): - SubscriptionBase.__init__(self, mlist, subscriber) + PGPSubscriptionBase.__init__(self, mlist, subscriber) VerificationMixin.__init__(self, pre_verified=pre_verified) ConfirmationMixin.__init__(self, pre_confirmed=pre_confirmed) - ModerationMixin.__init__(self, pre_approved=pre_approved) + ModeratorSubApprovalMixin.__init__(self, pre_approved=pre_approved) SetPubkeyMixin.__init__(self, pubkey=pubkey) ConfirmPubkeyMixin.__init__(self, pre_confirmed=pubkey_pre_confirmed) - PGPMixin.__init__(self) def _step_prepare(self): self.push('do_subscription') - self.push('moderation_checks') + self.push('restore_subscriber') + self.push('mod_approval') + self.push('restore_address') self.push('pubkey_confirmation') + self.push('restore_address') self.push('pubkey_checks') - self.push('pgp_prepare') + self.push('create_address') self.push('confirmation_checks') self.push('verification_checks') self.push('sanity_checks') diff --git a/src/mailman_pgp/workflows/tests/test_base.py b/src/mailman_pgp/workflows/tests/test_base.py index 31b3d05..3904105 100644 --- a/src/mailman_pgp/workflows/tests/test_base.py +++ b/src/mailman_pgp/workflows/tests/test_base.py @@ -37,8 +37,8 @@ from mailman_pgp.pgp.wrapper import PGPWrapper from mailman_pgp.testing.layers import PGPConfigLayer from mailman_pgp.testing.pgp import load_key from mailman_pgp.workflows.base import (PGPMixin) -from mailman_pgp.workflows.pubkey import (ConfirmPubkeyMixin, KEY_REQUEST, - SetPubkeyMixin) +from mailman_pgp.workflows.key_confirm import ConfirmPubkeyMixin +from mailman_pgp.workflows.key_set import KEY_REQUEST, SetPubkeyMixin class PubkeyMixinTestSetup(): @@ -49,13 +49,14 @@ class PubkeyMixinTestSetup(): self.list_key = load_key('ecc_p256.priv.asc') - self.pgp_list = PGPMailingList.for_list(self.mlist) - self.pgp_list.key = self.list_key + with transaction(): + self.pgp_list = PGPMailingList.for_list(self.mlist) + self.pgp_list.key = self.list_key self.um = getUtility(IUserManager) self.sender_key = load_key('rsa_1024.priv.asc') - self.sender = self.um.create_address('rsa-1024b@example.org') + self.sender = self.um.create_address('anne@example.org') @implementer(IWorkflow) @@ -78,13 +79,13 @@ class PGPTestWorkflow(SubscriptionBase, PGPMixin, SetPubkeyMixin, SubscriptionBase.__init__(self, mlist, subscriber) SetPubkeyMixin.__init__(self, pubkey=pubkey) ConfirmPubkeyMixin.__init__(self, pre_confirmed=pubkey_pre_confirmed) - PGPMixin.__init__(self) + PGPMixin.__init__(self, mlist) def _step_prepare(self): self.push('do_subscription') self.push('pubkey_confirmation') self.push('pubkey_checks') - self.push('pgp_prepare') + self.push('create_address') self.push('sanity_checks') @@ -93,7 +94,7 @@ class TestPGPMixin(PubkeyMixinTestSetup, unittest.TestCase): def test_create_address(self): workflow = PGPTestWorkflow(self.mlist, self.sender) - workflow.run_thru('pgp_prepare') + workflow.run_thru('create_address') pgp_address = PGPAddress.for_address(self.sender) self.assertIsNotNone(pgp_address) @@ -102,7 +103,7 @@ class TestPGPMixin(PubkeyMixinTestSetup, unittest.TestCase): with transaction() as t: pgp_address = PGPAddress(self.sender) t.add(pgp_address) - workflow.run_thru('pgp_prepare') + workflow.run_thru('create_address') still = PGPAddress.for_address(self.sender) self.assertIsNotNone(still) @@ -203,7 +204,7 @@ class TestBothPubkeyMixins(PubkeyMixinTestSetup, unittest.TestCase): self.assertIsNotNone(workflow.token) pendable = getUtility(IPendings).confirm(workflow.token, expunge=False) self.assertEqual(pendable['list_id'], 'test.example.com') - self.assertEqual(pendable['email'], 'rsa-1024b@example.org') + self.assertEqual(pendable['email'], 'anne@example.org') self.assertEqual(pendable['display_name'], '') self.assertEqual(pendable['when'], '2005-08-01T07:49:23') self.assertEqual(pendable['token_owner'], 'subscriber') @@ -216,7 +217,7 @@ class TestBothPubkeyMixins(PubkeyMixinTestSetup, unittest.TestCase): self.assertIsNotNone(workflow.token) pendable = getUtility(IPendings).confirm(workflow.token, expunge=False) self.assertEqual(pendable['list_id'], 'test.example.com') - self.assertEqual(pendable['email'], 'rsa-1024b@example.org') + self.assertEqual(pendable['email'], 'anne@example.org') self.assertEqual(pendable['display_name'], '') self.assertEqual(pendable['when'], '2005-08-01T07:49:23') self.assertEqual(pendable['token_owner'], 'subscriber') diff --git a/src/mailman_pgp/workflows/tests/test_key_change.py b/src/mailman_pgp/workflows/tests/test_key_change.py index e469d51..5d4926a 100644 --- a/src/mailman_pgp/workflows/tests/test_key_change.py +++ b/src/mailman_pgp/workflows/tests/test_key_change.py @@ -25,13 +25,15 @@ from mailman.interfaces.usermanager import IUserManager from mailman.testing.helpers import get_queue_messages from zope.component import getUtility +from mailman_pgp.config import mm_config from mailman_pgp.database import mm_transaction, transaction from mailman_pgp.model.address import PGPAddress from mailman_pgp.model.list import PGPMailingList from mailman_pgp.pgp.wrapper import PGPWrapper from mailman_pgp.testing.layers import PGPConfigLayer from mailman_pgp.testing.pgp import load_key -from mailman_pgp.workflows.key_change import KeyChangeWorkflow +from mailman_pgp.workflows.key_change import (KeyChangeModWorkflow, + KeyChangeWorkflow) class TestKeyChangeWorkflow(unittest.TestCase): @@ -41,13 +43,18 @@ class TestKeyChangeWorkflow(unittest.TestCase): with mm_transaction(): self.mlist = create_list('test@example.com', style_name='pgp-default') - self.pgp_list = PGPMailingList.for_list(self.mlist) - self.pgp_list.key = load_key('ecc_p256.priv.asc') + with transaction(): + self.pgp_list = PGPMailingList.for_list(self.mlist) + self.pgp_list.key = load_key('ecc_p256.priv.asc') self.sender_key = load_key('rsa_1024.priv.asc') self.sender_new_key = load_key('ecc_p256.priv.asc') self.sender = getUtility(IUserManager).create_address( - 'rsa-1024b@example.org') + 'anne@example.org') + + def test_has_workflows(self): + self.assertTrue(KeyChangeWorkflow.name, mm_config.workflows) + self.assertTrue(KeyChangeModWorkflow.name, mm_config.workflows) def test_pgp_address_none(self): workflow = KeyChangeWorkflow(self.mlist) @@ -101,3 +108,19 @@ class TestKeyChangeWorkflow(unittest.TestCase): self.assertEqual(pgp_address.key_fingerprint, self.sender_new_key.fingerprint) self.assertTrue(pgp_address.key_confirmed) + + def test_confirm_mod(self): + with transaction() as t: + pgp_address = PGPAddress(self.sender) + pgp_address.key = self.sender_key.pubkey + pgp_address.key_confirmed = True + t.add(pgp_address) + + workflow = KeyChangeModWorkflow(self.mlist, pgp_address, + self.sender_new_key.pubkey) + list(workflow) + + token, token_owner, member = ISubscriptionManager(self.mlist).confirm( + workflow.token) + self.assertIsNotNone(token) + self.assertEqual(token_owner, TokenOwner.moderator) diff --git a/src/mailman_pgp/workflows/tests/test_mod_approval.py b/src/mailman_pgp/workflows/tests/test_mod_approval.py new file mode 100644 index 0000000..7d57a9b --- /dev/null +++ b/src/mailman_pgp/workflows/tests/test_mod_approval.py @@ -0,0 +1,106 @@ +# Copyright (C) 2017 Jan Jancar +# +# This file is a part of the Mailman PGP plugin. +# +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) +# any later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. +# +# You should have received a copy of the GNU General Public License along with +# this program. If not, see <http://www.gnu.org/licenses/>. + +"""""" +from unittest import TestCase + +from mailman.app.lifecycle import create_list +from mailman.interfaces.subscriptions import TokenOwner +from mailman.interfaces.usermanager import IUserManager +from mailman.interfaces.workflows import IWorkflow +from mailman.testing.helpers import get_queue_messages +from zope.component import getUtility +from zope.interface import implementer + +from mailman_pgp.database import mm_transaction, transaction +from mailman_pgp.model.address import PGPAddress +from mailman_pgp.model.list import PGPMailingList +from mailman_pgp.pgp.wrapper import PGPWrapper +from mailman_pgp.testing.layers import PGPConfigLayer +from mailman_pgp.testing.pgp import load_key +from mailman_pgp.workflows.key_change import KeyChangeBase +from mailman_pgp.workflows.mod_approval import ( + ModeratorKeyChangeApprovalMixin) + + +@implementer(IWorkflow) +class PGPTestWorkflow(KeyChangeBase, ModeratorKeyChangeApprovalMixin): + name = 'test-workflow' + description = '' + initial_state = 'mod_approval' + save_attributes = ( + 'approved', + ) + + def __init__(self, mlist, pgp_address=None, pubkey=None, + pre_approved=False): + KeyChangeBase.__init__(self, mlist, pgp_address, pubkey) + ModeratorKeyChangeApprovalMixin.__init__(self, pre_approved) + + +class TestModeratorApprovalMixin(TestCase): + layer = PGPConfigLayer + + def setUp(self): + with mm_transaction(): + self.mlist = create_list('test@example.com', + style_name='pgp-default') + with transaction(): + self.pgp_list = PGPMailingList.for_list(self.mlist) + self.pgp_list.key = load_key('ecc_p256.priv.asc') + + self.sender_key = load_key('rsa_1024.priv.asc') + self.sender_new_key = load_key('ecc_p256.priv.asc') + self.sender = getUtility(IUserManager).create_address( + 'anne@example.org') + + def test_get_approval(self): + with transaction() as t: + pgp_address = PGPAddress(self.sender) + pgp_address.key = self.sender_key.pubkey + pgp_address.key_confirmed = True + t.add(pgp_address) + + workflow = PGPTestWorkflow(self.mlist, pgp_address, + self.sender_new_key.pubkey) + list(workflow) + items = get_queue_messages('virgin', expected_count=1) + message = items[0].msg + + self.assertEqual(message['Subject'], + 'New key change request from {}'.format( + pgp_address.email)) + wrapped = PGPWrapper(message) + self.assertTrue(wrapped.has_keys()) + keys = list(wrapped.keys()) + self.assertEqual(len(keys), 1) + key = keys.pop() + self.assertEqual(key.fingerprint, self.sender_new_key.fingerprint) + + def test_receive_approval(self): + with transaction() as t: + pgp_address = PGPAddress(self.sender) + pgp_address.key = self.sender_key.pubkey + pgp_address.key_confirmed = True + t.add(pgp_address) + + workflow = PGPTestWorkflow(self.mlist, pgp_address, + self.sender_new_key.pubkey) + list(workflow) + get_queue_messages('virgin', expected_count=1) + list(workflow) + self.assertEqual(workflow.token_owner, TokenOwner.no_one) |
