summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mailman_pgp/commands/eml_key.py31
-rw-r--r--src/mailman_pgp/commands/tests/test_key.py87
2 files changed, 116 insertions, 2 deletions
diff --git a/src/mailman_pgp/commands/eml_key.py b/src/mailman_pgp/commands/eml_key.py
index 1b6dc9f..deb396a 100644
--- a/src/mailman_pgp/commands/eml_key.py
+++ b/src/mailman_pgp/commands/eml_key.py
@@ -16,8 +16,10 @@
# this program. If not, see <http://www.gnu.org/licenses/>.
"""The key email command."""
+from email.mime.text import MIMEText
from email.utils import parseaddr
+from mailman.email.message import UserNotification
from mailman.interfaces.command import ContinueProcessing, IEmailCommand
from mailman.interfaces.pending import IPendings
from mailman.interfaces.subscriptions import ISubscriptionManager
@@ -29,6 +31,7 @@ from zope.interface import implementer
from mailman_pgp.database import transaction
from mailman_pgp.model.address import PGPAddress
from mailman_pgp.model.list import PGPMailingList
+from mailman_pgp.pgp.mime import MIMEWrapper
from mailman_pgp.pgp.wrapper import PGPWrapper
from mailman_pgp.workflows.base import CONFIRM_REQUEST
from mailman_pgp.workflows.key_change import (CHANGE_CONFIRM_REQUEST,
@@ -193,15 +196,39 @@ def _cmd_sign(pgp_list, mlist, msg, msgdata, arguments, results):
pass
+def _cmd_receive(pgp_list, mlist, msg, msgdata, arguments, results):
+ if len(arguments) != 1:
+ print('Extraneous argument/s: ' + ','.join(arguments[1:]),
+ file=results)
+ return ContinueProcessing.no
+
+ email = _get_email(msg)
+ if not email:
+ print('No email to send list public key.', file=results)
+ return ContinueProcessing.no
+
+ msg = UserNotification(email, mlist.request_address,
+ '{} public key'.format(mlist.fqdn_listname))
+ msg.set_type('multipart/mixed')
+ msg['MIME-Version'] = '1.0'
+ msg.attach(MIMEText('Here is the public key you requested.'))
+ wrapped = MIMEWrapper(msg)
+ msg = wrapped.attach_key(pgp_list.pubkey)
+
+ msg.send(mlist)
+ return ContinueProcessing.yes
+
+
SUBCOMMANDS = {
'set': _cmd_set,
'confirm': _cmd_confirm,
'change': _cmd_change,
'revoke': _cmd_revoke,
- 'sign': _cmd_sign
+ 'sign': _cmd_sign,
+ 'receive': _cmd_receive
}
-ARGUMENTS = '<' + '|'.join(SUBCOMMANDS.keys()) + '>'
+ARGUMENTS = '<' + '|'.join(SUBCOMMANDS) + '>'
@public
diff --git a/src/mailman_pgp/commands/tests/test_key.py b/src/mailman_pgp/commands/tests/test_key.py
index d0ff7e9..612f13f 100644
--- a/src/mailman_pgp/commands/tests/test_key.py
+++ b/src/mailman_pgp/commands/tests/test_key.py
@@ -588,6 +588,42 @@ class TestAfterSubscription(unittest.TestCase):
decrypted = confirm_wrapped.decrypt(self.bart_new_key)
self.assertIn('key confirm', decrypted['subject'])
+ def test_change_encrypted(self):
+ bart = getUtility(IUserManager).create_address('bart@example.com',
+ 'Bart Person')
+
+ with transaction() as t:
+ pgp_address = PGPAddress(bart)
+ pgp_address.key = self.bart_key.pubkey
+ pgp_address.key_confirmed = True
+ t.add(pgp_address)
+
+ message = _create_mixed('bart@example.com', 'test@example.com',
+ 'key change')
+ wrapped_message = MIMEWrapper(message)
+ message = wrapped_message.attach_key(self.bart_new_key.pubkey)
+ wrapped_message = MIMEWrapper(message)
+ message = wrapped_message.encrypt(self.pgp_list.pubkey)
+
+ mm_config.switchboards['command'].enqueue(message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+
+ items = get_queue_messages('virgin', expected_count=2)
+ if items[0].msg['Subject'] == 'The results of your email commands':
+ results = items[0].msg
+ confirm_request = items[1].msg
+ else:
+ results = items[1].msg
+ confirm_request = items[0].msg
+
+ self.assertIn('Key change request received.', results.get_payload())
+
+ confirm_wrapped = PGPWrapper(confirm_request)
+ self.assertTrue(confirm_wrapped.is_encrypted())
+ decrypted = confirm_wrapped.decrypt(self.bart_new_key)
+ self.assertIn('key confirm', decrypted['subject'])
+
def test_change_confirm(self):
bart = getUtility(IUserManager).create_address('bart@example.com',
'Bart Person')
@@ -706,3 +742,54 @@ class TestAfterSubscription(unittest.TestCase):
self.assertIn('A pgp enabled address not found.',
results_msg.get_payload())
+
+
+@public
+class TestGeneral(unittest.TestCase):
+ layer = PGPConfigLayer
+
+ def setUp(self):
+ self.mlist = create_list('test@example.com', style_name='pgp-default')
+ self.pgp_list = PGPMailingList.for_list(self.mlist)
+ self.pgp_list.key = load_key('ecc_p256.priv.asc')
+
+ def test_receive(self):
+ message = _create_plain('bart@example.com', 'test@example.com',
+ 'key receive', '')
+ mm_config.switchboards['command'].enqueue(message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+ items = get_queue_messages('virgin', expected_count=2)
+ if items[0].msg['Subject'] == 'The results of your email commands':
+ pubkey_message = items[1].msg
+ else:
+ pubkey_message = items[0].msg
+
+ wrapped = PGPWrapper(pubkey_message)
+ self.assertTrue(wrapped.has_keys())
+ keys = list(wrapped.keys())
+ self.assertEqual(len(keys), 1)
+ self.assertEqual(keys[0].fingerprint, self.pgp_list.key.fingerprint)
+
+ def test_receive_extra_arg(self):
+ message = _create_plain('bart@example.com', 'test@example.com',
+ 'key receive extra arguments', '')
+ mm_config.switchboards['command'].enqueue(message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+ items = get_queue_messages('virgin', expected_count=1)
+ results_msg = items[0].msg
+
+ self.assertIn('Extraneous argument/s: extra,arguments',
+ results_msg.get_payload())
+
+ def test_receive_no_email(self):
+ message = _create_plain('', 'test@example.com', 'key receive', '')
+ mm_config.switchboards['command'].enqueue(message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+ items = get_queue_messages('virgin', expected_count=1)
+ results_msg = items[0].msg
+
+ self.assertIn('No email to send list public key.',
+ results_msg.get_payload())