diff options
| author | J08nY | 2017-07-14 15:13:47 +0200 |
|---|---|---|
| committer | J08nY | 2017-07-14 15:13:47 +0200 |
| commit | 9f0ac2239af18f780c757f8cf6524c99de2dffe8 (patch) | |
| tree | f1b7d906ce1ebabbdbd63f93a3a7a7702ae00abc /src/mailman_pgp/commands | |
| parent | 0b9335e163791959390bf3c83928e5b61e912fa3 (diff) | |
| download | mailman-pgp-9f0ac2239af18f780c757f8cf6524c99de2dffe8.tar.gz mailman-pgp-9f0ac2239af18f780c757f8cf6524c99de2dffe8.tar.zst mailman-pgp-9f0ac2239af18f780c757f8cf6524c99de2dffe8.zip | |
Diffstat (limited to 'src/mailman_pgp/commands')
| -rw-r--r-- | src/mailman_pgp/commands/eml_key.py | 4 | ||||
| -rw-r--r-- | src/mailman_pgp/commands/tests/test_key.py | 295 |
2 files changed, 284 insertions, 15 deletions
diff --git a/src/mailman_pgp/commands/eml_key.py b/src/mailman_pgp/commands/eml_key.py index 2f7a7e7..1b6dc9f 100644 --- a/src/mailman_pgp/commands/eml_key.py +++ b/src/mailman_pgp/commands/eml_key.py @@ -109,6 +109,10 @@ def _cmd_confirm(pgp_list, mlist, msg, msgdata, arguments, results): print('A pgp enabled address not found.', file=results) return ContinueProcessing.no + if pgp_address.key is None: + print('No key set.', file=results) + return ContinueProcessing.no + wrapped = PGPWrapper(msg) if wrapped.is_encrypted(): decrypted = wrapped.decrypt(pgp_list.key) diff --git a/src/mailman_pgp/commands/tests/test_key.py b/src/mailman_pgp/commands/tests/test_key.py index 44d5b25..d0ff7e9 100644 --- a/src/mailman_pgp/commands/tests/test_key.py +++ b/src/mailman_pgp/commands/tests/test_key.py @@ -157,6 +157,49 @@ class TestPreSubscription(unittest.TestCase): confirm_wrapped = PGPWrapper(confirm_request) self.assertTrue(confirm_wrapped.is_encrypted()) + def test_set_encrypted(self): + self.mlist.subscription_policy = OpenSubscriptionPolicy + bart = getUtility(IUserManager).create_address('bart@example.com', + 'Bart Person') + bart.verified_on = now() + token, token_owner, member = ISubscriptionManager(self.mlist).register( + bart) + + get_queue_messages('virgin') + + set_message = _create_mixed('bart@example.com', 'test@example.com', + 'Re: key set {}'.format(token)) + wrapped_set_message = MIMEWrapper(set_message) + set_message = wrapped_set_message.attach_key(self.bart_key.pubkey) + wrapped_set_message = MIMEWrapper(set_message) + set_message = wrapped_set_message.encrypt(self.pgp_list.pubkey, + self.bart_key.pubkey) + + mm_config.switchboards['command'].enqueue(set_message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + + pgp_address = PGPAddress.for_address(bart) + self.assertIsNotNone(pgp_address) + self.assertEqual(pgp_address.key.fingerprint, + self.bart_key.fingerprint) + self.assertFalse(pgp_address.key_confirmed) + + items = get_queue_messages('virgin', expected_count=2) + if items[0].msg['Subject'] == 'The results of your email commands': + results = items[0].msg + confirm_request = items[1].msg + else: + results = items[1].msg + confirm_request = items[0].msg + + self.assertIn('Key succesfully set.', results.get_payload()) + self.assertIn('Key fingerprint: {}'.format(self.bart_key.fingerprint), + results.get_payload()) + + confirm_wrapped = PGPWrapper(confirm_request) + self.assertTrue(confirm_wrapped.is_encrypted()) + def test_set_no_token(self): message = _create_plain('bart@example.com', 'test@example.com', 'key set', '') @@ -227,6 +270,47 @@ class TestPreSubscription(unittest.TestCase): self.assertIn('No adddress to subscribe with.', results_msg.get_payload()) + def test_set_no_pgp_address(self): + bart = getUtility(IUserManager).create_address('bart@example.com', + 'Bart Person') + bart.verified_on = now() + + set_message = _create_mixed('bart@example.com', 'test@example.com', + 'key set token') + wrapped_set_message = MIMEWrapper(set_message) + set_message = wrapped_set_message.attach_key(self.bart_key.pubkey) + + mm_config.switchboards['command'].enqueue(set_message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + items = get_queue_messages('virgin', expected_count=1) + results_msg = items[0].msg + + self.assertIn('A pgp enabled address not found.', + results_msg.get_payload()) + + def test_set_wrong_token(self): + bart = getUtility(IUserManager).create_address('bart@example.com', + 'Bart Person') + bart.verified_on = now() + + with transaction() as t: + pgp_address = PGPAddress(bart) + t.add(pgp_address) + + set_message = _create_mixed('bart@example.com', 'test@example.com', + 'key set token') + wrapped_set_message = MIMEWrapper(set_message) + set_message = wrapped_set_message.attach_key(self.bart_key.pubkey) + + mm_config.switchboards['command'].enqueue(set_message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + items = get_queue_messages('virgin', expected_count=1) + results_msg = items[0].msg + + self.assertIn('Wrong token.', results_msg.get_payload()) + def test_confirm(self): self.mlist.subscription_policy = OpenSubscriptionPolicy bart = getUtility(IUserManager).create_address('bart@example.com', @@ -321,6 +405,29 @@ class TestPreSubscription(unittest.TestCase): self.assertIn('A pgp enabled address not found.', results_msg.get_payload()) + def test_confirm_no_key(self): + bart = getUtility(IUserManager).create_address('bart@example.com', + 'Bart Person') + with transaction() as t: + pgp_address = PGPAddress(bart) + t.add(pgp_address) + + message = _create_plain('bart@example.com', 'test@example.com', + 'Re: key confirm token', + CONFIRM_REQUEST.format( + self.bart_key.fingerprint, + 'token')) + wrapped_message = MIMEWrapper(message) + message = wrapped_message.sign(self.bart_key) + + mm_config.switchboards['command'].enqueue(message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + items = get_queue_messages('virgin', expected_count=1) + results_msg = items[0].msg + + self.assertIn('No key set.', results_msg.get_payload()) + def test_confirm_not_signed(self): self.mlist.subscription_policy = OpenSubscriptionPolicy bart = getUtility(IUserManager).create_address('bart@example.com', @@ -348,6 +455,92 @@ class TestPreSubscription(unittest.TestCase): self.assertIn('Message not signed, ignoring.', results_msg.get_payload()) + def test_confirm_invalid_sig(self): + self.mlist.subscription_policy = OpenSubscriptionPolicy + bart = getUtility(IUserManager).create_address('bart@example.com', + 'Bart Person') + bart.verified_on = now() + + token, token_owner, member = ISubscriptionManager(self.mlist).register( + bart, pubkey=self.bart_key.pubkey) + + get_queue_messages('virgin') + + message = _create_plain('bart@example.com', 'test@example.com', + 'Re: key confirm {}'.format(token), + CONFIRM_REQUEST.format( + self.bart_key.fingerprint, + token)) + wrapped_message = MIMEWrapper(message) + message = wrapped_message.sign(self.bart_key) + message.get_payload(0).set_payload( + 'Something that was definitely not signed.') + + mm_config.switchboards['command'].enqueue(message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + + items = get_queue_messages('virgin', expected_count=1) + results_msg = items[0].msg + + self.assertIn('Message failed to verify.', + results_msg.get_payload()) + + def test_confirm_wrong_token(self): + bart = getUtility(IUserManager).create_address('bart@example.com', + 'Bart Person') + bart.verified_on = now() + + with transaction() as t: + pgp_address = PGPAddress(bart) + pgp_address.key = self.bart_key.pubkey + t.add(pgp_address) + + message = _create_plain('bart@example.com', 'test@example.com', + 'Re: key confirm token', + CONFIRM_REQUEST.format( + self.bart_key.fingerprint, + 'token')) + wrapped_message = MIMEWrapper(message) + message = wrapped_message.sign(self.bart_key) + + mm_config.switchboards['command'].enqueue(message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + + items = get_queue_messages('virgin', expected_count=1) + results_msg = items[0].msg + + self.assertIn('Wrong token.', results_msg.get_payload()) + + def test_confirm_no_signed_statement(self): + self.mlist.subscription_policy = OpenSubscriptionPolicy + bart = getUtility(IUserManager).create_address('bart@example.com', + 'Bart Person') + bart.verified_on = now() + + token, token_owner, member = ISubscriptionManager(self.mlist).register( + bart, pubkey=self.bart_key.pubkey) + + get_queue_messages('virgin') + + message = _create_plain('bart@example.com', 'test@example.com', + 'Re: key confirm {}'.format(token), + 'Some text, that definitely does not' + 'contain the required/expected statement.') + wrapped_message = MIMEWrapper(message) + message = wrapped_message.sign(self.bart_key) + + mm_config.switchboards['command'].enqueue(message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + + items = get_queue_messages('virgin', expected_count=1) + results_msg = items[0].msg + + self.assertIn("Message doesn't contain the expected statement.", + results_msg.get_payload()) + @public class TestAfterSubscription(unittest.TestCase): @@ -358,22 +551,23 @@ class TestAfterSubscription(unittest.TestCase): self.pgp_list = PGPMailingList.for_list(self.mlist) self.pgp_list.key = load_key('ecc_p256.priv.asc') - def test_key_change(self): + self.bart_key = load_key('rsa_1024.priv.asc') + self.bart_new_key = load_key('ecc_p256.priv.asc') + + def test_change(self): bart = getUtility(IUserManager).create_address('bart@example.com', 'Bart Person') - bart_key = load_key('rsa_1024.priv.asc') - bart_new_key = load_key('ecc_p256.priv.asc') with transaction() as t: pgp_address = PGPAddress(bart) - pgp_address.key = bart_key.pubkey + pgp_address.key = self.bart_key.pubkey pgp_address.key_confirmed = True t.add(pgp_address) message = _create_mixed('bart@example.com', 'test@example.com', 'key change') wrapped_message = MIMEWrapper(message) - message = wrapped_message.attach_key(bart_new_key.pubkey) + message = wrapped_message.attach_key(self.bart_new_key.pubkey) mm_config.switchboards['command'].enqueue(message, listid='test.example.com') @@ -391,25 +585,23 @@ class TestAfterSubscription(unittest.TestCase): confirm_wrapped = PGPWrapper(confirm_request) self.assertTrue(confirm_wrapped.is_encrypted()) - decrypted = confirm_wrapped.decrypt(bart_new_key) + decrypted = confirm_wrapped.decrypt(self.bart_new_key) self.assertIn('key confirm', decrypted['subject']) - def test_key_change_confirm(self): + def test_change_confirm(self): bart = getUtility(IUserManager).create_address('bart@example.com', 'Bart Person') - bart_key = load_key('rsa_1024.priv.asc') - bart_new_key = load_key('ecc_p256.priv.asc') with transaction() as t: pgp_address = PGPAddress(bart) - pgp_address.key = bart_key.pubkey + pgp_address.key = self.bart_key.pubkey pgp_address.key_confirmed = True t.add(pgp_address) message = _create_mixed('bart@example.com', 'test@example.com', 'key change') wrapped_message = MIMEWrapper(message) - message = wrapped_message.attach_key(bart_new_key.pubkey) + message = wrapped_message.attach_key(self.bart_new_key.pubkey) mm_config.switchboards['command'].enqueue(message, listid='test.example.com') @@ -421,7 +613,7 @@ class TestAfterSubscription(unittest.TestCase): else: confirm_request = items[0].msg request_wrapped = PGPWrapper(confirm_request) - decrypted = request_wrapped.decrypt(bart_new_key) + decrypted = request_wrapped.decrypt(self.bart_new_key) subj = decrypted['subject'] token = subj.split(' ')[-1] @@ -429,15 +621,88 @@ class TestAfterSubscription(unittest.TestCase): confirm_message = _create_plain('bart@example.com', 'test@example.com', decrypted['subject'], CHANGE_CONFIRM_REQUEST.format( - bart_new_key.fingerprint, + self.bart_new_key.fingerprint, token)) wrapped_confirm = MIMEWrapper(confirm_message) - confirm = wrapped_confirm.sign(bart_key) + confirm = wrapped_confirm.sign(self.bart_key) mm_config.switchboards['command'].enqueue(confirm, listid='test.example.com') make_testable_runner(CommandRunner, 'command').run() pgp_address = PGPAddress.for_address(bart) - self.assertEqual(pgp_address.key_fingerprint, bart_new_key.fingerprint) + self.assertEqual(pgp_address.key_fingerprint, + self.bart_new_key.fingerprint) self.assertTrue(pgp_address.key_confirmed) + + def test_change_extra_arg(self): + message = _create_plain('bart@example.com', 'test@example.com', + 'key change extra arguments', '') + mm_config.switchboards['command'].enqueue(message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + + items = get_queue_messages('virgin', expected_count=1) + results_msg = items[0].msg + + self.assertIn('Extraneous argument/s: extra,arguments', + results_msg.get_payload()) + + def test_change_no_key(self): + message = _create_plain('bart@example.com', 'test@example.com', + 'key change', '') + + mm_config.switchboards['command'].enqueue(message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + items = get_queue_messages('virgin', expected_count=1) + results_msg = items[0].msg + + self.assertIn('No keys attached? Send a key.', + results_msg.get_payload()) + + def test_change_multiple_keys(self): + set_message = _create_mixed('bart@example.com', 'test@example.com', + 'key change') + + wrapped_set_message = MIMEWrapper(set_message) + set_message = wrapped_set_message.attach_key(self.bart_key.pubkey) + wrapped_set_message = MIMEWrapper(set_message) + set_message = wrapped_set_message.attach_key(self.bart_new_key.pubkey) + + mm_config.switchboards['command'].enqueue(set_message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + items = get_queue_messages('virgin', expected_count=1) + results_msg = items[0].msg + + self.assertIn('More than one key! Send only one key.', + results_msg.get_payload()) + + def test_change_no_email(self): + message = _create_mixed('', 'test@example.com', 'key change') + wrapped_message = MIMEWrapper(message) + message = wrapped_message.attach_key(self.bart_key.pubkey) + + mm_config.switchboards['command'].enqueue(message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + items = get_queue_messages('virgin', expected_count=1) + results_msg = items[0].msg + + self.assertIn('No email to change key of.', results_msg.get_payload()) + + def test_change_no_pgp_address(self): + message = _create_mixed('bart@example.com', 'test@example.com', + 'key change') + wrapped_message = MIMEWrapper(message) + message = wrapped_message.attach_key(self.bart_key.pubkey) + + mm_config.switchboards['command'].enqueue(message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + items = get_queue_messages('virgin', expected_count=1) + results_msg = items[0].msg + + self.assertIn('A pgp enabled address not found.', + results_msg.get_payload()) |
