aboutsummaryrefslogtreecommitdiff
path: root/src/mailman_pgp
diff options
context:
space:
mode:
authorJ08nY2017-07-14 15:13:47 +0200
committerJ08nY2017-07-14 15:13:47 +0200
commit9f0ac2239af18f780c757f8cf6524c99de2dffe8 (patch)
treef1b7d906ce1ebabbdbd63f93a3a7a7702ae00abc /src/mailman_pgp
parent0b9335e163791959390bf3c83928e5b61e912fa3 (diff)
downloadmailman-pgp-9f0ac2239af18f780c757f8cf6524c99de2dffe8.tar.gz
mailman-pgp-9f0ac2239af18f780c757f8cf6524c99de2dffe8.tar.zst
mailman-pgp-9f0ac2239af18f780c757f8cf6524c99de2dffe8.zip
Diffstat (limited to 'src/mailman_pgp')
-rw-r--r--src/mailman_pgp/commands/eml_key.py4
-rw-r--r--src/mailman_pgp/commands/tests/test_key.py295
-rw-r--r--src/mailman_pgp/workflows/base.py3
-rw-r--r--src/mailman_pgp/workflows/key_change.py4
4 files changed, 288 insertions, 18 deletions
diff --git a/src/mailman_pgp/commands/eml_key.py b/src/mailman_pgp/commands/eml_key.py
index 2f7a7e7..1b6dc9f 100644
--- a/src/mailman_pgp/commands/eml_key.py
+++ b/src/mailman_pgp/commands/eml_key.py
@@ -109,6 +109,10 @@ def _cmd_confirm(pgp_list, mlist, msg, msgdata, arguments, results):
print('A pgp enabled address not found.', file=results)
return ContinueProcessing.no
+ if pgp_address.key is None:
+ print('No key set.', file=results)
+ return ContinueProcessing.no
+
wrapped = PGPWrapper(msg)
if wrapped.is_encrypted():
decrypted = wrapped.decrypt(pgp_list.key)
diff --git a/src/mailman_pgp/commands/tests/test_key.py b/src/mailman_pgp/commands/tests/test_key.py
index 44d5b25..d0ff7e9 100644
--- a/src/mailman_pgp/commands/tests/test_key.py
+++ b/src/mailman_pgp/commands/tests/test_key.py
@@ -157,6 +157,49 @@ class TestPreSubscription(unittest.TestCase):
confirm_wrapped = PGPWrapper(confirm_request)
self.assertTrue(confirm_wrapped.is_encrypted())
+ def test_set_encrypted(self):
+ self.mlist.subscription_policy = OpenSubscriptionPolicy
+ bart = getUtility(IUserManager).create_address('bart@example.com',
+ 'Bart Person')
+ bart.verified_on = now()
+ token, token_owner, member = ISubscriptionManager(self.mlist).register(
+ bart)
+
+ get_queue_messages('virgin')
+
+ set_message = _create_mixed('bart@example.com', 'test@example.com',
+ 'Re: key set {}'.format(token))
+ wrapped_set_message = MIMEWrapper(set_message)
+ set_message = wrapped_set_message.attach_key(self.bart_key.pubkey)
+ wrapped_set_message = MIMEWrapper(set_message)
+ set_message = wrapped_set_message.encrypt(self.pgp_list.pubkey,
+ self.bart_key.pubkey)
+
+ mm_config.switchboards['command'].enqueue(set_message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+
+ pgp_address = PGPAddress.for_address(bart)
+ self.assertIsNotNone(pgp_address)
+ self.assertEqual(pgp_address.key.fingerprint,
+ self.bart_key.fingerprint)
+ self.assertFalse(pgp_address.key_confirmed)
+
+ items = get_queue_messages('virgin', expected_count=2)
+ if items[0].msg['Subject'] == 'The results of your email commands':
+ results = items[0].msg
+ confirm_request = items[1].msg
+ else:
+ results = items[1].msg
+ confirm_request = items[0].msg
+
+ self.assertIn('Key succesfully set.', results.get_payload())
+ self.assertIn('Key fingerprint: {}'.format(self.bart_key.fingerprint),
+ results.get_payload())
+
+ confirm_wrapped = PGPWrapper(confirm_request)
+ self.assertTrue(confirm_wrapped.is_encrypted())
+
def test_set_no_token(self):
message = _create_plain('bart@example.com', 'test@example.com',
'key set', '')
@@ -227,6 +270,47 @@ class TestPreSubscription(unittest.TestCase):
self.assertIn('No adddress to subscribe with.',
results_msg.get_payload())
+ def test_set_no_pgp_address(self):
+ bart = getUtility(IUserManager).create_address('bart@example.com',
+ 'Bart Person')
+ bart.verified_on = now()
+
+ set_message = _create_mixed('bart@example.com', 'test@example.com',
+ 'key set token')
+ wrapped_set_message = MIMEWrapper(set_message)
+ set_message = wrapped_set_message.attach_key(self.bart_key.pubkey)
+
+ mm_config.switchboards['command'].enqueue(set_message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+ items = get_queue_messages('virgin', expected_count=1)
+ results_msg = items[0].msg
+
+ self.assertIn('A pgp enabled address not found.',
+ results_msg.get_payload())
+
+ def test_set_wrong_token(self):
+ bart = getUtility(IUserManager).create_address('bart@example.com',
+ 'Bart Person')
+ bart.verified_on = now()
+
+ with transaction() as t:
+ pgp_address = PGPAddress(bart)
+ t.add(pgp_address)
+
+ set_message = _create_mixed('bart@example.com', 'test@example.com',
+ 'key set token')
+ wrapped_set_message = MIMEWrapper(set_message)
+ set_message = wrapped_set_message.attach_key(self.bart_key.pubkey)
+
+ mm_config.switchboards['command'].enqueue(set_message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+ items = get_queue_messages('virgin', expected_count=1)
+ results_msg = items[0].msg
+
+ self.assertIn('Wrong token.', results_msg.get_payload())
+
def test_confirm(self):
self.mlist.subscription_policy = OpenSubscriptionPolicy
bart = getUtility(IUserManager).create_address('bart@example.com',
@@ -321,6 +405,29 @@ class TestPreSubscription(unittest.TestCase):
self.assertIn('A pgp enabled address not found.',
results_msg.get_payload())
+ def test_confirm_no_key(self):
+ bart = getUtility(IUserManager).create_address('bart@example.com',
+ 'Bart Person')
+ with transaction() as t:
+ pgp_address = PGPAddress(bart)
+ t.add(pgp_address)
+
+ message = _create_plain('bart@example.com', 'test@example.com',
+ 'Re: key confirm token',
+ CONFIRM_REQUEST.format(
+ self.bart_key.fingerprint,
+ 'token'))
+ wrapped_message = MIMEWrapper(message)
+ message = wrapped_message.sign(self.bart_key)
+
+ mm_config.switchboards['command'].enqueue(message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+ items = get_queue_messages('virgin', expected_count=1)
+ results_msg = items[0].msg
+
+ self.assertIn('No key set.', results_msg.get_payload())
+
def test_confirm_not_signed(self):
self.mlist.subscription_policy = OpenSubscriptionPolicy
bart = getUtility(IUserManager).create_address('bart@example.com',
@@ -348,6 +455,92 @@ class TestPreSubscription(unittest.TestCase):
self.assertIn('Message not signed, ignoring.',
results_msg.get_payload())
+ def test_confirm_invalid_sig(self):
+ self.mlist.subscription_policy = OpenSubscriptionPolicy
+ bart = getUtility(IUserManager).create_address('bart@example.com',
+ 'Bart Person')
+ bart.verified_on = now()
+
+ token, token_owner, member = ISubscriptionManager(self.mlist).register(
+ bart, pubkey=self.bart_key.pubkey)
+
+ get_queue_messages('virgin')
+
+ message = _create_plain('bart@example.com', 'test@example.com',
+ 'Re: key confirm {}'.format(token),
+ CONFIRM_REQUEST.format(
+ self.bart_key.fingerprint,
+ token))
+ wrapped_message = MIMEWrapper(message)
+ message = wrapped_message.sign(self.bart_key)
+ message.get_payload(0).set_payload(
+ 'Something that was definitely not signed.')
+
+ mm_config.switchboards['command'].enqueue(message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+
+ items = get_queue_messages('virgin', expected_count=1)
+ results_msg = items[0].msg
+
+ self.assertIn('Message failed to verify.',
+ results_msg.get_payload())
+
+ def test_confirm_wrong_token(self):
+ bart = getUtility(IUserManager).create_address('bart@example.com',
+ 'Bart Person')
+ bart.verified_on = now()
+
+ with transaction() as t:
+ pgp_address = PGPAddress(bart)
+ pgp_address.key = self.bart_key.pubkey
+ t.add(pgp_address)
+
+ message = _create_plain('bart@example.com', 'test@example.com',
+ 'Re: key confirm token',
+ CONFIRM_REQUEST.format(
+ self.bart_key.fingerprint,
+ 'token'))
+ wrapped_message = MIMEWrapper(message)
+ message = wrapped_message.sign(self.bart_key)
+
+ mm_config.switchboards['command'].enqueue(message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+
+ items = get_queue_messages('virgin', expected_count=1)
+ results_msg = items[0].msg
+
+ self.assertIn('Wrong token.', results_msg.get_payload())
+
+ def test_confirm_no_signed_statement(self):
+ self.mlist.subscription_policy = OpenSubscriptionPolicy
+ bart = getUtility(IUserManager).create_address('bart@example.com',
+ 'Bart Person')
+ bart.verified_on = now()
+
+ token, token_owner, member = ISubscriptionManager(self.mlist).register(
+ bart, pubkey=self.bart_key.pubkey)
+
+ get_queue_messages('virgin')
+
+ message = _create_plain('bart@example.com', 'test@example.com',
+ 'Re: key confirm {}'.format(token),
+ 'Some text, that definitely does not'
+ 'contain the required/expected statement.')
+ wrapped_message = MIMEWrapper(message)
+ message = wrapped_message.sign(self.bart_key)
+
+ mm_config.switchboards['command'].enqueue(message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+
+ items = get_queue_messages('virgin', expected_count=1)
+ results_msg = items[0].msg
+
+ self.assertIn("Message doesn't contain the expected statement.",
+ results_msg.get_payload())
+
@public
class TestAfterSubscription(unittest.TestCase):
@@ -358,22 +551,23 @@ class TestAfterSubscription(unittest.TestCase):
self.pgp_list = PGPMailingList.for_list(self.mlist)
self.pgp_list.key = load_key('ecc_p256.priv.asc')
- def test_key_change(self):
+ self.bart_key = load_key('rsa_1024.priv.asc')
+ self.bart_new_key = load_key('ecc_p256.priv.asc')
+
+ def test_change(self):
bart = getUtility(IUserManager).create_address('bart@example.com',
'Bart Person')
- bart_key = load_key('rsa_1024.priv.asc')
- bart_new_key = load_key('ecc_p256.priv.asc')
with transaction() as t:
pgp_address = PGPAddress(bart)
- pgp_address.key = bart_key.pubkey
+ pgp_address.key = self.bart_key.pubkey
pgp_address.key_confirmed = True
t.add(pgp_address)
message = _create_mixed('bart@example.com', 'test@example.com',
'key change')
wrapped_message = MIMEWrapper(message)
- message = wrapped_message.attach_key(bart_new_key.pubkey)
+ message = wrapped_message.attach_key(self.bart_new_key.pubkey)
mm_config.switchboards['command'].enqueue(message,
listid='test.example.com')
@@ -391,25 +585,23 @@ class TestAfterSubscription(unittest.TestCase):
confirm_wrapped = PGPWrapper(confirm_request)
self.assertTrue(confirm_wrapped.is_encrypted())
- decrypted = confirm_wrapped.decrypt(bart_new_key)
+ decrypted = confirm_wrapped.decrypt(self.bart_new_key)
self.assertIn('key confirm', decrypted['subject'])
- def test_key_change_confirm(self):
+ def test_change_confirm(self):
bart = getUtility(IUserManager).create_address('bart@example.com',
'Bart Person')
- bart_key = load_key('rsa_1024.priv.asc')
- bart_new_key = load_key('ecc_p256.priv.asc')
with transaction() as t:
pgp_address = PGPAddress(bart)
- pgp_address.key = bart_key.pubkey
+ pgp_address.key = self.bart_key.pubkey
pgp_address.key_confirmed = True
t.add(pgp_address)
message = _create_mixed('bart@example.com', 'test@example.com',
'key change')
wrapped_message = MIMEWrapper(message)
- message = wrapped_message.attach_key(bart_new_key.pubkey)
+ message = wrapped_message.attach_key(self.bart_new_key.pubkey)
mm_config.switchboards['command'].enqueue(message,
listid='test.example.com')
@@ -421,7 +613,7 @@ class TestAfterSubscription(unittest.TestCase):
else:
confirm_request = items[0].msg
request_wrapped = PGPWrapper(confirm_request)
- decrypted = request_wrapped.decrypt(bart_new_key)
+ decrypted = request_wrapped.decrypt(self.bart_new_key)
subj = decrypted['subject']
token = subj.split(' ')[-1]
@@ -429,15 +621,88 @@ class TestAfterSubscription(unittest.TestCase):
confirm_message = _create_plain('bart@example.com', 'test@example.com',
decrypted['subject'],
CHANGE_CONFIRM_REQUEST.format(
- bart_new_key.fingerprint,
+ self.bart_new_key.fingerprint,
token))
wrapped_confirm = MIMEWrapper(confirm_message)
- confirm = wrapped_confirm.sign(bart_key)
+ confirm = wrapped_confirm.sign(self.bart_key)
mm_config.switchboards['command'].enqueue(confirm,
listid='test.example.com')
make_testable_runner(CommandRunner, 'command').run()
pgp_address = PGPAddress.for_address(bart)
- self.assertEqual(pgp_address.key_fingerprint, bart_new_key.fingerprint)
+ self.assertEqual(pgp_address.key_fingerprint,
+ self.bart_new_key.fingerprint)
self.assertTrue(pgp_address.key_confirmed)
+
+ def test_change_extra_arg(self):
+ message = _create_plain('bart@example.com', 'test@example.com',
+ 'key change extra arguments', '')
+ mm_config.switchboards['command'].enqueue(message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+
+ items = get_queue_messages('virgin', expected_count=1)
+ results_msg = items[0].msg
+
+ self.assertIn('Extraneous argument/s: extra,arguments',
+ results_msg.get_payload())
+
+ def test_change_no_key(self):
+ message = _create_plain('bart@example.com', 'test@example.com',
+ 'key change', '')
+
+ mm_config.switchboards['command'].enqueue(message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+ items = get_queue_messages('virgin', expected_count=1)
+ results_msg = items[0].msg
+
+ self.assertIn('No keys attached? Send a key.',
+ results_msg.get_payload())
+
+ def test_change_multiple_keys(self):
+ set_message = _create_mixed('bart@example.com', 'test@example.com',
+ 'key change')
+
+ wrapped_set_message = MIMEWrapper(set_message)
+ set_message = wrapped_set_message.attach_key(self.bart_key.pubkey)
+ wrapped_set_message = MIMEWrapper(set_message)
+ set_message = wrapped_set_message.attach_key(self.bart_new_key.pubkey)
+
+ mm_config.switchboards['command'].enqueue(set_message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+ items = get_queue_messages('virgin', expected_count=1)
+ results_msg = items[0].msg
+
+ self.assertIn('More than one key! Send only one key.',
+ results_msg.get_payload())
+
+ def test_change_no_email(self):
+ message = _create_mixed('', 'test@example.com', 'key change')
+ wrapped_message = MIMEWrapper(message)
+ message = wrapped_message.attach_key(self.bart_key.pubkey)
+
+ mm_config.switchboards['command'].enqueue(message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+ items = get_queue_messages('virgin', expected_count=1)
+ results_msg = items[0].msg
+
+ self.assertIn('No email to change key of.', results_msg.get_payload())
+
+ def test_change_no_pgp_address(self):
+ message = _create_mixed('bart@example.com', 'test@example.com',
+ 'key change')
+ wrapped_message = MIMEWrapper(message)
+ message = wrapped_message.attach_key(self.bart_key.pubkey)
+
+ mm_config.switchboards['command'].enqueue(message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+ items = get_queue_messages('virgin', expected_count=1)
+ results_msg = items[0].msg
+
+ self.assertIn('A pgp enabled address not found.',
+ results_msg.get_payload())
diff --git a/src/mailman_pgp/workflows/base.py b/src/mailman_pgp/workflows/base.py
index 014f2dd..d35e58c 100644
--- a/src/mailman_pgp/workflows/base.py
+++ b/src/mailman_pgp/workflows/base.py
@@ -130,8 +130,7 @@ class ConfirmPubkeyMixin:
self.token))
pgp_list = PGPMailingList.for_list(self.mlist)
wrapped = PGPWrapper(msg)
- encrypted = wrapped.sign_encrypt(pgp_list.key, pgp_address.key,
- pgp_list.pubkey)
+ encrypted = wrapped.sign_encrypt(pgp_list.key, pgp_address.key)
msg.set_payload(encrypted.get_payload())
copy_headers(encrypted, msg, True)
diff --git a/src/mailman_pgp/workflows/key_change.py b/src/mailman_pgp/workflows/key_change.py
index c6d3ebc..cc5b9fc 100644
--- a/src/mailman_pgp/workflows/key_change.py
+++ b/src/mailman_pgp/workflows/key_change.py
@@ -28,6 +28,7 @@ from zope.interface import implementer
from mailman_pgp.database import transaction
from mailman_pgp.model.address import PGPAddress
+from mailman_pgp.model.list import PGPMailingList
from mailman_pgp.pgp.utils import copy_headers
from mailman_pgp.pgp.wrapper import PGPWrapper
@@ -58,6 +59,7 @@ class KeyChangeWorkflow(Workflow):
def __init__(self, mlist, pgp_address=None, pubkey=None):
super().__init__()
self.mlist = mlist
+ self.pgp_list = PGPMailingList.for_list(mlist)
self.pgp_address = pgp_address
self.pubkey = pubkey
@@ -104,7 +106,7 @@ class KeyChangeWorkflow(Workflow):
self.pubkey.fingerprint,
self.token))
wrapped = PGPWrapper(msg)
- encrypted = wrapped.encrypt(self.pubkey)
+ encrypted = wrapped.sign_encrypt(self.pgp_list.key, self.pubkey)
msg.set_payload(encrypted.get_payload())
copy_headers(encrypted, msg, True)