diff options
Diffstat (limited to 'src/mailman_pgp/commands/tests/test_key.py')
| -rw-r--r-- | src/mailman_pgp/commands/tests/test_key.py | 278 |
1 files changed, 86 insertions, 192 deletions
diff --git a/src/mailman_pgp/commands/tests/test_key.py b/src/mailman_pgp/commands/tests/test_key.py index 1a128a1..266def6 100644 --- a/src/mailman_pgp/commands/tests/test_key.py +++ b/src/mailman_pgp/commands/tests/test_key.py @@ -63,6 +63,12 @@ def _create_mixed(from_hdr, to_hdr, subject_hdr): return message +def _run_message(message, expected_responses=None, list_id='test.example.com'): + mm_config.switchboards['command'].enqueue(message, listid=list_id) + make_testable_runner(CommandRunner, 'command').run() + return get_queue_messages('virgin', expected_count=expected_responses) + + class TestPreDispatch(unittest.TestCase): layer = PGPConfigLayer @@ -73,10 +79,7 @@ class TestPreDispatch(unittest.TestCase): message = _create_plain('bart@example.com', 'test@example.com', 'key', '') - mm_config.switchboards['command'].enqueue(message, - listid='test.example.com') - make_testable_runner(CommandRunner, 'command').run() - items = get_queue_messages('virgin', expected_count=1) + items = _run_message(message, 1) results_msg = items[0].msg self.assertIn('No sub-command specified', results_msg.get_payload()) @@ -85,10 +88,7 @@ class TestPreDispatch(unittest.TestCase): message = _create_plain('bart@example.com', 'test@example.com', 'key wrooooooong', '') - mm_config.switchboards['command'].enqueue(message, - listid='test.example.com') - make_testable_runner(CommandRunner, 'command').run() - items = get_queue_messages('virgin', expected_count=1) + items = _run_message(message, 1) results_msg = items[0].msg self.assertIn('Wrong sub-command specified', results_msg.get_payload()) @@ -97,10 +97,7 @@ class TestPreDispatch(unittest.TestCase): message = _create_plain('bart@example.com', 'test@example.com', 'key set', '') - mm_config.switchboards['command'].enqueue(message, - listid='test.example.com') - make_testable_runner(CommandRunner, 'command').run() - items = get_queue_messages('virgin', expected_count=1) + items = _run_message(message, 1) results_msg = items[0].msg self.assertIn("This mailing list doesn't have pgp enabled.", @@ -146,9 +143,7 @@ class TestPreSubscription(unittest.TestCase): wrapped_set_message = MIMEWrapper(set_message) set_message = wrapped_set_message.attach_keys(self.bart_key.pubkey) - mm_config.switchboards['command'].enqueue(set_message, - listid='test.example.com') - make_testable_runner(CommandRunner, 'command').run() + items = _run_message(set_message, 2) pgp_address = PGPAddress.for_address(bart) self.assertIsNotNone(pgp_address) @@ -156,7 +151,6 @@ class TestPreSubscription(unittest.TestCase): self.bart_key.fingerprint) self.assertFalse(pgp_address.key_confirmed) - items = get_queue_messages('virgin', expected_count=2) if (items[0].msg['Subject'] == 'The results of your email commands'): # pragma: no cover results = items[0].msg @@ -190,9 +184,7 @@ class TestPreSubscription(unittest.TestCase): set_message = wrapped_set_message.encrypt(self.pgp_list.pubkey, self.bart_key.pubkey) - mm_config.switchboards['command'].enqueue(set_message, - listid='test.example.com') - make_testable_runner(CommandRunner, 'command').run() + items = _run_message(set_message, 2) pgp_address = PGPAddress.for_address(bart) self.assertIsNotNone(pgp_address) @@ -200,7 +192,6 @@ class TestPreSubscription(unittest.TestCase): self.bart_key.fingerprint) self.assertFalse(pgp_address.key_confirmed) - items = get_queue_messages('virgin', expected_count=2) if (items[0].msg['Subject'] == 'The results of your email commands'): # pragma: no cover results = items[0].msg @@ -220,10 +211,7 @@ class TestPreSubscription(unittest.TestCase): message = _create_plain('bart@example.com', 'test@example.com', 'key set', '') - mm_config.switchboards['command'].enqueue(message, - listid='test.example.com') - make_testable_runner(CommandRunner, 'command').run() - items = get_queue_messages('virgin', expected_count=1) + items = _run_message(message, 1) results_msg = items[0].msg self.assertIn('Missing token.', results_msg.get_payload()) @@ -232,10 +220,7 @@ class TestPreSubscription(unittest.TestCase): message = _create_plain('bart@example.com', 'test@example.com', 'key set token', '') - mm_config.switchboards['command'].enqueue(message, - listid='test.example.com') - make_testable_runner(CommandRunner, 'command').run() - items = get_queue_messages('virgin', expected_count=1) + items = _run_message(message, 1) results_msg = items[0].msg self.assertIn('No keys attached? Send a key.', @@ -249,10 +234,7 @@ class TestPreSubscription(unittest.TestCase): wrapped_set_message = MIMEWrapper(set_message) set_message = wrapped_set_message.attach_keys(self.anne_key.pubkey) - mm_config.switchboards['command'].enqueue(set_message, - listid='test.example.com') - make_testable_runner(CommandRunner, 'command').run() - items = get_queue_messages('virgin', expected_count=1) + items = _run_message(set_message, 1) results_msg = items[0].msg self.assertIn('More than one key! Send only one key.', @@ -264,10 +246,7 @@ class TestPreSubscription(unittest.TestCase): wrapped_set_message = MIMEWrapper(set_message) set_message = wrapped_set_message.attach_keys(self.bart_key) - mm_config.switchboards['command'].enqueue(set_message, - listid='test.example.com') - make_testable_runner(CommandRunner, 'command').run() - items = get_queue_messages('virgin', expected_count=1) + items = _run_message(set_message, 1) results_msg = items[0].msg self.assertIn('You probably wanted to send your public key only.', @@ -279,10 +258,7 @@ class TestPreSubscription(unittest.TestCase): wrapped_set_message = MIMEWrapper(set_message) set_message = wrapped_set_message.attach_keys(self.unusable_key.pubkey) - mm_config.switchboards['command'].enqueue(set_message, - listid='test.example.com') - make_testable_runner(CommandRunner, 'command').run() - items = get_queue_messages('virgin', expected_count=1) + items = _run_message(set_message, 1) results_msg = items[0].msg self.assertIn( @@ -294,10 +270,7 @@ class TestPreSubscription(unittest.TestCase): wrapped_message = MIMEWrapper(message) message = wrapped_message.attach_keys(self.bart_key.pubkey) - mm_config.switchboards['command'].enqueue(message, - listid='test.example.com') - make_testable_runner(CommandRunner, 'command').run() - items = get_queue_messages('virgin', expected_count=1) + items = _run_message(message, 1) results_msg = items[0].msg self.assertIn('No email to subscribe with.', results_msg.get_payload()) @@ -308,10 +281,7 @@ class TestPreSubscription(unittest.TestCase): wrapped_set_message = MIMEWrapper(set_message) set_message = wrapped_set_message.attach_keys(self.bart_key.pubkey) - mm_config.switchboards['command'].enqueue(set_message, - listid='test.example.com') - make_testable_runner(CommandRunner, 'command').run() - items = get_queue_messages('virgin', expected_count=1) + items = _run_message(set_message, 1) results_msg = items[0].msg self.assertIn('No adddress to subscribe with.', @@ -327,10 +297,7 @@ class TestPreSubscription(unittest.TestCase): wrapped_set_message = MIMEWrapper(set_message) set_message = wrapped_set_message.attach_keys(self.bart_key.pubkey) - mm_config.switchboards['command'].enqueue(set_message, - listid='test.example.com') - make_testable_runner(CommandRunner, 'command').run() - items = get_queue_messages('virgin', expected_count=1) + items = _run_message(set_message, 1) results_msg = items[0].msg self.assertIn('A pgp enabled address not found.', @@ -350,10 +317,7 @@ class TestPreSubscription(unittest.TestCase): wrapped_set_message = MIMEWrapper(set_message) set_message = wrapped_set_message.attach_keys(self.bart_key.pubkey) - mm_config.switchboards['command'].enqueue(set_message, - listid='test.example.com') - make_testable_runner(CommandRunner, 'command').run() - items = get_queue_messages('virgin', expected_count=1) + items = _run_message(set_message, 1) results_msg = items[0].msg self.assertIn('Wrong token.', results_msg.get_payload()) @@ -377,9 +341,7 @@ class TestPreSubscription(unittest.TestCase): wrapped_message = MIMEWrapper(message) message = wrapped_message.sign(self.bart_key) - mm_config.switchboards['command'].enqueue(message, - listid='test.example.com') - make_testable_runner(CommandRunner, 'command').run() + _run_message(message) pgp_address = PGPAddress.for_address(bart) self.assertTrue(pgp_address.key_confirmed) @@ -406,10 +368,7 @@ class TestPreSubscription(unittest.TestCase): self.pgp_list.pubkey, self.bart_key.pubkey) - mm_config.switchboards['command'].enqueue(message, - listid='test.example.com') - - make_testable_runner(CommandRunner, 'command').run() + _run_message(message) pgp_address = PGPAddress.for_address(bart) self.assertTrue(pgp_address.key_confirmed) @@ -419,10 +378,7 @@ class TestPreSubscription(unittest.TestCase): message = _create_plain('bart@example.com', 'test@example.com', 'key confirm', '') - mm_config.switchboards['command'].enqueue(message, - listid='test.example.com') - make_testable_runner(CommandRunner, 'command').run() - items = get_queue_messages('virgin', expected_count=1) + items = _run_message(message, 1) results_msg = items[0].msg self.assertIn('Missing token.', results_msg.get_payload()) @@ -431,10 +387,7 @@ class TestPreSubscription(unittest.TestCase): message = _create_plain('', 'test@example.com', 'key confirm token', '') - mm_config.switchboards['command'].enqueue(message, - listid='test.example.com') - make_testable_runner(CommandRunner, 'command').run() - items = get_queue_messages('virgin', expected_count=1) + items = _run_message(message, 1) results_msg = items[0].msg self.assertIn('No email to subscribe with.', results_msg.get_payload()) @@ -443,10 +396,7 @@ class TestPreSubscription(unittest.TestCase): message = _create_plain('bart@example.com', 'test@example.com', 'key confirm token', '') - mm_config.switchboards['command'].enqueue(message, - listid='test.example.com') - make_testable_runner(CommandRunner, 'command').run() - items = get_queue_messages('virgin', expected_count=1) + items = _run_message(message, 1) results_msg = items[0].msg self.assertIn('A pgp enabled address not found.', @@ -467,10 +417,7 @@ class TestPreSubscription(unittest.TestCase): wrapped_message = MIMEWrapper(message) message = wrapped_message.sign(self.bart_key) - mm_config.switchboards['command'].enqueue(message, - listid='test.example.com') - make_testable_runner(CommandRunner, 'command').run() - items = get_queue_messages('virgin', expected_count=1) + items = _run_message(message, 1) results_msg = items[0].msg self.assertIn('No key set.', results_msg.get_payload()) @@ -492,11 +439,7 @@ class TestPreSubscription(unittest.TestCase): self.bart_key.fingerprint, token)) - mm_config.switchboards['command'].enqueue(message, - listid='test.example.com') - make_testable_runner(CommandRunner, 'command').run() - - items = get_queue_messages('virgin', expected_count=1) + items = _run_message(message, 1) results_msg = items[0].msg self.assertIn('Message not signed, ignoring.', @@ -523,11 +466,7 @@ class TestPreSubscription(unittest.TestCase): message.get_payload(0).set_payload( 'Something that was definitely not signed.') - mm_config.switchboards['command'].enqueue(message, - listid='test.example.com') - make_testable_runner(CommandRunner, 'command').run() - - items = get_queue_messages('virgin', expected_count=1) + items = _run_message(message, 1) results_msg = items[0].msg self.assertIn('Message failed to verify.', @@ -551,11 +490,7 @@ class TestPreSubscription(unittest.TestCase): wrapped_message = MIMEWrapper(message) message = wrapped_message.sign(self.bart_key) - mm_config.switchboards['command'].enqueue(message, - listid='test.example.com') - make_testable_runner(CommandRunner, 'command').run() - - items = get_queue_messages('virgin', expected_count=1) + items = _run_message(message, 1) results_msg = items[0].msg self.assertIn('Wrong token.', results_msg.get_payload()) @@ -578,11 +513,7 @@ class TestPreSubscription(unittest.TestCase): wrapped_message = MIMEWrapper(message) message = wrapped_message.sign(self.bart_key) - mm_config.switchboards['command'].enqueue(message, - listid='test.example.com') - make_testable_runner(CommandRunner, 'command').run() - - items = get_queue_messages('virgin', expected_count=1) + items = _run_message(message, 1) results_msg = items[0].msg self.assertIn("Message doesn't contain the expected statement.", @@ -632,11 +563,7 @@ class TestAfterSubscription(unittest.TestCase): wrapped_message = MIMEWrapper(message) message = wrapped_message.attach_keys(self.bart_new_key.pubkey) - mm_config.switchboards['command'].enqueue(message, - listid='test.example.com') - make_testable_runner(CommandRunner, 'command').run() - - items = get_queue_messages('virgin', expected_count=2) + items = _run_message(message, 2) if (items[0].msg['Subject'] == 'The results of your email commands'): # pragma: no cover results = items[0].msg @@ -669,11 +596,7 @@ class TestAfterSubscription(unittest.TestCase): wrapped_message = MIMEWrapper(message) message = wrapped_message.encrypt(self.pgp_list.pubkey) - mm_config.switchboards['command'].enqueue(message, - listid='test.example.com') - make_testable_runner(CommandRunner, 'command').run() - - items = get_queue_messages('virgin', expected_count=2) + items = _run_message(message, 2) if (items[0].msg['Subject'] == 'The results of your email commands'): # pragma: no cover results = items[0].msg @@ -704,11 +627,7 @@ class TestAfterSubscription(unittest.TestCase): wrapped_message = MIMEWrapper(message) message = wrapped_message.attach_keys(self.bart_new_key.pubkey) - mm_config.switchboards['command'].enqueue(message, - listid='test.example.com') - make_testable_runner(CommandRunner, 'command').run() - - items = get_queue_messages('virgin', expected_count=2) + items = _run_message(message, 2) if (items[0].msg['Subject'] == 'The results of your email commands'): # pragma: no cover confirm_request = items[1].msg @@ -728,9 +647,7 @@ class TestAfterSubscription(unittest.TestCase): wrapped_confirm = MIMEWrapper(confirm_message) confirm = wrapped_confirm.sign(self.bart_key) - mm_config.switchboards['command'].enqueue(confirm, - listid='test.example.com') - make_testable_runner(CommandRunner, 'command').run() + _run_message(confirm) pgp_address = PGPAddress.for_address(bart) self.assertEqual(pgp_address.key_fingerprint, @@ -740,11 +657,7 @@ class TestAfterSubscription(unittest.TestCase): def test_change_extra_arg(self): message = _create_plain('bart@example.com', 'test@example.com', 'key change extra arguments', '') - mm_config.switchboards['command'].enqueue(message, - listid='test.example.com') - make_testable_runner(CommandRunner, 'command').run() - - items = get_queue_messages('virgin', expected_count=1) + items = _run_message(message, 1) results_msg = items[0].msg self.assertIn('Extraneous argument/s: extra,arguments', @@ -755,10 +668,7 @@ class TestAfterSubscription(unittest.TestCase): wrapped_message = MIMEWrapper(message) message = wrapped_message.attach_keys(self.bart_key.pubkey) - mm_config.switchboards['command'].enqueue(message, - listid='test.example.com') - make_testable_runner(CommandRunner, 'command').run() - items = get_queue_messages('virgin', expected_count=1) + items = _run_message(message, 1) results_msg = items[0].msg self.assertIn('No email to change key of.', results_msg.get_payload()) @@ -769,15 +679,45 @@ class TestAfterSubscription(unittest.TestCase): wrapped_message = MIMEWrapper(message) message = wrapped_message.attach_keys(self.bart_key.pubkey) - mm_config.switchboards['command'].enqueue(message, - listid='test.example.com') - make_testable_runner(CommandRunner, 'command').run() - items = get_queue_messages('virgin', expected_count=1) + items = _run_message(message, 1) results_msg = items[0].msg self.assertIn('A pgp enabled address not found.', results_msg.get_payload()) + def test_change_no_key_set(self): + bart = getUtility(IUserManager).create_address('bart@example.com', + 'Bart Person') + with transaction() as t: + pgp_address = PGPAddress(bart) + t.add(pgp_address) + + message = _create_mixed('bart@example.com', 'test@example.com', + 'key change') + + items = _run_message(message, 1) + results_msg = items[0].msg + + self.assertIn("You currently don't have a key set.", + results_msg.get_payload()) + + def test_change_key_not_confirmed(self): + bart = getUtility(IUserManager).create_address('bart@example.com', + 'Bart Person') + with transaction() as t: + pgp_address = PGPAddress(bart) + pgp_address.key = self.bart_key.pubkey + t.add(pgp_address) + + message = _create_mixed('bart@example.com', 'test@example.com', + 'key change') + + items = _run_message(message, 1) + results_msg = items[0].msg + + self.assertIn('Your key is currently not confirmed.', + results_msg.get_payload()) + def test_change_no_key(self): bart = getUtility(IUserManager).create_address('bart@example.com', 'Bart Person') @@ -790,10 +730,7 @@ class TestAfterSubscription(unittest.TestCase): message = _create_plain('bart@example.com', 'test@example.com', 'key change', '') - mm_config.switchboards['command'].enqueue(message, - listid='test.example.com') - make_testable_runner(CommandRunner, 'command').run() - items = get_queue_messages('virgin', expected_count=1) + items = _run_message(message, 1) results_msg = items[0].msg self.assertIn('No keys attached? Send a key.', @@ -816,10 +753,7 @@ class TestAfterSubscription(unittest.TestCase): wrapped_set_message = MIMEWrapper(set_message) set_message = wrapped_set_message.attach_keys(self.bart_new_key.pubkey) - mm_config.switchboards['command'].enqueue(set_message, - listid='test.example.com') - make_testable_runner(CommandRunner, 'command').run() - items = get_queue_messages('virgin', expected_count=1) + items = _run_message(set_message, 1) results_msg = items[0].msg self.assertIn('More than one key! Send only one key.', @@ -839,10 +773,7 @@ class TestAfterSubscription(unittest.TestCase): wrapped_message = MIMEWrapper(message) message = wrapped_message.attach_keys(self.bart_key) - mm_config.switchboards['command'].enqueue(message, - listid='test.example.com') - make_testable_runner(CommandRunner, 'command').run() - items = get_queue_messages('virgin', expected_count=1) + items = _run_message(message, 1) results_msg = items[0].msg self.assertIn('You probably wanted to send your public key only.', @@ -862,10 +793,7 @@ class TestAfterSubscription(unittest.TestCase): wrapped_message = MIMEWrapper(message) message = wrapped_message.attach_keys(self.unusable_key.pubkey) - mm_config.switchboards['command'].enqueue(message, - listid='test.example.com') - make_testable_runner(CommandRunner, 'command').run() - items = get_queue_messages('virgin', expected_count=1) + items = _run_message(message, 1) results_msg = items[0].msg self.assertIn( @@ -875,11 +803,7 @@ class TestAfterSubscription(unittest.TestCase): def test_revoke_extra_arg(self): message = _create_plain('bart@example.com', 'test@example.com', 'key revoke extra arguments', '') - mm_config.switchboards['command'].enqueue(message, - listid='test.example.com') - make_testable_runner(CommandRunner, 'command').run() - - items = get_queue_messages('virgin', expected_count=1) + items = _run_message(message, 1) results_msg = items[0].msg self.assertIn('Extraneous argument/s: extra,arguments', @@ -888,10 +812,7 @@ class TestAfterSubscription(unittest.TestCase): def test_revoke_no_email(self): message = _create_mixed('', 'test@example.com', 'key revoke') - mm_config.switchboards['command'].enqueue(message, - listid='test.example.com') - make_testable_runner(CommandRunner, 'command').run() - items = get_queue_messages('virgin', expected_count=1) + items = _run_message(message, 1) results_msg = items[0].msg self.assertIn('No email to revoke key of.', results_msg.get_payload()) @@ -900,10 +821,7 @@ class TestAfterSubscription(unittest.TestCase): message = _create_mixed('bart@example.com', 'test@example.com', 'key revoke') - mm_config.switchboards['command'].enqueue(message, - listid='test.example.com') - make_testable_runner(CommandRunner, 'command').run() - items = get_queue_messages('virgin', expected_count=1) + items = _run_message(message, 1) results_msg = items[0].msg self.assertIn('A pgp enabled address not found.', @@ -919,10 +837,7 @@ class TestAfterSubscription(unittest.TestCase): message = _create_mixed('bart@example.com', 'test@example.com', 'key revoke') - mm_config.switchboards['command'].enqueue(message, - listid='test.example.com') - make_testable_runner(CommandRunner, 'command').run() - items = get_queue_messages('virgin', expected_count=1) + items = _run_message(message, 1) results_msg = items[0].msg self.assertIn("You currently don't have a key set.", @@ -939,10 +854,7 @@ class TestAfterSubscription(unittest.TestCase): message = _create_mixed('bart@example.com', 'test@example.com', 'key revoke') - mm_config.switchboards['command'].enqueue(message, - listid='test.example.com') - make_testable_runner(CommandRunner, 'command').run() - items = get_queue_messages('virgin', expected_count=1) + items = _run_message(message, 1) results_msg = items[0].msg self.assertIn('Your key is currently not confirmed.', @@ -959,10 +871,7 @@ class TestAfterSubscription(unittest.TestCase): message = _create_plain('bart@example.com', 'test@example.com', 'key revoke', '') - mm_config.switchboards['command'].enqueue(message, - listid='test.example.com') - make_testable_runner(CommandRunner, 'command').run() - items = get_queue_messages('virgin', expected_count=1) + items = _run_message(message, 1) results_msg = items[0].msg self.assertIn('No key revocations attached? Send a key revocation.', @@ -984,16 +893,13 @@ class TestAfterSubscription(unittest.TestCase): wrapped_message = MIMEWrapper(message) message = wrapped_message.attach_revocs(revoc) - mm_config.switchboards['command'].enqueue(message, - listid='test.example.com') - make_testable_runner(CommandRunner, 'command').run() - items = get_queue_messages('virgin', expected_count=2) + items = _run_message(message, 2) if (items[0].msg['Subject'] == 'The results of your email commands'): # pragma: no cover results_msg = items[0].msg else: results_msg = items[1].msg - #TODO: finish test + # TODO: finish test self.assertIn('Key needs to be reset.', results_msg.get_payload()) @@ -1027,10 +933,7 @@ class TestAfterSubscription(unittest.TestCase): wrapped_message = MIMEWrapper(message) message = wrapped_message.attach_revocs(revoc) - mm_config.switchboards['command'].enqueue(message, - listid='test.example.com') - make_testable_runner(CommandRunner, 'command').run() - items = get_queue_messages('virgin', expected_count=1) + items = _run_message(message, 1) results_msg = items[0].msg self.assertIn('Key succesfully updated.', results_msg.get_payload()) @@ -1051,10 +954,7 @@ class TestGeneral(unittest.TestCase): def test_receive(self): message = _create_plain('bart@example.com', 'test@example.com', 'key receive', '') - mm_config.switchboards['command'].enqueue(message, - listid='test.example.com') - make_testable_runner(CommandRunner, 'command').run() - items = get_queue_messages('virgin', expected_count=2) + items = _run_message(message, 2) if (items[0].msg['Subject'] == 'The results of your email commands'): # pragma: no cover pubkey_message = items[1].msg @@ -1070,10 +970,7 @@ class TestGeneral(unittest.TestCase): def test_receive_extra_arg(self): message = _create_plain('bart@example.com', 'test@example.com', 'key receive extra arguments', '') - mm_config.switchboards['command'].enqueue(message, - listid='test.example.com') - make_testable_runner(CommandRunner, 'command').run() - items = get_queue_messages('virgin', expected_count=1) + items = _run_message(message, 1) results_msg = items[0].msg self.assertIn('Extraneous argument/s: extra,arguments', @@ -1081,10 +978,7 @@ class TestGeneral(unittest.TestCase): def test_receive_no_email(self): message = _create_plain('', 'test@example.com', 'key receive', '') - mm_config.switchboards['command'].enqueue(message, - listid='test.example.com') - make_testable_runner(CommandRunner, 'command').run() - items = get_queue_messages('virgin', expected_count=1) + items = _run_message(message, 1) results_msg = items[0].msg self.assertIn('No email to send list public key.', |
