aboutsummaryrefslogtreecommitdiff
path: root/src/mailman_pgp/commands
diff options
context:
space:
mode:
Diffstat (limited to 'src/mailman_pgp/commands')
-rw-r--r--src/mailman_pgp/commands/eml_key.py46
-rw-r--r--src/mailman_pgp/commands/tests/test_key.py49
2 files changed, 62 insertions, 33 deletions
diff --git a/src/mailman_pgp/commands/eml_key.py b/src/mailman_pgp/commands/eml_key.py
index 9df6065..7b7782d 100644
--- a/src/mailman_pgp/commands/eml_key.py
+++ b/src/mailman_pgp/commands/eml_key.py
@@ -19,6 +19,7 @@
from email.utils import parseaddr
from mailman.interfaces.command import ContinueProcessing, IEmailCommand
+from mailman.interfaces.pending import IPendings
from mailman.interfaces.subscriptions import ISubscriptionManager
from mailman.interfaces.usermanager import IUserManager
from public import public
@@ -47,6 +48,10 @@ def _cmd_set(pgp_list, mlist, msg, msgdata, arguments, results):
return ContinueProcessing.no
wrapped = PGPWrapper(msg)
+ if wrapped.is_encrypted():
+ decrypted = wrapped.decrypt(pgp_list.key)
+ wrapped = PGPWrapper(decrypted)
+
if not wrapped.has_keys():
print('No keys attached? Send a key.', file=results)
return ContinueProcessing.no
@@ -66,21 +71,24 @@ def _cmd_set(pgp_list, mlist, msg, msgdata, arguments, results):
print('No adddress to subscribe with.', file=results)
return ContinueProcessing.no
- with transaction() as t:
- pgp_address = PGPAddress.for_address(address)
- if pgp_address is None:
- pgp_address = PGPAddress(address)
- pgp_address.key = keys.pop()
- t.add(pgp_address)
+ pgp_address = PGPAddress.for_address(address)
+ if pgp_address is None:
+ print('A pgp enabled address not found.', file=results)
+ return ContinueProcessing.no
token = arguments[1]
- try:
- ISubscriptionManager(mlist).confirm(token)
- print('Key succesfully set.', file=results)
- print('Key fingerprint: {}'.format(pgp_address.key.fingerprint),
- file=results)
- except LookupError:
+ pendable = getUtility(IPendings).confirm(token, expunge=False)
+ if pendable is None:
print('Wrong token.', file=results)
+ return ContinueProcessing.no
+
+ with transaction():
+ pgp_address.key = keys.pop()
+ ISubscriptionManager(mlist).confirm(token)
+
+ print('Key succesfully set.', file=results)
+ print('Key fingerprint: {}'.format(pgp_address.key.fingerprint),
+ file=results)
return ContinueProcessing.no
@@ -115,13 +123,17 @@ def _cmd_confirm(pgp_list, mlist, msg, msgdata, arguments, results):
token = arguments[1]
- expecting = CONFIRM_REQUEST.format(pgp_address.key_fingerprint,
- token)
+ pendable = getUtility(IPendings).confirm(token, expunge=False)
+ if pendable is None:
+ print('Wrong token.', file=results)
+ return ContinueProcessing.no
+
+ # TODO differentiate between key change and subscription here.
+
+ expecting = CONFIRM_REQUEST.format(pgp_address.key_fingerprint, token)
for sig_subject in wrapped.get_signed():
if expecting in sig_subject:
- with transaction():
- pgp_address.key_confirmed = True
- ISubscriptionManager(mlist).confirm(token)
+ ISubscriptionManager(mlist).confirm(token)
break
else:
print("Message doesn't contain the expected statement.", file=results)
diff --git a/src/mailman_pgp/commands/tests/test_key.py b/src/mailman_pgp/commands/tests/test_key.py
index fe75a6a..64f8ae6 100644
--- a/src/mailman_pgp/commands/tests/test_key.py
+++ b/src/mailman_pgp/commands/tests/test_key.py
@@ -134,7 +134,6 @@ class TestPreSubscription(unittest.TestCase):
pgp_address = PGPAddress.for_address(bart)
self.assertIsNotNone(pgp_address)
self.assertEqual(pgp_address.key.fingerprint, bart_key.fingerprint)
- self.assertEqual(pgp_address.key_fingerprint, bart_key.fingerprint)
self.assertFalse(pgp_address.key_confirmed)
items = get_queue_messages('virgin', expected_count=2)
@@ -242,11 +241,6 @@ class TestPreSubscription(unittest.TestCase):
get_queue_messages('virgin')
- with transaction() as t:
- pgp_address = PGPAddress(bart)
- pgp_address.key = bart_key.pubkey
- t.add(pgp_address)
-
message = _create_plain('bart@example.com', 'test@example.com',
'Re: key confirm {}'.format(token),
CONFIRM_REQUEST.format(bart_key.fingerprint,
@@ -275,11 +269,6 @@ class TestPreSubscription(unittest.TestCase):
get_queue_messages('virgin')
- with transaction() as t:
- pgp_address = PGPAddress(bart)
- pgp_address.key = bart_key.pubkey
- t.add(pgp_address)
-
message = _create_plain('bart@example.com', 'test@example.com',
'Re: key confirm {}'.format(token),
CONFIRM_REQUEST.format(bart_key.fingerprint,
@@ -290,6 +279,7 @@ class TestPreSubscription(unittest.TestCase):
mm_config.switchboards['command'].enqueue(message,
listid='test.example.com')
+
make_testable_runner(CommandRunner, 'command').run()
pgp_address = PGPAddress.for_address(bart)
@@ -346,11 +336,6 @@ class TestPreSubscription(unittest.TestCase):
get_queue_messages('virgin')
- with transaction() as t:
- pgp_address = PGPAddress(bart)
- pgp_address.key = bart_key.pubkey
- t.add(pgp_address)
-
message = _create_plain('bart@example.com', 'test@example.com',
'Re: key confirm {}'.format(token),
CONFIRM_REQUEST.format(bart_key.fingerprint,
@@ -408,3 +393,35 @@ class TestAfterSubscription(unittest.TestCase):
confirm_wrapped = PGPWrapper(confirm_request)
self.assertTrue(confirm_wrapped.is_encrypted())
+ decrypted = confirm_wrapped.decrypt(bart_new_key)
+ self.assertIn('key confirm', decrypted['subject'])
+
+ def test_key_change_confirm(self):
+ bart = getUtility(IUserManager).create_address('bart@example.com',
+ 'Bart Person')
+ bart_key = load_key('rsa_1024.priv.asc')
+ bart_new_key = load_key('ecc_p256.priv.asc')
+
+ with transaction() as t:
+ pgp_address = PGPAddress(bart)
+ pgp_address.key = bart_key.pubkey
+ pgp_address.key_confirmed = True
+ t.add(pgp_address)
+
+ message = _create_mixed('bart@example.com', 'test@example.com',
+ 'key change')
+ wrapped_message = MIMEWrapper(message)
+ message = wrapped_message.attach_key(bart_new_key.pubkey)
+
+ mm_config.switchboards['command'].enqueue(message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+
+ items = get_queue_messages('virgin', expected_count=2)
+ if items[0].msg['Subject'] == 'The results of your email commands':
+ confirm_request = items[1].msg
+ else:
+ confirm_request = items[0].msg
+ request_wrapped = PGPWrapper(confirm_request)
+ request_wrapped.decrypt(bart_new_key)
+ # TODO finish this