diff options
| author | J08nY | 2017-07-13 23:57:18 +0200 |
|---|---|---|
| committer | J08nY | 2017-07-13 23:57:18 +0200 |
| commit | 57f8d97c696913beeba8467aa550804422336d9c (patch) | |
| tree | bc537e0bf6827e12203d53d7873bd4aa7f7b9d27 /src/mailman_pgp/commands | |
| parent | 08389caf276e1b866cae2f6afc1d47b9c1876af5 (diff) | |
| download | mailman-pgp-57f8d97c696913beeba8467aa550804422336d9c.tar.gz mailman-pgp-57f8d97c696913beeba8467aa550804422336d9c.tar.zst mailman-pgp-57f8d97c696913beeba8467aa550804422336d9c.zip | |
Diffstat (limited to 'src/mailman_pgp/commands')
| -rw-r--r-- | src/mailman_pgp/commands/eml_key.py | 46 | ||||
| -rw-r--r-- | src/mailman_pgp/commands/tests/test_key.py | 49 |
2 files changed, 62 insertions, 33 deletions
diff --git a/src/mailman_pgp/commands/eml_key.py b/src/mailman_pgp/commands/eml_key.py index 9df6065..7b7782d 100644 --- a/src/mailman_pgp/commands/eml_key.py +++ b/src/mailman_pgp/commands/eml_key.py @@ -19,6 +19,7 @@ from email.utils import parseaddr from mailman.interfaces.command import ContinueProcessing, IEmailCommand +from mailman.interfaces.pending import IPendings from mailman.interfaces.subscriptions import ISubscriptionManager from mailman.interfaces.usermanager import IUserManager from public import public @@ -47,6 +48,10 @@ def _cmd_set(pgp_list, mlist, msg, msgdata, arguments, results): return ContinueProcessing.no wrapped = PGPWrapper(msg) + if wrapped.is_encrypted(): + decrypted = wrapped.decrypt(pgp_list.key) + wrapped = PGPWrapper(decrypted) + if not wrapped.has_keys(): print('No keys attached? Send a key.', file=results) return ContinueProcessing.no @@ -66,21 +71,24 @@ def _cmd_set(pgp_list, mlist, msg, msgdata, arguments, results): print('No adddress to subscribe with.', file=results) return ContinueProcessing.no - with transaction() as t: - pgp_address = PGPAddress.for_address(address) - if pgp_address is None: - pgp_address = PGPAddress(address) - pgp_address.key = keys.pop() - t.add(pgp_address) + pgp_address = PGPAddress.for_address(address) + if pgp_address is None: + print('A pgp enabled address not found.', file=results) + return ContinueProcessing.no token = arguments[1] - try: - ISubscriptionManager(mlist).confirm(token) - print('Key succesfully set.', file=results) - print('Key fingerprint: {}'.format(pgp_address.key.fingerprint), - file=results) - except LookupError: + pendable = getUtility(IPendings).confirm(token, expunge=False) + if pendable is None: print('Wrong token.', file=results) + return ContinueProcessing.no + + with transaction(): + pgp_address.key = keys.pop() + ISubscriptionManager(mlist).confirm(token) + + print('Key succesfully set.', file=results) + print('Key fingerprint: {}'.format(pgp_address.key.fingerprint), + file=results) return ContinueProcessing.no @@ -115,13 +123,17 @@ def _cmd_confirm(pgp_list, mlist, msg, msgdata, arguments, results): token = arguments[1] - expecting = CONFIRM_REQUEST.format(pgp_address.key_fingerprint, - token) + pendable = getUtility(IPendings).confirm(token, expunge=False) + if pendable is None: + print('Wrong token.', file=results) + return ContinueProcessing.no + + # TODO differentiate between key change and subscription here. + + expecting = CONFIRM_REQUEST.format(pgp_address.key_fingerprint, token) for sig_subject in wrapped.get_signed(): if expecting in sig_subject: - with transaction(): - pgp_address.key_confirmed = True - ISubscriptionManager(mlist).confirm(token) + ISubscriptionManager(mlist).confirm(token) break else: print("Message doesn't contain the expected statement.", file=results) diff --git a/src/mailman_pgp/commands/tests/test_key.py b/src/mailman_pgp/commands/tests/test_key.py index fe75a6a..64f8ae6 100644 --- a/src/mailman_pgp/commands/tests/test_key.py +++ b/src/mailman_pgp/commands/tests/test_key.py @@ -134,7 +134,6 @@ class TestPreSubscription(unittest.TestCase): pgp_address = PGPAddress.for_address(bart) self.assertIsNotNone(pgp_address) self.assertEqual(pgp_address.key.fingerprint, bart_key.fingerprint) - self.assertEqual(pgp_address.key_fingerprint, bart_key.fingerprint) self.assertFalse(pgp_address.key_confirmed) items = get_queue_messages('virgin', expected_count=2) @@ -242,11 +241,6 @@ class TestPreSubscription(unittest.TestCase): get_queue_messages('virgin') - with transaction() as t: - pgp_address = PGPAddress(bart) - pgp_address.key = bart_key.pubkey - t.add(pgp_address) - message = _create_plain('bart@example.com', 'test@example.com', 'Re: key confirm {}'.format(token), CONFIRM_REQUEST.format(bart_key.fingerprint, @@ -275,11 +269,6 @@ class TestPreSubscription(unittest.TestCase): get_queue_messages('virgin') - with transaction() as t: - pgp_address = PGPAddress(bart) - pgp_address.key = bart_key.pubkey - t.add(pgp_address) - message = _create_plain('bart@example.com', 'test@example.com', 'Re: key confirm {}'.format(token), CONFIRM_REQUEST.format(bart_key.fingerprint, @@ -290,6 +279,7 @@ class TestPreSubscription(unittest.TestCase): mm_config.switchboards['command'].enqueue(message, listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() pgp_address = PGPAddress.for_address(bart) @@ -346,11 +336,6 @@ class TestPreSubscription(unittest.TestCase): get_queue_messages('virgin') - with transaction() as t: - pgp_address = PGPAddress(bart) - pgp_address.key = bart_key.pubkey - t.add(pgp_address) - message = _create_plain('bart@example.com', 'test@example.com', 'Re: key confirm {}'.format(token), CONFIRM_REQUEST.format(bart_key.fingerprint, @@ -408,3 +393,35 @@ class TestAfterSubscription(unittest.TestCase): confirm_wrapped = PGPWrapper(confirm_request) self.assertTrue(confirm_wrapped.is_encrypted()) + decrypted = confirm_wrapped.decrypt(bart_new_key) + self.assertIn('key confirm', decrypted['subject']) + + def test_key_change_confirm(self): + bart = getUtility(IUserManager).create_address('bart@example.com', + 'Bart Person') + bart_key = load_key('rsa_1024.priv.asc') + bart_new_key = load_key('ecc_p256.priv.asc') + + with transaction() as t: + pgp_address = PGPAddress(bart) + pgp_address.key = bart_key.pubkey + pgp_address.key_confirmed = True + t.add(pgp_address) + + message = _create_mixed('bart@example.com', 'test@example.com', + 'key change') + wrapped_message = MIMEWrapper(message) + message = wrapped_message.attach_key(bart_new_key.pubkey) + + mm_config.switchboards['command'].enqueue(message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + + items = get_queue_messages('virgin', expected_count=2) + if items[0].msg['Subject'] == 'The results of your email commands': + confirm_request = items[1].msg + else: + confirm_request = items[0].msg + request_wrapped = PGPWrapper(confirm_request) + request_wrapped.decrypt(bart_new_key) + # TODO finish this |
