diff options
Diffstat (limited to 'src/mailman_pgp/commands')
| -rw-r--r-- | src/mailman_pgp/commands/eml_key.py | 128 | ||||
| -rw-r--r-- | src/mailman_pgp/commands/tests/test_key.py | 231 |
2 files changed, 313 insertions, 46 deletions
diff --git a/src/mailman_pgp/commands/eml_key.py b/src/mailman_pgp/commands/eml_key.py index 0775bed..5799b8f 100644 --- a/src/mailman_pgp/commands/eml_key.py +++ b/src/mailman_pgp/commands/eml_key.py @@ -17,6 +17,7 @@ """The key email command.""" import logging +import copy from email.mime.text import MIMEText from mailman.email.message import UserNotification @@ -25,19 +26,24 @@ from mailman.interfaces.pending import IPendings from mailman.interfaces.subscriptions import ISubscriptionManager from mailman.interfaces.usermanager import IUserManager from pgpy.constants import KeyFlags +from pgpy.errors import PGPError from public import public from zope.component import getUtility from zope.interface import implementer +from mailman_pgp.config import mm_config from mailman_pgp.database import transaction from mailman_pgp.model.address import PGPAddress from mailman_pgp.model.list import PGPMailingList from mailman_pgp.pgp.mime import MIMEWrapper from mailman_pgp.pgp.wrapper import PGPWrapper from mailman_pgp.utils.email import get_email +from mailman_pgp.utils.pgp import key_usable from mailman_pgp.workflows.key_change import (CHANGE_CONFIRM_REQUEST, + KeyChangeModWorkflow, KeyChangeWorkflow) -from mailman_pgp.workflows.pubkey import CONFIRM_REQUEST +from mailman_pgp.workflows.key_confirm import CONFIRM_REQUEST +from mailman_pgp.workflows.key_revoke import KeyRevokeWorkflow log = logging.getLogger('mailman.plugin.pgp.commands') @@ -78,10 +84,7 @@ def _cmd_set(pgp_list, mlist, msg, msgdata, arguments, results): file=results) return ContinueProcessing.no - usage_flags = key.usage_flags() - for subkey in key.subkeys.values(): - usage_flags |= subkey.usage_flags() - if KeyFlags.EncryptCommunications not in usage_flags: + if not key_usable(key, {KeyFlags.EncryptCommunications, KeyFlags.Sign}): print('Need a key which can be used to encrypt communications.', file=results) return ContinueProcessing.no @@ -170,7 +173,8 @@ def _cmd_confirm(pgp_list, mlist, msg, msgdata, arguments, results): print('Wrong token.', file=results) return ContinueProcessing.no - if pendable.get('type') == KeyChangeWorkflow.pendable_class().PEND_TYPE: + if pendable.get('type') in (KeyChangeWorkflow.name, + KeyChangeModWorkflow.name): expecting = CHANGE_CONFIRM_REQUEST.format(pendable.get('fingerprint'), token) else: @@ -178,6 +182,8 @@ def _cmd_confirm(pgp_list, mlist, msg, msgdata, arguments, results): for sig_subject in wrapped.get_signed(): if expecting in sig_subject: + with transaction(): + pgp_address.key_confirmed = True ISubscriptionManager(mlist).confirm(token) break else: @@ -211,6 +217,14 @@ def _cmd_change(pgp_list, mlist, msg, msgdata, arguments, results): print('A pgp enabled address not found.', file=results) return ContinueProcessing.no + if pgp_address.key is None: + print("You currently don't have a key set.", file=results) + return ContinueProcessing.no + + if not pgp_address.key_confirmed: + print('Your key is currently not confirmed.', file=results) + return ContinueProcessing.no + wrapped = PGPWrapper(msg) if wrapped.is_encrypted(): decrypted = wrapped.try_decrypt(pgp_list.key) @@ -231,24 +245,108 @@ def _cmd_change(pgp_list, mlist, msg, msgdata, arguments, results): file=results) return ContinueProcessing.no - usage_flags = key.usage_flags() - for subkey in key.subkeys.values(): - usage_flags |= subkey.usage_flags() - if KeyFlags.EncryptCommunications not in usage_flags: + if not key_usable(key, {KeyFlags.EncryptCommunications, KeyFlags.Sign}): print('Need a key which can be used to encrypt communications.', file=results) return ContinueProcessing.no - workflow = KeyChangeWorkflow(mlist, pgp_address, key) + workflow_class = mm_config.workflows[pgp_list.key_change_workflow] + + workflow = workflow_class(mlist, pgp_address, key) list(workflow) print('Key change request received.', file=results) return ContinueProcessing.no def _cmd_revoke(pgp_list, mlist, msg, msgdata, arguments, results): - # Current key revocation certificate in attachment, restarts the - # subscription process, or rather only it's key setup part. - pass + """ + `key revoke` command. + + Used when a user has to revoke a part of a key used for a `PGPAddress`. + + * This command message CAN be encrypted to the list key, in which case it + will be decrypted. + * This command message MUST have at least one revocation certificate + attached. + """ + if len(arguments) != 1: + print('Extraneous argument/s: ' + ','.join(arguments[1:]), + file=results) + return ContinueProcessing.no + + email = get_email(msg) + if not email: + print('No email to revoke key of.', file=results) + return ContinueProcessing.no + + pgp_address = PGPAddress.for_email(email) + if pgp_address is None: + print('A pgp enabled address not found.', file=results) + return ContinueProcessing.no + + key = pgp_address.key + if key is None: + print("You currently don't have a key set.", file=results) + return ContinueProcessing.no + + if not pgp_address.key_confirmed: + print('Your key is currently not confirmed.', file=results) + return ContinueProcessing.no + + wrapped = PGPWrapper(msg) + if wrapped.is_encrypted(): + decrypted = wrapped.try_decrypt(pgp_list.key) + wrapped = PGPWrapper(decrypted) + + if not wrapped.has_revocs(): + print('No key revocations attached? Send a key revocation.', + file=results) + return ContinueProcessing.no + + key_copy = copy.copy(key) + + revocs = list(wrapped.revocs()) + matches = 0 + for revoc in revocs: + old_matches = matches + try: + verified = key_copy.verify(key_copy, revoc) + if verified: + key_copy |= revoc + matches += 1 + continue + except PGPError: + pass + + for subkey in key_copy.subkeys.values(): + try: + verified = key_copy.verify(subkey, revoc) + if verified: + subkey |= revoc + matches += 1 + break + except PGPError: + pass + # No match? + if matches == old_matches: + print('Revocation found for not-found key.', file=results) + + if not key_usable(key_copy, + {KeyFlags.EncryptCommunications, KeyFlags.Sign}): + # Start reset process. + with transaction(): + pgp_address.key = None + pgp_address.key_confirmed = False + workflow = KeyRevokeWorkflow(mlist, pgp_address) + list(workflow) + print('Key needs to be reset.', file=results) + else: + # Just update key. + if matches > 0: + with transaction(): + pgp_address.key = key_copy + print('Key succesfully updated.', file=results) + return ContinueProcessing.yes def _cmd_sign(pgp_list, mlist, msg, msgdata, arguments, results): @@ -273,7 +371,7 @@ def _cmd_receive(pgp_list, mlist, msg, msgdata, arguments, results): msg['MIME-Version'] = '1.0' msg.attach(MIMEText('Here is the public key you requested.')) wrapped = MIMEWrapper(msg) - msg = wrapped.attach_key(pgp_list.pubkey) + msg = wrapped.attach_keys(pgp_list.pubkey) msg.send(mlist) return ContinueProcessing.yes diff --git a/src/mailman_pgp/commands/tests/test_key.py b/src/mailman_pgp/commands/tests/test_key.py index cfda3e8..1a128a1 100644 --- a/src/mailman_pgp/commands/tests/test_key.py +++ b/src/mailman_pgp/commands/tests/test_key.py @@ -27,13 +27,13 @@ from mailman.runners.command import CommandRunner from mailman.testing.helpers import get_queue_messages, make_testable_runner from mailman.utilities.datetime import now from pgpy import PGPKey, PGPUID -from pgpy.constants import (PubKeyAlgorithm, KeyFlags, EllipticCurveOID, - HashAlgorithm, SymmetricKeyAlgorithm, - CompressionAlgorithm) +from pgpy.constants import ( + CompressionAlgorithm, EllipticCurveOID, HashAlgorithm, KeyFlags, + PubKeyAlgorithm, SymmetricKeyAlgorithm) from zope.component import getUtility from mailman_pgp.config import mm_config -from mailman_pgp.database import transaction +from mailman_pgp.database import mm_transaction, transaction from mailman_pgp.model.address import PGPAddress from mailman_pgp.model.list import PGPMailingList from mailman_pgp.pgp.mime import MIMEWrapper @@ -41,7 +41,7 @@ from mailman_pgp.pgp.wrapper import PGPWrapper from mailman_pgp.testing.layers import PGPConfigLayer from mailman_pgp.testing.pgp import load_key from mailman_pgp.workflows.key_change import CHANGE_CONFIRM_REQUEST -from mailman_pgp.workflows.pubkey import CONFIRM_REQUEST +from mailman_pgp.workflows.key_confirm import CONFIRM_REQUEST from mailman_pgp.workflows.subscription import OpenSubscriptionPolicy @@ -128,8 +128,7 @@ class TestPreSubscription(unittest.TestCase): hashes=[HashAlgorithm.SHA256, HashAlgorithm.SHA512], ciphers=[SymmetricKeyAlgorithm.AES256], - compression=[CompressionAlgorithm.ZLIB, - CompressionAlgorithm.Uncompressed] + compression=[CompressionAlgorithm.ZLIB] ) def test_set(self): @@ -145,7 +144,7 @@ class TestPreSubscription(unittest.TestCase): set_message = _create_mixed('bart@example.com', 'test@example.com', 'Re: key set {}'.format(token)) wrapped_set_message = MIMEWrapper(set_message) - set_message = wrapped_set_message.attach_key(self.bart_key.pubkey) + set_message = wrapped_set_message.attach_keys(self.bart_key.pubkey) mm_config.switchboards['command'].enqueue(set_message, listid='test.example.com') @@ -186,7 +185,7 @@ class TestPreSubscription(unittest.TestCase): set_message = _create_mixed('bart@example.com', 'test@example.com', 'Re: key set {}'.format(token)) wrapped_set_message = MIMEWrapper(set_message) - set_message = wrapped_set_message.attach_key(self.bart_key.pubkey) + set_message = wrapped_set_message.attach_keys(self.bart_key.pubkey) wrapped_set_message = MIMEWrapper(set_message) set_message = wrapped_set_message.encrypt(self.pgp_list.pubkey, self.bart_key.pubkey) @@ -246,9 +245,9 @@ class TestPreSubscription(unittest.TestCase): set_message = _create_mixed('bart@example.com', 'test@example.com', 'Re: key set token') wrapped_set_message = MIMEWrapper(set_message) - set_message = wrapped_set_message.attach_key(self.bart_key.pubkey) + set_message = wrapped_set_message.attach_keys(self.bart_key.pubkey) wrapped_set_message = MIMEWrapper(set_message) - set_message = wrapped_set_message.attach_key(self.anne_key.pubkey) + set_message = wrapped_set_message.attach_keys(self.anne_key.pubkey) mm_config.switchboards['command'].enqueue(set_message, listid='test.example.com') @@ -263,7 +262,7 @@ class TestPreSubscription(unittest.TestCase): set_message = _create_mixed('bart@example.com', 'test@example.com', 'Re: key set token') wrapped_set_message = MIMEWrapper(set_message) - set_message = wrapped_set_message.attach_key(self.bart_key) + set_message = wrapped_set_message.attach_keys(self.bart_key) mm_config.switchboards['command'].enqueue(set_message, listid='test.example.com') @@ -278,7 +277,7 @@ class TestPreSubscription(unittest.TestCase): set_message = _create_mixed('bart@example.com', 'test@example.com', 'Re: key set token') wrapped_set_message = MIMEWrapper(set_message) - set_message = wrapped_set_message.attach_key(self.unusable_key.pubkey) + set_message = wrapped_set_message.attach_keys(self.unusable_key.pubkey) mm_config.switchboards['command'].enqueue(set_message, listid='test.example.com') @@ -293,7 +292,7 @@ class TestPreSubscription(unittest.TestCase): def test_set_no_email(self): message = _create_mixed('', 'test@example.com', 'key set token') wrapped_message = MIMEWrapper(message) - message = wrapped_message.attach_key(self.bart_key.pubkey) + message = wrapped_message.attach_keys(self.bart_key.pubkey) mm_config.switchboards['command'].enqueue(message, listid='test.example.com') @@ -307,7 +306,7 @@ class TestPreSubscription(unittest.TestCase): set_message = _create_mixed('bart@example.com', 'test@example.com', 'key set token') wrapped_set_message = MIMEWrapper(set_message) - set_message = wrapped_set_message.attach_key(self.bart_key.pubkey) + set_message = wrapped_set_message.attach_keys(self.bart_key.pubkey) mm_config.switchboards['command'].enqueue(set_message, listid='test.example.com') @@ -326,7 +325,7 @@ class TestPreSubscription(unittest.TestCase): set_message = _create_mixed('bart@example.com', 'test@example.com', 'key set token') wrapped_set_message = MIMEWrapper(set_message) - set_message = wrapped_set_message.attach_key(self.bart_key.pubkey) + set_message = wrapped_set_message.attach_keys(self.bart_key.pubkey) mm_config.switchboards['command'].enqueue(set_message, listid='test.example.com') @@ -349,7 +348,7 @@ class TestPreSubscription(unittest.TestCase): set_message = _create_mixed('bart@example.com', 'test@example.com', 'key set token') wrapped_set_message = MIMEWrapper(set_message) - set_message = wrapped_set_message.attach_key(self.bart_key.pubkey) + set_message = wrapped_set_message.attach_keys(self.bart_key.pubkey) mm_config.switchboards['command'].enqueue(set_message, listid='test.example.com') @@ -594,9 +593,13 @@ class TestAfterSubscription(unittest.TestCase): layer = PGPConfigLayer def setUp(self): - self.mlist = create_list('test@example.com', style_name='pgp-default') - self.pgp_list = PGPMailingList.for_list(self.mlist) - self.pgp_list.key = load_key('ecc_p256.priv.asc') + with mm_transaction(): + self.mlist = create_list('test@example.com', + style_name='pgp-default') + with transaction(): + self.pgp_list = PGPMailingList.for_list(self.mlist) + self.pgp_list.key = load_key('ecc_p256.priv.asc') + self.pgp_list.key_change_workflow = 'pgp-key-change-workflow' self.bart_key = load_key('rsa_1024.priv.asc') self.bart_new_key = load_key('ecc_p256.priv.asc') @@ -611,8 +614,7 @@ class TestAfterSubscription(unittest.TestCase): hashes=[HashAlgorithm.SHA256, HashAlgorithm.SHA512], ciphers=[SymmetricKeyAlgorithm.AES256], - compression=[CompressionAlgorithm.ZLIB, - CompressionAlgorithm.Uncompressed] + compression=[CompressionAlgorithm.ZLIB] ) def test_change(self): @@ -628,7 +630,7 @@ class TestAfterSubscription(unittest.TestCase): message = _create_mixed('bart@example.com', 'test@example.com', 'key change') wrapped_message = MIMEWrapper(message) - message = wrapped_message.attach_key(self.bart_new_key.pubkey) + message = wrapped_message.attach_keys(self.bart_new_key.pubkey) mm_config.switchboards['command'].enqueue(message, listid='test.example.com') @@ -663,7 +665,7 @@ class TestAfterSubscription(unittest.TestCase): message = _create_mixed('bart@example.com', 'test@example.com', 'key change') wrapped_message = MIMEWrapper(message) - message = wrapped_message.attach_key(self.bart_new_key.pubkey) + message = wrapped_message.attach_keys(self.bart_new_key.pubkey) wrapped_message = MIMEWrapper(message) message = wrapped_message.encrypt(self.pgp_list.pubkey) @@ -700,7 +702,7 @@ class TestAfterSubscription(unittest.TestCase): message = _create_mixed('bart@example.com', 'test@example.com', 'key change') wrapped_message = MIMEWrapper(message) - message = wrapped_message.attach_key(self.bart_new_key.pubkey) + message = wrapped_message.attach_keys(self.bart_new_key.pubkey) mm_config.switchboards['command'].enqueue(message, listid='test.example.com') @@ -751,7 +753,7 @@ class TestAfterSubscription(unittest.TestCase): def test_change_no_email(self): message = _create_mixed('', 'test@example.com', 'key change') wrapped_message = MIMEWrapper(message) - message = wrapped_message.attach_key(self.bart_key.pubkey) + message = wrapped_message.attach_keys(self.bart_key.pubkey) mm_config.switchboards['command'].enqueue(message, listid='test.example.com') @@ -765,7 +767,7 @@ class TestAfterSubscription(unittest.TestCase): message = _create_mixed('bart@example.com', 'test@example.com', 'key change') wrapped_message = MIMEWrapper(message) - message = wrapped_message.attach_key(self.bart_key.pubkey) + message = wrapped_message.attach_keys(self.bart_key.pubkey) mm_config.switchboards['command'].enqueue(message, listid='test.example.com') @@ -810,9 +812,9 @@ class TestAfterSubscription(unittest.TestCase): 'key change') wrapped_set_message = MIMEWrapper(set_message) - set_message = wrapped_set_message.attach_key(self.bart_key.pubkey) + set_message = wrapped_set_message.attach_keys(self.bart_key.pubkey) wrapped_set_message = MIMEWrapper(set_message) - set_message = wrapped_set_message.attach_key(self.bart_new_key.pubkey) + set_message = wrapped_set_message.attach_keys(self.bart_new_key.pubkey) mm_config.switchboards['command'].enqueue(set_message, listid='test.example.com') @@ -835,7 +837,7 @@ class TestAfterSubscription(unittest.TestCase): message = _create_mixed('bart@example.com', 'test@example.com', 'key change') wrapped_message = MIMEWrapper(message) - message = wrapped_message.attach_key(self.bart_key) + message = wrapped_message.attach_keys(self.bart_key) mm_config.switchboards['command'].enqueue(message, listid='test.example.com') @@ -858,7 +860,7 @@ class TestAfterSubscription(unittest.TestCase): message = _create_mixed('bart@example.com', 'test@example.com', 'key change') wrapped_message = MIMEWrapper(message) - message = wrapped_message.attach_key(self.unusable_key.pubkey) + message = wrapped_message.attach_keys(self.unusable_key.pubkey) mm_config.switchboards['command'].enqueue(message, listid='test.example.com') @@ -870,6 +872,173 @@ class TestAfterSubscription(unittest.TestCase): 'Need a key which can be used to encrypt communications.', results_msg.get_payload()) + def test_revoke_extra_arg(self): + message = _create_plain('bart@example.com', 'test@example.com', + 'key revoke extra arguments', '') + mm_config.switchboards['command'].enqueue(message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + + items = get_queue_messages('virgin', expected_count=1) + results_msg = items[0].msg + + self.assertIn('Extraneous argument/s: extra,arguments', + results_msg.get_payload()) + + def test_revoke_no_email(self): + message = _create_mixed('', 'test@example.com', 'key revoke') + + mm_config.switchboards['command'].enqueue(message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + items = get_queue_messages('virgin', expected_count=1) + results_msg = items[0].msg + + self.assertIn('No email to revoke key of.', results_msg.get_payload()) + + def test_revoke_no_pgp_address(self): + message = _create_mixed('bart@example.com', 'test@example.com', + 'key revoke') + + mm_config.switchboards['command'].enqueue(message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + items = get_queue_messages('virgin', expected_count=1) + results_msg = items[0].msg + + self.assertIn('A pgp enabled address not found.', + results_msg.get_payload()) + + def test_revoke_no_key_set(self): + bart = getUtility(IUserManager).create_address('bart@example.com', + 'Bart Person') + with transaction() as t: + pgp_address = PGPAddress(bart) + t.add(pgp_address) + + message = _create_mixed('bart@example.com', 'test@example.com', + 'key revoke') + + mm_config.switchboards['command'].enqueue(message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + items = get_queue_messages('virgin', expected_count=1) + results_msg = items[0].msg + + self.assertIn("You currently don't have a key set.", + results_msg.get_payload()) + + def test_revoke_key_not_confirmed(self): + bart = getUtility(IUserManager).create_address('bart@example.com', + 'Bart Person') + with transaction() as t: + pgp_address = PGPAddress(bart) + pgp_address.key = self.bart_key.pubkey + t.add(pgp_address) + + message = _create_mixed('bart@example.com', 'test@example.com', + 'key revoke') + + mm_config.switchboards['command'].enqueue(message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + items = get_queue_messages('virgin', expected_count=1) + results_msg = items[0].msg + + self.assertIn('Your key is currently not confirmed.', + results_msg.get_payload()) + + def test_revoke_no_revocs(self): + bart = getUtility(IUserManager).create_address('bart@example.com', + 'Bart Person') + with transaction() as t: + pgp_address = PGPAddress(bart) + pgp_address.key = self.bart_key.pubkey + pgp_address.key_confirmed = True + t.add(pgp_address) + + message = _create_plain('bart@example.com', 'test@example.com', + 'key revoke', '') + mm_config.switchboards['command'].enqueue(message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + items = get_queue_messages('virgin', expected_count=1) + results_msg = items[0].msg + + self.assertIn('No key revocations attached? Send a key revocation.', + results_msg.get_payload()) + + def test_revoke_resets(self): + bart = getUtility(IUserManager).create_address('bart@example.com', + 'Bart Person') + with transaction() as t: + pgp_address = PGPAddress(bart) + pgp_address.key = self.bart_key.pubkey + pgp_address.key_confirmed = True + t.add(pgp_address) + + revoc = self.bart_key.revoke(self.bart_key) + + message = _create_mixed('bart@example.com', 'test@example.com', + 'key revoke') + wrapped_message = MIMEWrapper(message) + message = wrapped_message.attach_revocs(revoc) + + mm_config.switchboards['command'].enqueue(message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + items = get_queue_messages('virgin', expected_count=2) + if (items[0].msg['Subject'] == + 'The results of your email commands'): # pragma: no cover + results_msg = items[0].msg + else: + results_msg = items[1].msg + #TODO: finish test + + self.assertIn('Key needs to be reset.', results_msg.get_payload()) + + def test_revoke_updates(self): + bart = getUtility(IUserManager).create_address('bart@example.com', + 'Bart Person') + + test_key = PGPKey.new(PubKeyAlgorithm.RSAEncryptOrSign, 1024) + uid = PGPUID.new('Some Name', email='anne@example.org') + test_key.add_uid(uid, + usage={KeyFlags.Certify, + KeyFlags.EncryptCommunications, + KeyFlags.Sign}, + hashes=[HashAlgorithm.SHA256, + HashAlgorithm.SHA512], + ciphers=[SymmetricKeyAlgorithm.AES256], + compression=[CompressionAlgorithm.ZLIB]) + sub = PGPKey.new(PubKeyAlgorithm.ECDH, EllipticCurveOID.SECP256K1) + test_key.add_subkey(sub, usage={KeyFlags.EncryptCommunications}) + + with transaction() as t: + pgp_address = PGPAddress(bart) + pgp_address.key = test_key.pubkey + pgp_address.key_confirmed = True + t.add(pgp_address) + + revoc = test_key.revoke(sub.pubkey) + + message = _create_mixed('bart@example.com', 'test@example.com', + 'key revoke') + wrapped_message = MIMEWrapper(message) + message = wrapped_message.attach_revocs(revoc) + + mm_config.switchboards['command'].enqueue(message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + items = get_queue_messages('virgin', expected_count=1) + results_msg = items[0].msg + + self.assertIn('Key succesfully updated.', results_msg.get_payload()) + sub = next(iter(pgp_address.key.subkeys.values())) + revocs = list(sub.revocation_signatures) + self.assertEqual(len(revocs), 1) + self.assertEqual(revoc.hash2, revocs[0].hash2) + class TestGeneral(unittest.TestCase): layer = PGPConfigLayer |
