diff options
| author | J08nY | 2017-07-14 22:04:08 +0200 |
|---|---|---|
| committer | J08nY | 2017-07-14 22:04:08 +0200 |
| commit | 0129c437289ac0501c26184a983eab9ef14419c9 (patch) | |
| tree | f6550e45575cf9c736d2f2fcdc27e774080cbbd9 | |
| parent | 2b6d4eca265617d1acfbbc790350055af86268dd (diff) | |
| parent | 59ddda945ecf7de859f7976c0cea32dc6434b98e (diff) | |
| download | mailman-pgp-0129c437289ac0501c26184a983eab9ef14419c9.tar.gz mailman-pgp-0129c437289ac0501c26184a983eab9ef14419c9.tar.zst mailman-pgp-0129c437289ac0501c26184a983eab9ef14419c9.zip | |
29 files changed, 2134 insertions, 100 deletions
diff --git a/src/mailman_pgp/commands/eml_key.py b/src/mailman_pgp/commands/eml_key.py index 9514f48..1b6dc9f 100644 --- a/src/mailman_pgp/commands/eml_key.py +++ b/src/mailman_pgp/commands/eml_key.py @@ -16,11 +16,193 @@ # this program. If not, see <http://www.gnu.org/licenses/>. """The key email command.""" +from email.utils import parseaddr from mailman.interfaces.command import ContinueProcessing, IEmailCommand +from mailman.interfaces.pending import IPendings +from mailman.interfaces.subscriptions import ISubscriptionManager +from mailman.interfaces.usermanager import IUserManager from public import public +from zope.component import getUtility from zope.interface import implementer +from mailman_pgp.database import transaction +from mailman_pgp.model.address import PGPAddress +from mailman_pgp.model.list import PGPMailingList +from mailman_pgp.pgp.wrapper import PGPWrapper +from mailman_pgp.workflows.base import CONFIRM_REQUEST +from mailman_pgp.workflows.key_change import (CHANGE_CONFIRM_REQUEST, + KeyChangeWorkflow) + + +def _get_email(msg): + display_name, email = parseaddr(msg['from']) + # Address could be None or the empty string. + if not email: + email = msg.sender + return email + + +def _cmd_set(pgp_list, mlist, msg, msgdata, arguments, results): + if len(arguments) != 2: + print('Missing token.', file=results) + return ContinueProcessing.no + + wrapped = PGPWrapper(msg) + if wrapped.is_encrypted(): + decrypted = wrapped.decrypt(pgp_list.key) + wrapped = PGPWrapper(decrypted) + + if not wrapped.has_keys(): + print('No keys attached? Send a key.', file=results) + return ContinueProcessing.no + + keys = list(wrapped.keys()) + if len(keys) != 1: + print('More than one key! Send only one key.', file=results) + return ContinueProcessing.no + + email = _get_email(msg) + if not email: + print('No email to subscribe with.', file=results) + return ContinueProcessing.no + + address = getUtility(IUserManager).get_address(email) + if not address: + print('No adddress to subscribe with.', file=results) + return ContinueProcessing.no + + pgp_address = PGPAddress.for_address(address) + if pgp_address is None: + print('A pgp enabled address not found.', file=results) + return ContinueProcessing.no + + token = arguments[1] + pendable = getUtility(IPendings).confirm(token, expunge=False) + if pendable is None: + print('Wrong token.', file=results) + return ContinueProcessing.no + + with transaction(): + pgp_address.key = keys.pop() + ISubscriptionManager(mlist).confirm(token) + + print('Key succesfully set.', file=results) + print('Key fingerprint: {}'.format(pgp_address.key.fingerprint), + file=results) + + return ContinueProcessing.no + + +def _cmd_confirm(pgp_list, mlist, msg, msgdata, arguments, results): + if len(arguments) != 2: + print('Missing token.', file=results) + return ContinueProcessing.no + + email = _get_email(msg) + if not email: + print('No email to subscribe with.', file=results) + return ContinueProcessing.no + + pgp_address = PGPAddress.for_email(email) + if pgp_address is None: + print('A pgp enabled address not found.', file=results) + return ContinueProcessing.no + + if pgp_address.key is None: + print('No key set.', file=results) + return ContinueProcessing.no + + wrapped = PGPWrapper(msg) + if wrapped.is_encrypted(): + decrypted = wrapped.decrypt(pgp_list.key) + wrapped = PGPWrapper(decrypted) + + if not wrapped.is_signed(): + print('Message not signed, ignoring.', file=results) + return ContinueProcessing.no + + if not wrapped.verifies(pgp_address.key): + print('Message failed to verify.', file=results) + return ContinueProcessing.no + + token = arguments[1] + + pendable = getUtility(IPendings).confirm(token, expunge=False) + if pendable is None: + print('Wrong token.', file=results) + return ContinueProcessing.no + + if pendable.get('type') == KeyChangeWorkflow.pendable_class().PEND_TYPE: + expecting = CHANGE_CONFIRM_REQUEST.format(pendable.get('fingerprint'), + token) + else: + expecting = CONFIRM_REQUEST.format(pgp_address.key_fingerprint, token) + + for sig_subject in wrapped.get_signed(): + if expecting in sig_subject: + ISubscriptionManager(mlist).confirm(token) + break + else: + print("Message doesn't contain the expected statement.", file=results) + return ContinueProcessing.no + + +def _cmd_change(pgp_list, mlist, msg, msgdata, arguments, results): + # New public key in attachment, requires to be signed with current + # key + if len(arguments) != 1: + print('Extraneous argument/s: ' + ','.join(arguments[1:]), + file=results) + return ContinueProcessing.no + + wrapped = PGPWrapper(msg) + if not wrapped.has_keys(): + print('No keys attached? Send a key.', file=results) + return ContinueProcessing.no + + keys = list(wrapped.keys()) + if len(keys) != 1: + print('More than one key! Send only one key.', file=results) + return ContinueProcessing.no + + email = _get_email(msg) + if not email: + print('No email to change key of.', file=results) + return ContinueProcessing.no + + pgp_address = PGPAddress.for_email(email) + if pgp_address is None: + print('A pgp enabled address not found.', file=results) + return ContinueProcessing.no + + workflow = KeyChangeWorkflow(mlist, pgp_address, keys.pop()) + list(workflow) + print('Key change request received.', file=results) + return ContinueProcessing.no + + +def _cmd_revoke(pgp_list, mlist, msg, msgdata, arguments, results): + # Current key revocation certificate in attachment, restarts the + # subscription process, or rather only it's key setup part. + pass + + +def _cmd_sign(pgp_list, mlist, msg, msgdata, arguments, results): + # List public key attached, signed by the users current key. + pass + + +SUBCOMMANDS = { + 'set': _cmd_set, + 'confirm': _cmd_confirm, + 'change': _cmd_change, + 'revoke': _cmd_revoke, + 'sign': _cmd_sign +} + +ARGUMENTS = '<' + '|'.join(SUBCOMMANDS.keys()) + '>' + @public @implementer(IEmailCommand) @@ -28,28 +210,27 @@ class KeyCommand: """The `key` command.""" name = 'key' - argument_description = '<change|revoke|sign>' + argument_description = ARGUMENTS short_description = '' - description = '' + description = """\ + """ def process(self, mlist, msg, msgdata, arguments, results): """See `IEmailCommand`.""" if len(arguments) == 0: print('No sub-command specified,' - ' must be one of <change|revoke|sign>.', file=results) + ' must be one of {}.'.format(ARGUMENTS), file=results) return ContinueProcessing.no - if arguments[0] == 'change': - # New public key in attachment, requires to be signed with current - # key - pass - elif arguments[0] == 'revoke': - # Current key revocation certificate in attachment, restarts the - # subscription process, or rather only it's key setup part. - pass - elif arguments[0] == 'sign': - # List public key attached, signed by the users current key. - pass - else: + + if arguments[0] not in SUBCOMMANDS: print('Wrong sub-command specified,' - ' must be one of <change|revoke|sign>.', file=results) + ' must be one of {}.'.format(ARGUMENTS), file=results) return ContinueProcessing.no + + pgp_list = PGPMailingList.for_list(mlist) + if pgp_list is None: + print("This mailing list doesn't have pgp enabled.", file=results) + return ContinueProcessing.no + + command = SUBCOMMANDS[arguments[0]] + return command(pgp_list, mlist, msg, msgdata, arguments, results) diff --git a/src/mailman_pgp/commands/tests/__init__.py b/src/mailman_pgp/commands/tests/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/src/mailman_pgp/commands/tests/__init__.py diff --git a/src/mailman_pgp/commands/tests/test_key.py b/src/mailman_pgp/commands/tests/test_key.py new file mode 100644 index 0000000..d0ff7e9 --- /dev/null +++ b/src/mailman_pgp/commands/tests/test_key.py @@ -0,0 +1,708 @@ +# Copyright (C) 2017 Jan Jancar +# +# This file is a part of the Mailman PGP plugin. +# +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) +# any later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. +# +# You should have received a copy of the GNU General Public License along with +# this program. If not, see <http://www.gnu.org/licenses/>. + +"""""" + +import unittest + +from mailman.app.lifecycle import create_list +from mailman.email.message import Message +from mailman.interfaces.subscriptions import ISubscriptionManager +from mailman.interfaces.usermanager import IUserManager +from mailman.runners.command import CommandRunner +from mailman.testing.helpers import get_queue_messages, make_testable_runner +from mailman.utilities.datetime import now +from public import public +from zope.component import getUtility + +from mailman_pgp.config import mm_config +from mailman_pgp.database import transaction +from mailman_pgp.model.address import PGPAddress +from mailman_pgp.model.list import PGPMailingList +from mailman_pgp.pgp.mime import MIMEWrapper +from mailman_pgp.pgp.tests.base import load_key +from mailman_pgp.pgp.wrapper import PGPWrapper +from mailman_pgp.testing.layers import PGPConfigLayer +from mailman_pgp.workflows.base import CONFIRM_REQUEST +from mailman_pgp.workflows.key_change import CHANGE_CONFIRM_REQUEST +from mailman_pgp.workflows.subscription import OpenSubscriptionPolicy + + +def _create_plain(from_hdr, to_hdr, subject_hdr, payload): + message = Message() + message['From'] = from_hdr + message['To'] = to_hdr + message['Subject'] = subject_hdr + message.set_payload(payload) + return message + + +def _create_mixed(from_hdr, to_hdr, subject_hdr): + message = Message() + message['From'] = from_hdr + message['To'] = to_hdr + message['Subject'] = subject_hdr + message.set_type('multipart/mixed') + return message + + +@public +class TestPreDispatch(unittest.TestCase): + layer = PGPConfigLayer + + def setUp(self): + self.mlist = create_list('test@example.com') + + def test_no_arguments(self): + message = _create_plain('bart@example.com', 'test@example.com', + 'key', '') + + mm_config.switchboards['command'].enqueue(message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + items = get_queue_messages('virgin', expected_count=1) + results_msg = items[0].msg + + self.assertIn('No sub-command specified', results_msg.get_payload()) + + def test_wrong_subcommand(self): + message = _create_plain('bart@example.com', 'test@example.com', + 'key wrooooooong', '') + + mm_config.switchboards['command'].enqueue(message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + items = get_queue_messages('virgin', expected_count=1) + results_msg = items[0].msg + + self.assertIn('Wrong sub-command specified', results_msg.get_payload()) + + def test_no_pgp_list(self): + message = _create_plain('bart@example.com', 'test@example.com', + 'key set', '') + + mm_config.switchboards['command'].enqueue(message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + items = get_queue_messages('virgin', expected_count=1) + results_msg = items[0].msg + + self.assertIn("This mailing list doesn't have pgp enabled.", + results_msg.get_payload()) + + +@public +class TestPreSubscription(unittest.TestCase): + layer = PGPConfigLayer + + def setUp(self): + self.mlist = create_list('test@example.com', style_name='pgp-default') + self.pgp_list = PGPMailingList.for_list(self.mlist) + self.pgp_list.key = load_key('ecc_p256.priv.asc') + + self.bart_key = load_key('rsa_1024.priv.asc') + self.anne_key = load_key('ecc_p256.priv.asc') + + def test_set(self): + self.mlist.subscription_policy = OpenSubscriptionPolicy + bart = getUtility(IUserManager).create_address('bart@example.com', + 'Bart Person') + bart.verified_on = now() + token, token_owner, member = ISubscriptionManager(self.mlist).register( + bart) + + get_queue_messages('virgin') + + set_message = _create_mixed('bart@example.com', 'test@example.com', + 'Re: key set {}'.format(token)) + wrapped_set_message = MIMEWrapper(set_message) + set_message = wrapped_set_message.attach_key(self.bart_key.pubkey) + + mm_config.switchboards['command'].enqueue(set_message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + + pgp_address = PGPAddress.for_address(bart) + self.assertIsNotNone(pgp_address) + self.assertEqual(pgp_address.key.fingerprint, + self.bart_key.fingerprint) + self.assertFalse(pgp_address.key_confirmed) + + items = get_queue_messages('virgin', expected_count=2) + if items[0].msg['Subject'] == 'The results of your email commands': + results = items[0].msg + confirm_request = items[1].msg + else: + results = items[1].msg + confirm_request = items[0].msg + + self.assertIn('Key succesfully set.', results.get_payload()) + self.assertIn('Key fingerprint: {}'.format(self.bart_key.fingerprint), + results.get_payload()) + + confirm_wrapped = PGPWrapper(confirm_request) + self.assertTrue(confirm_wrapped.is_encrypted()) + + def test_set_encrypted(self): + self.mlist.subscription_policy = OpenSubscriptionPolicy + bart = getUtility(IUserManager).create_address('bart@example.com', + 'Bart Person') + bart.verified_on = now() + token, token_owner, member = ISubscriptionManager(self.mlist).register( + bart) + + get_queue_messages('virgin') + + set_message = _create_mixed('bart@example.com', 'test@example.com', + 'Re: key set {}'.format(token)) + wrapped_set_message = MIMEWrapper(set_message) + set_message = wrapped_set_message.attach_key(self.bart_key.pubkey) + wrapped_set_message = MIMEWrapper(set_message) + set_message = wrapped_set_message.encrypt(self.pgp_list.pubkey, + self.bart_key.pubkey) + + mm_config.switchboards['command'].enqueue(set_message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + + pgp_address = PGPAddress.for_address(bart) + self.assertIsNotNone(pgp_address) + self.assertEqual(pgp_address.key.fingerprint, + self.bart_key.fingerprint) + self.assertFalse(pgp_address.key_confirmed) + + items = get_queue_messages('virgin', expected_count=2) + if items[0].msg['Subject'] == 'The results of your email commands': + results = items[0].msg + confirm_request = items[1].msg + else: + results = items[1].msg + confirm_request = items[0].msg + + self.assertIn('Key succesfully set.', results.get_payload()) + self.assertIn('Key fingerprint: {}'.format(self.bart_key.fingerprint), + results.get_payload()) + + confirm_wrapped = PGPWrapper(confirm_request) + self.assertTrue(confirm_wrapped.is_encrypted()) + + def test_set_no_token(self): + message = _create_plain('bart@example.com', 'test@example.com', + 'key set', '') + + mm_config.switchboards['command'].enqueue(message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + items = get_queue_messages('virgin', expected_count=1) + results_msg = items[0].msg + + self.assertIn('Missing token.', results_msg.get_payload()) + + def test_set_no_key(self): + message = _create_plain('bart@example.com', 'test@example.com', + 'key set token', '') + + mm_config.switchboards['command'].enqueue(message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + items = get_queue_messages('virgin', expected_count=1) + results_msg = items[0].msg + + self.assertIn('No keys attached? Send a key.', + results_msg.get_payload()) + + def test_set_multiple_keys(self): + set_message = _create_mixed('bart@example.com', 'test@example.com', + 'Re: key set token') + wrapped_set_message = MIMEWrapper(set_message) + set_message = wrapped_set_message.attach_key(self.bart_key.pubkey) + wrapped_set_message = MIMEWrapper(set_message) + set_message = wrapped_set_message.attach_key(self.anne_key.pubkey) + + mm_config.switchboards['command'].enqueue(set_message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + items = get_queue_messages('virgin', expected_count=1) + results_msg = items[0].msg + + self.assertIn('More than one key! Send only one key.', + results_msg.get_payload()) + + def test_set_no_email(self): + message = _create_mixed('', 'test@example.com', 'key set token') + wrapped_message = MIMEWrapper(message) + message = wrapped_message.attach_key(self.bart_key.pubkey) + + mm_config.switchboards['command'].enqueue(message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + items = get_queue_messages('virgin', expected_count=1) + results_msg = items[0].msg + + self.assertIn('No email to subscribe with.', results_msg.get_payload()) + + def test_set_no_address(self): + set_message = _create_mixed('bart@example.com', 'test@example.com', + 'key set token') + wrapped_set_message = MIMEWrapper(set_message) + set_message = wrapped_set_message.attach_key(self.bart_key.pubkey) + + mm_config.switchboards['command'].enqueue(set_message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + items = get_queue_messages('virgin', expected_count=1) + results_msg = items[0].msg + + self.assertIn('No adddress to subscribe with.', + results_msg.get_payload()) + + def test_set_no_pgp_address(self): + bart = getUtility(IUserManager).create_address('bart@example.com', + 'Bart Person') + bart.verified_on = now() + + set_message = _create_mixed('bart@example.com', 'test@example.com', + 'key set token') + wrapped_set_message = MIMEWrapper(set_message) + set_message = wrapped_set_message.attach_key(self.bart_key.pubkey) + + mm_config.switchboards['command'].enqueue(set_message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + items = get_queue_messages('virgin', expected_count=1) + results_msg = items[0].msg + + self.assertIn('A pgp enabled address not found.', + results_msg.get_payload()) + + def test_set_wrong_token(self): + bart = getUtility(IUserManager).create_address('bart@example.com', + 'Bart Person') + bart.verified_on = now() + + with transaction() as t: + pgp_address = PGPAddress(bart) + t.add(pgp_address) + + set_message = _create_mixed('bart@example.com', 'test@example.com', + 'key set token') + wrapped_set_message = MIMEWrapper(set_message) + set_message = wrapped_set_message.attach_key(self.bart_key.pubkey) + + mm_config.switchboards['command'].enqueue(set_message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + items = get_queue_messages('virgin', expected_count=1) + results_msg = items[0].msg + + self.assertIn('Wrong token.', results_msg.get_payload()) + + def test_confirm(self): + self.mlist.subscription_policy = OpenSubscriptionPolicy + bart = getUtility(IUserManager).create_address('bart@example.com', + 'Bart Person') + bart.verified_on = now() + + token, token_owner, member = ISubscriptionManager(self.mlist).register( + bart, pubkey=self.bart_key.pubkey) + + get_queue_messages('virgin') + + message = _create_plain('bart@example.com', 'test@example.com', + 'Re: key confirm {}'.format(token), + CONFIRM_REQUEST.format( + self.bart_key.fingerprint, + token)) + wrapped_message = MIMEWrapper(message) + message = wrapped_message.sign(self.bart_key) + + mm_config.switchboards['command'].enqueue(message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + + pgp_address = PGPAddress.for_address(bart) + self.assertTrue(pgp_address.key_confirmed) + self.assertTrue(self.mlist.is_subscribed(bart)) + + def test_confirm_encrypted(self): + self.mlist.subscription_policy = OpenSubscriptionPolicy + bart = getUtility(IUserManager).create_address('bart@example.com', + 'Bart Person') + bart.verified_on = now() + + token, token_owner, member = ISubscriptionManager(self.mlist).register( + bart, pubkey=self.bart_key.pubkey) + + get_queue_messages('virgin') + + message = _create_plain('bart@example.com', 'test@example.com', + 'Re: key confirm {}'.format(token), + CONFIRM_REQUEST.format( + self.bart_key.fingerprint, + token)) + wrapped_message = MIMEWrapper(message) + message = wrapped_message.sign_encrypt(self.bart_key, + self.pgp_list.pubkey, + self.bart_key.pubkey) + + mm_config.switchboards['command'].enqueue(message, + listid='test.example.com') + + make_testable_runner(CommandRunner, 'command').run() + + pgp_address = PGPAddress.for_address(bart) + self.assertTrue(pgp_address.key_confirmed) + self.assertTrue(self.mlist.is_subscribed(bart)) + + def test_confirm_no_token(self): + message = _create_plain('bart@example.com', 'test@example.com', + 'key confirm', '') + + mm_config.switchboards['command'].enqueue(message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + items = get_queue_messages('virgin', expected_count=1) + results_msg = items[0].msg + + self.assertIn('Missing token.', results_msg.get_payload()) + + def test_confirm_no_email(self): + message = _create_plain('', 'test@example.com', + 'key confirm token', '') + + mm_config.switchboards['command'].enqueue(message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + items = get_queue_messages('virgin', expected_count=1) + results_msg = items[0].msg + + self.assertIn('No email to subscribe with.', results_msg.get_payload()) + + def test_confirm_no_pgp_address(self): + message = _create_plain('bart@example.com', 'test@example.com', + 'key confirm token', '') + + mm_config.switchboards['command'].enqueue(message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + items = get_queue_messages('virgin', expected_count=1) + results_msg = items[0].msg + + self.assertIn('A pgp enabled address not found.', + results_msg.get_payload()) + + def test_confirm_no_key(self): + bart = getUtility(IUserManager).create_address('bart@example.com', + 'Bart Person') + with transaction() as t: + pgp_address = PGPAddress(bart) + t.add(pgp_address) + + message = _create_plain('bart@example.com', 'test@example.com', + 'Re: key confirm token', + CONFIRM_REQUEST.format( + self.bart_key.fingerprint, + 'token')) + wrapped_message = MIMEWrapper(message) + message = wrapped_message.sign(self.bart_key) + + mm_config.switchboards['command'].enqueue(message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + items = get_queue_messages('virgin', expected_count=1) + results_msg = items[0].msg + + self.assertIn('No key set.', results_msg.get_payload()) + + def test_confirm_not_signed(self): + self.mlist.subscription_policy = OpenSubscriptionPolicy + bart = getUtility(IUserManager).create_address('bart@example.com', + 'Bart Person') + bart.verified_on = now() + + token, token_owner, member = ISubscriptionManager(self.mlist).register( + bart, pubkey=self.bart_key.pubkey) + + get_queue_messages('virgin') + + message = _create_plain('bart@example.com', 'test@example.com', + 'Re: key confirm {}'.format(token), + CONFIRM_REQUEST.format( + self.bart_key.fingerprint, + token)) + + mm_config.switchboards['command'].enqueue(message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + + items = get_queue_messages('virgin', expected_count=1) + results_msg = items[0].msg + + self.assertIn('Message not signed, ignoring.', + results_msg.get_payload()) + + def test_confirm_invalid_sig(self): + self.mlist.subscription_policy = OpenSubscriptionPolicy + bart = getUtility(IUserManager).create_address('bart@example.com', + 'Bart Person') + bart.verified_on = now() + + token, token_owner, member = ISubscriptionManager(self.mlist).register( + bart, pubkey=self.bart_key.pubkey) + + get_queue_messages('virgin') + + message = _create_plain('bart@example.com', 'test@example.com', + 'Re: key confirm {}'.format(token), + CONFIRM_REQUEST.format( + self.bart_key.fingerprint, + token)) + wrapped_message = MIMEWrapper(message) + message = wrapped_message.sign(self.bart_key) + message.get_payload(0).set_payload( + 'Something that was definitely not signed.') + + mm_config.switchboards['command'].enqueue(message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + + items = get_queue_messages('virgin', expected_count=1) + results_msg = items[0].msg + + self.assertIn('Message failed to verify.', + results_msg.get_payload()) + + def test_confirm_wrong_token(self): + bart = getUtility(IUserManager).create_address('bart@example.com', + 'Bart Person') + bart.verified_on = now() + + with transaction() as t: + pgp_address = PGPAddress(bart) + pgp_address.key = self.bart_key.pubkey + t.add(pgp_address) + + message = _create_plain('bart@example.com', 'test@example.com', + 'Re: key confirm token', + CONFIRM_REQUEST.format( + self.bart_key.fingerprint, + 'token')) + wrapped_message = MIMEWrapper(message) + message = wrapped_message.sign(self.bart_key) + + mm_config.switchboards['command'].enqueue(message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + + items = get_queue_messages('virgin', expected_count=1) + results_msg = items[0].msg + + self.assertIn('Wrong token.', results_msg.get_payload()) + + def test_confirm_no_signed_statement(self): + self.mlist.subscription_policy = OpenSubscriptionPolicy + bart = getUtility(IUserManager).create_address('bart@example.com', + 'Bart Person') + bart.verified_on = now() + + token, token_owner, member = ISubscriptionManager(self.mlist).register( + bart, pubkey=self.bart_key.pubkey) + + get_queue_messages('virgin') + + message = _create_plain('bart@example.com', 'test@example.com', + 'Re: key confirm {}'.format(token), + 'Some text, that definitely does not' + 'contain the required/expected statement.') + wrapped_message = MIMEWrapper(message) + message = wrapped_message.sign(self.bart_key) + + mm_config.switchboards['command'].enqueue(message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + + items = get_queue_messages('virgin', expected_count=1) + results_msg = items[0].msg + + self.assertIn("Message doesn't contain the expected statement.", + results_msg.get_payload()) + + +@public +class TestAfterSubscription(unittest.TestCase): + layer = PGPConfigLayer + + def setUp(self): + self.mlist = create_list('test@example.com', style_name='pgp-default') + self.pgp_list = PGPMailingList.for_list(self.mlist) + self.pgp_list.key = load_key('ecc_p256.priv.asc') + + self.bart_key = load_key('rsa_1024.priv.asc') + self.bart_new_key = load_key('ecc_p256.priv.asc') + + def test_change(self): + bart = getUtility(IUserManager).create_address('bart@example.com', + 'Bart Person') + + with transaction() as t: + pgp_address = PGPAddress(bart) + pgp_address.key = self.bart_key.pubkey + pgp_address.key_confirmed = True + t.add(pgp_address) + + message = _create_mixed('bart@example.com', 'test@example.com', + 'key change') + wrapped_message = MIMEWrapper(message) + message = wrapped_message.attach_key(self.bart_new_key.pubkey) + + mm_config.switchboards['command'].enqueue(message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + + items = get_queue_messages('virgin', expected_count=2) + if items[0].msg['Subject'] == 'The results of your email commands': + results = items[0].msg + confirm_request = items[1].msg + else: + results = items[1].msg + confirm_request = items[0].msg + + self.assertIn('Key change request received.', results.get_payload()) + + confirm_wrapped = PGPWrapper(confirm_request) + self.assertTrue(confirm_wrapped.is_encrypted()) + decrypted = confirm_wrapped.decrypt(self.bart_new_key) + self.assertIn('key confirm', decrypted['subject']) + + def test_change_confirm(self): + bart = getUtility(IUserManager).create_address('bart@example.com', + 'Bart Person') + + with transaction() as t: + pgp_address = PGPAddress(bart) + pgp_address.key = self.bart_key.pubkey + pgp_address.key_confirmed = True + t.add(pgp_address) + + message = _create_mixed('bart@example.com', 'test@example.com', + 'key change') + wrapped_message = MIMEWrapper(message) + message = wrapped_message.attach_key(self.bart_new_key.pubkey) + + mm_config.switchboards['command'].enqueue(message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + + items = get_queue_messages('virgin', expected_count=2) + if items[0].msg['Subject'] == 'The results of your email commands': + confirm_request = items[1].msg + else: + confirm_request = items[0].msg + request_wrapped = PGPWrapper(confirm_request) + decrypted = request_wrapped.decrypt(self.bart_new_key) + + subj = decrypted['subject'] + token = subj.split(' ')[-1] + + confirm_message = _create_plain('bart@example.com', 'test@example.com', + decrypted['subject'], + CHANGE_CONFIRM_REQUEST.format( + self.bart_new_key.fingerprint, + token)) + wrapped_confirm = MIMEWrapper(confirm_message) + confirm = wrapped_confirm.sign(self.bart_key) + + mm_config.switchboards['command'].enqueue(confirm, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + + pgp_address = PGPAddress.for_address(bart) + self.assertEqual(pgp_address.key_fingerprint, + self.bart_new_key.fingerprint) + self.assertTrue(pgp_address.key_confirmed) + + def test_change_extra_arg(self): + message = _create_plain('bart@example.com', 'test@example.com', + 'key change extra arguments', '') + mm_config.switchboards['command'].enqueue(message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + + items = get_queue_messages('virgin', expected_count=1) + results_msg = items[0].msg + + self.assertIn('Extraneous argument/s: extra,arguments', + results_msg.get_payload()) + + def test_change_no_key(self): + message = _create_plain('bart@example.com', 'test@example.com', + 'key change', '') + + mm_config.switchboards['command'].enqueue(message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + items = get_queue_messages('virgin', expected_count=1) + results_msg = items[0].msg + + self.assertIn('No keys attached? Send a key.', + results_msg.get_payload()) + + def test_change_multiple_keys(self): + set_message = _create_mixed('bart@example.com', 'test@example.com', + 'key change') + + wrapped_set_message = MIMEWrapper(set_message) + set_message = wrapped_set_message.attach_key(self.bart_key.pubkey) + wrapped_set_message = MIMEWrapper(set_message) + set_message = wrapped_set_message.attach_key(self.bart_new_key.pubkey) + + mm_config.switchboards['command'].enqueue(set_message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + items = get_queue_messages('virgin', expected_count=1) + results_msg = items[0].msg + + self.assertIn('More than one key! Send only one key.', + results_msg.get_payload()) + + def test_change_no_email(self): + message = _create_mixed('', 'test@example.com', 'key change') + wrapped_message = MIMEWrapper(message) + message = wrapped_message.attach_key(self.bart_key.pubkey) + + mm_config.switchboards['command'].enqueue(message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + items = get_queue_messages('virgin', expected_count=1) + results_msg = items[0].msg + + self.assertIn('No email to change key of.', results_msg.get_payload()) + + def test_change_no_pgp_address(self): + message = _create_mixed('bart@example.com', 'test@example.com', + 'key change') + wrapped_message = MIMEWrapper(message) + message = wrapped_message.attach_key(self.bart_key.pubkey) + + mm_config.switchboards['command'].enqueue(message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + items = get_queue_messages('virgin', expected_count=1) + results_msg = items[0].msg + + self.assertIn('A pgp enabled address not found.', + results_msg.get_payload()) diff --git a/src/mailman_pgp/model/address.py b/src/mailman_pgp/model/address.py index 1d07486..25a87dc 100644 --- a/src/mailman_pgp/model/address.py +++ b/src/mailman_pgp/model/address.py @@ -22,7 +22,7 @@ from os.path import exists, isfile, join from mailman.database.types import SAUnicode from mailman.interfaces.usermanager import IUserManager from pgpy import PGPKey -from sqlalchemy import Column, Integer +from sqlalchemy import Boolean, Column, Integer from sqlalchemy.orm import reconstructor from zope.component import getUtility @@ -36,8 +36,9 @@ class PGPAddress(Base): __tablename__ = 'pgp_addresses' id = Column(Integer, primary_key=True) - email = Column(SAUnicode, index=True) + email = Column(SAUnicode, index=True, unique=True) key_fingerprint = Column(SAUnicode) + key_confirmed = Column(Boolean, default=False) def __init__(self, address): super().__init__() @@ -118,4 +119,10 @@ class PGPAddress(Base): """ if address is None: return None - return PGPAddress.query().filter_by(email=address.email).first() + return PGPAddress.for_email(address.email) + + @staticmethod + def for_email(email): + if email is None: + return None + return PGPAddress.query().filter_by(email=email).first() diff --git a/src/mailman_pgp/pgp/inline.py b/src/mailman_pgp/pgp/inline.py index 9c19030..458cc26 100644 --- a/src/mailman_pgp/pgp/inline.py +++ b/src/mailman_pgp/pgp/inline.py @@ -201,7 +201,10 @@ class InlineWrapper: def _decrypt(self, part, key): message = PGPMessage.from_blob(part.get_payload()) decrypted = key.decrypt(message) - part.set_payload(decrypted.message) + if decrypted.is_signed: + part.set_payload(str(decrypted)) + else: + part.set_payload(decrypted.message) def decrypt(self, key): """ diff --git a/src/mailman_pgp/pgp/mime.py b/src/mailman_pgp/pgp/mime.py index afb1c35..a7e31b8 100644 --- a/src/mailman_pgp/pgp/mime.py +++ b/src/mailman_pgp/pgp/mime.py @@ -76,7 +76,8 @@ class MIMEWrapper: if not self._is_mime(): return False second_type = self.msg.get_payload(1).get_content_type() - protocol_param = collapse_rfc2231_value(self.msg.get_param('protocol')) + protocol_param = collapse_rfc2231_value(self.msg.get_param('protocol', + '')) content_subtype = self.msg.get_content_subtype() return (second_type == MIMEWrapper._signed_type and @@ -87,7 +88,7 @@ class MIMEWrapper: return self.is_signed() def get_signed(self): - yield self.msg.get_payload(0).as_string(0) + yield self.msg.get_payload(0).as_string() def is_encrypted(self): """ @@ -102,7 +103,8 @@ class MIMEWrapper: first_type = self.msg.get_payload(0).get_content_type() second_type = self.msg.get_payload(1).get_content_type() content_subtype = self.msg.get_content_subtype() - protocol_param = collapse_rfc2231_value(self.msg.get_param('protocol')) + protocol_param = collapse_rfc2231_value(self.msg.get_param('protocol', + '')) return ('Version: 1' in first_part and first_type == MIMEWrapper._encrypted_type and @@ -145,6 +147,26 @@ class MIMEWrapper: key, _ = PGPKey.from_blob(part.get_payload()) yield key + def attach_key(self, key): + """ + + :param key: + :type key: pgpy.PGPKey + :return: + """ + filename = '0x' + key.fingerprint.keyid + '.asc' + key_part = MIMEApplication(_data=str(key), + _subtype=MIMEWrapper._keys_subtype, + _encoder=encode_7or8bit, + name=filename) + key_part.add_header('Content-Description', + 'OpenPGP key') + key_part.add_header('Content-Disposition', 'attachment', + filename=filename) + out = copy.deepcopy(self.msg) + out.attach(key_part) + return out + def verify(self, key): """ Verify the signature of this message with key. @@ -181,9 +203,9 @@ class MIMEWrapper: _subtype=MIMEWrapper._signature_subtype, _encoder=encode_7or8bit, name='signature.asc') - second_part.add_header('content-description', + second_part.add_header('Content-Description', 'OpenPGP digital signature') - second_part.add_header('content-disposition', 'attachment', + second_part.add_header('Content-Disposition', 'attachment', filename='signature.asc') out.attach(copy.deepcopy(msg)) @@ -249,16 +271,16 @@ class MIMEWrapper: first_part = MIMEApplication(_data='Version: 1', _subtype=MIMEWrapper._encryption_subtype, _encoder=encode_7or8bit) - first_part.add_header('content-description', + first_part.add_header('Content-Description', 'PGP/MIME version identification') second_part = MIMEApplication(_data=str(payload), _subtype='octet-stream', _encoder=encode_7or8bit, name='encrypted.asc') - second_part.add_header('content-description', + second_part.add_header('Content-Description', 'OpenPGP encrypted message') - second_part.add_header('content-disposition', 'inline', + second_part.add_header('Content-Disposition', 'inline', filename='encrypted.asc') out.attach(first_part) out.attach(second_part) diff --git a/src/mailman_pgp/pgp/tests/base.py b/src/mailman_pgp/pgp/tests/base.py index d561fda..52e5ec4 100644 --- a/src/mailman_pgp/pgp/tests/base.py +++ b/src/mailman_pgp/pgp/tests/base.py @@ -42,63 +42,66 @@ def load_key(path): class WrapperTestCase(TestCase): wrapper = None + def wrap(self, message): + return self.wrapper(message) + def is_signed(self, message, signed): - wrapped = self.wrapper(message) + wrapped = self.wrap(message) self.assertEqual(wrapped.is_signed(), signed) def sign(self, message, key): - wrapped = self.wrapper(message) + wrapped = self.wrap(message) signed = wrapped.sign(key) - signed_wrapped = self.wrapper(signed) + signed_wrapped = self.wrap(signed) self.assertTrue(signed_wrapped.is_signed()) def sign_verify(self, message, priv, pub): - wrapped = self.wrapper(message) + wrapped = self.wrap(message) signed = wrapped.sign(priv) - signed_wrapped = self.wrapper(signed) + signed_wrapped = self.wrap(signed) for signature in signed_wrapped.verify(pub): self.assertTrue(bool(signature)) def verify(self, message, key, valid): - wrapped = self.wrapper(message) + wrapped = self.wrap(message) for signature in wrapped.verify(key): self.assertEqual(bool(signature), valid) def is_encrypted(self, message, encrypted): - wrapped = self.wrapper(message) + wrapped = self.wrap(message) self.assertEqual(wrapped.is_encrypted(), encrypted) def encrypt(self, message, *keys, **kwargs): - wrapped = self.wrapper(message) + wrapped = self.wrap(message) encrypted = wrapped.encrypt(*keys, **kwargs) - encrypted_wrapped = self.wrapper(encrypted) + encrypted_wrapped = self.wrap(encrypted) self.assertTrue(encrypted_wrapped.is_encrypted()) def encrypt_decrypt(self, message, pub, priv): - wrapped = self.wrapper(message) + wrapped = self.wrap(message) encrypted = wrapped.encrypt(pub) - encrypted_wrapped = self.wrapper(encrypted) + encrypted_wrapped = self.wrap(encrypted) decrypted = encrypted_wrapped.decrypt(priv) - decrypted_wrapped = self.wrapper(decrypted) + decrypted_wrapped = self.wrap(decrypted) self.assertFalse(decrypted_wrapped.is_encrypted()) self.assertEqual(decrypted.get_payload(), message.get_payload()) def decrypt(self, message, key, clear): - wrapped = self.wrapper(message) + wrapped = self.wrap(message) decrypted = wrapped.decrypt(key) - decrypted_wrapped = self.wrapper(decrypted) + decrypted_wrapped = self.wrap(decrypted) self.assertFalse(decrypted_wrapped.is_encrypted()) self.assertEqual(decrypted.get_payload(), clear) def has_keys(self, message, has_keys): - wrapped = self.wrapper(message) + wrapped = self.wrap(message) self.assertEqual(wrapped.has_keys(), has_keys) def keys(self, message, keys): - wrapped = self.wrapper(message) + wrapped = self.wrap(message) loaded = list(wrapped.keys()) self.assertEqual(len(loaded), len(keys)) @@ -107,13 +110,30 @@ class WrapperTestCase(TestCase): self.assertListEqual(loaded_fingerprints, fingerprints) def sign_encrypt_decrypt_verify(self, message, sign_key, encrypt_key): - wrapped = self.wrapper(message) + wrapped = self.wrap(message) encrypted = wrapped.sign_encrypt(sign_key, encrypt_key.pubkey) - encrypted_wrapped = self.wrapper(encrypted) + encrypted_wrapped = self.wrap(encrypted) + self.assertTrue(encrypted_wrapped.is_encrypted()) + + decrypted = encrypted_wrapped.decrypt(encrypt_key) + decrypted_wrapped = self.wrap(decrypted) + self.assertTrue(decrypted_wrapped.is_signed()) + self.assertFalse(decrypted_wrapped.is_encrypted()) + + verification = decrypted_wrapped.verify(sign_key.pubkey) + for sig in verification: + self.assertTrue(bool(sig)) + self.assertListEqual(list(decrypted_wrapped.get_signed()), + list(wrapped.get_payload())) + + def sign_then_encrypt_decrypt_verify(self, message, sign_key, encrypt_key): + wrapped = self.wrap(message) + encrypted = wrapped.sign_then_encrypt(sign_key, encrypt_key.pubkey) + encrypted_wrapped = self.wrap(encrypted) self.assertTrue(encrypted_wrapped.is_encrypted()) decrypted = encrypted_wrapped.decrypt(encrypt_key) - decrypted_wrapped = self.wrapper(decrypted) + decrypted_wrapped = self.wrap(decrypted) self.assertTrue(decrypted_wrapped.is_signed()) self.assertFalse(decrypted_wrapped.is_encrypted()) diff --git a/src/mailman_pgp/pgp/tests/test_inline.py b/src/mailman_pgp/pgp/tests/test_inline.py index cd18209..a89963a 100644 --- a/src/mailman_pgp/pgp/tests/test_inline.py +++ b/src/mailman_pgp/pgp/tests/test_inline.py @@ -18,6 +18,7 @@ """Tests for the inline wrapper.""" from parameterized import parameterized +from public import public from mailman_pgp.pgp.inline import InlineWrapper from mailman_pgp.pgp.tests.base import load_key, load_message, WrapperTestCase @@ -27,6 +28,7 @@ class InlineWrapperTestCase(WrapperTestCase): wrapper = InlineWrapper +@public class TestSigning(InlineWrapperTestCase): @parameterized.expand([ (load_message('inline_cleartext_signed.eml'), @@ -43,7 +45,7 @@ class TestSigning(InlineWrapperTestCase): False) ]) def test_is_signed(self, message, signed): - super().is_signed(message, signed) + self.is_signed(message, signed) @parameterized.expand([ (load_message('clear.eml'), @@ -52,7 +54,7 @@ class TestSigning(InlineWrapperTestCase): load_key('ecc_p256.priv.asc')) ]) def test_sign(self, message, key): - super().sign(message, key) + self.sign(message, key) @parameterized.expand([ (load_message('clear.eml'), @@ -63,7 +65,7 @@ class TestSigning(InlineWrapperTestCase): load_key('ecc_p256.pub.asc')) ]) def test_sign_verify(self, message, priv, pub): - super().sign_verify(message, priv, pub) + self.sign_verify(message, priv, pub) @parameterized.expand([ (load_message('inline_cleartext_signed.eml'), @@ -80,9 +82,10 @@ class TestSigning(InlineWrapperTestCase): False), ]) def test_verify(self, message, key, valid): - super().verify(message, key, valid) + self.verify(message, key, valid) +@public class TestEncryption(InlineWrapperTestCase): @parameterized.expand([ (load_message('inline_encrypted.eml'), @@ -99,7 +102,7 @@ class TestEncryption(InlineWrapperTestCase): False) ]) def test_is_encrypted(self, message, encrypted): - super().is_encrypted(message, encrypted) + self.is_encrypted(message, encrypted) @parameterized.expand([ (load_message('clear.eml'), @@ -110,9 +113,9 @@ class TestEncryption(InlineWrapperTestCase): ]) def test_encrypt(self, message, keys, **kwargs): if isinstance(keys, tuple): - super().encrypt(message, *keys, **kwargs) + self.encrypt(message, *keys, **kwargs) else: - super().encrypt(message, keys, **kwargs) + self.encrypt(message, keys, **kwargs) @parameterized.expand([ (load_message('clear.eml'), @@ -123,7 +126,7 @@ class TestEncryption(InlineWrapperTestCase): load_key('ecc_p256.priv.asc')) ]) def test_encrypt_decrypt(self, message, pub, priv): - super().encrypt_decrypt(message, pub, priv) + self.encrypt_decrypt(message, pub, priv) @parameterized.expand([ (load_message('inline_encrypted.eml'), @@ -131,9 +134,10 @@ class TestEncryption(InlineWrapperTestCase): 'Some encrypted text.\n\n') ]) def test_decrypt(self, message, key, clear): - super().decrypt(message, key, clear) + self.decrypt(message, key, clear) +@public class TestKeys(InlineWrapperTestCase): @parameterized.expand([ (load_message('inline_privkey.eml'), @@ -146,7 +150,7 @@ class TestKeys(InlineWrapperTestCase): False) ]) def test_has_keys(self, message, has_keys): - super().has_keys(message, has_keys) + self.has_keys(message, has_keys) @parameterized.expand([ (load_message('inline_privkey.eml'), @@ -155,4 +159,24 @@ class TestKeys(InlineWrapperTestCase): [load_key('rsa_1024.pub.asc')]) ]) def test_keys(self, message, keys): - super().keys(message, keys) + self.keys(message, keys) + + +@public +class TestCombined(InlineWrapperTestCase): + @parameterized.expand([ + (load_message('clear.eml'), + load_key('rsa_1024.priv.asc'), + load_key('ecc_p256.priv.asc')) + ]) + def test_sign_encrypt_decrypt_verify(self, message, sign_key, encrypt_key): + self.sign_encrypt_decrypt_verify(message, sign_key, encrypt_key) + + @parameterized.expand([ + (load_message('clear.eml'), + load_key('rsa_1024.priv.asc'), + load_key('ecc_p256.priv.asc')) + ]) + def test_sign_then_encrypt_decrypt_verify(self, message, sign_key, + encrypt_key): + self.sign_then_encrypt_decrypt_verify(message, sign_key, encrypt_key) diff --git a/src/mailman_pgp/pgp/tests/test_keygen.py b/src/mailman_pgp/pgp/tests/test_keygen.py index dab6801..5c59614 100644 --- a/src/mailman_pgp/pgp/tests/test_keygen.py +++ b/src/mailman_pgp/pgp/tests/test_keygen.py @@ -21,10 +21,12 @@ from unittest import TestCase from pgpy import PGPKey from pgpy.constants import PubKeyAlgorithm +from public import public from mailman_pgp.pgp.keygen import ListKeyGenerator +@public class TesKeygen(TestCase): def setUp(self): self.keypair_config = { diff --git a/src/mailman_pgp/pgp/tests/test_mime.py b/src/mailman_pgp/pgp/tests/test_mime.py index fab50bb..f56c781 100644 --- a/src/mailman_pgp/pgp/tests/test_mime.py +++ b/src/mailman_pgp/pgp/tests/test_mime.py @@ -18,6 +18,7 @@ """Tests for the MIME wrapper.""" from parameterized import parameterized +from public import public from mailman_pgp.pgp.mime import MIMEWrapper from mailman_pgp.pgp.tests.base import load_key, load_message, WrapperTestCase @@ -27,6 +28,7 @@ class MIMEWrapperTestCase(WrapperTestCase): wrapper = MIMEWrapper +@public class TestSigning(MIMEWrapperTestCase): @parameterized.expand([ (load_message('mime_signed.eml'), @@ -37,7 +39,7 @@ class TestSigning(MIMEWrapperTestCase): False) ]) def test_is_signed(self, message, signed): - super().is_signed(message, signed) + self.is_signed(message, signed) @parameterized.expand([ (load_message('clear.eml'), @@ -46,7 +48,7 @@ class TestSigning(MIMEWrapperTestCase): load_key('ecc_p256.priv.asc')) ]) def test_sign(self, message, key): - super().sign(message, key) + self.sign(message, key) @parameterized.expand([ (load_message('clear.eml'), @@ -57,7 +59,7 @@ class TestSigning(MIMEWrapperTestCase): load_key('ecc_p256.pub.asc')) ]) def test_sign_verify(self, message, priv, pub): - super().sign_verify(message, priv, pub) + self.sign_verify(message, priv, pub) @parameterized.expand([ (load_message('mime_signed.eml'), @@ -68,9 +70,10 @@ class TestSigning(MIMEWrapperTestCase): False) ]) def test_verify(self, message, key, valid): - super().verify(message, key, valid) + self.verify(message, key, valid) +@public class TestEncryption(MIMEWrapperTestCase): @parameterized.expand([ (load_message('mime_encrypted.eml'), @@ -79,7 +82,7 @@ class TestEncryption(MIMEWrapperTestCase): True) ]) def test_is_encrypted(self, message, encrypted): - super().is_encrypted(message, encrypted) + self.is_encrypted(message, encrypted) @parameterized.expand([ (load_message('clear.eml'), @@ -90,9 +93,9 @@ class TestEncryption(MIMEWrapperTestCase): ]) def test_encrypt(self, message, keys, **kwargs): if isinstance(keys, tuple): - super().encrypt(message, *keys, **kwargs) + self.encrypt(message, *keys, **kwargs) else: - super().encrypt(message, keys, **kwargs) + self.encrypt(message, keys, **kwargs) @parameterized.expand([ (load_message('clear.eml'), @@ -103,7 +106,7 @@ class TestEncryption(MIMEWrapperTestCase): load_key('ecc_p256.priv.asc')) ]) def test_encrypt_decrypt(self, message, pub, priv): - super().encrypt_decrypt(message, pub, priv) + self.encrypt_decrypt(message, pub, priv) @parameterized.expand([ (load_message('mime_encrypted.eml'), @@ -111,9 +114,10 @@ class TestEncryption(MIMEWrapperTestCase): 'Some encrypted text.\n') ]) def test_decrypt(self, message, key, clear): - super().decrypt(message, key, clear) + self.decrypt(message, key, clear) +@public class TestKeys(MIMEWrapperTestCase): @parameterized.expand([ (load_message('mime_privkey.eml'), @@ -126,7 +130,7 @@ class TestKeys(MIMEWrapperTestCase): False) ]) def test_has_keys(self, message, has_keys): - super().has_keys(message, has_keys) + self.has_keys(message, has_keys) @parameterized.expand([ (load_message('mime_privkey.eml'), @@ -135,9 +139,10 @@ class TestKeys(MIMEWrapperTestCase): [load_key('rsa_1024.pub.asc')]) ]) def test_keys(self, message, keys): - super().keys(message, keys) + self.keys(message, keys) +@public class TestCombined(MIMEWrapperTestCase): @parameterized.expand([ (load_message('clear.eml'), @@ -145,4 +150,13 @@ class TestCombined(MIMEWrapperTestCase): load_key('ecc_p256.priv.asc')) ]) def test_sign_encrypt_decrypt_verify(self, message, sign_key, encrypt_key): - super().sign_encrypt_decrypt_verify(message, sign_key, encrypt_key) + self.sign_encrypt_decrypt_verify(message, sign_key, encrypt_key) + + @parameterized.expand([ + (load_message('clear.eml'), + load_key('rsa_1024.priv.asc'), + load_key('ecc_p256.priv.asc')) + ]) + def test_sign_then_encrypt_decrypt_verify(self, message, sign_key, + encrypt_key): + self.sign_then_encrypt_decrypt_verify(message, sign_key, encrypt_key) diff --git a/src/mailman_pgp/pgp/tests/test_wrapper.py b/src/mailman_pgp/pgp/tests/test_wrapper.py index fb7f0bb..14f40fc 100644 --- a/src/mailman_pgp/pgp/tests/test_wrapper.py +++ b/src/mailman_pgp/pgp/tests/test_wrapper.py @@ -17,6 +17,7 @@ """Tests for the combined wrapper.""" from parameterized import parameterized +from public import public from mailman_pgp.pgp.tests.base import load_key, load_message, WrapperTestCase from mailman_pgp.pgp.wrapper import PGPWrapper @@ -26,6 +27,7 @@ class PGPWrapperTestCase(WrapperTestCase): wrapper = PGPWrapper +@public class TestSigning(PGPWrapperTestCase): @parameterized.expand([ (load_message('inline_cleartext_signed.eml'), @@ -46,7 +48,7 @@ class TestSigning(PGPWrapperTestCase): False) ]) def test_is_signed(self, message, signed): - super().is_signed(message, signed) + self.is_signed(message, signed) @parameterized.expand([ (load_message('clear.eml'), @@ -55,7 +57,7 @@ class TestSigning(PGPWrapperTestCase): load_key('ecc_p256.priv.asc')) ]) def test_sign(self, message, key): - super().sign(message, key) + self.sign(message, key) @parameterized.expand([ (load_message('inline_cleartext_signed.eml'), @@ -72,9 +74,10 @@ class TestSigning(PGPWrapperTestCase): False) ]) def test_verify(self, message, key, valid): - super().verify(message, key, valid) + self.verify(message, key, valid) +@public class TestEncryption(PGPWrapperTestCase): @parameterized.expand([ (load_message('inline_encrypted.eml'), @@ -93,7 +96,7 @@ class TestEncryption(PGPWrapperTestCase): False) ]) def test_is_encrypted(self, message, encrypted): - super().is_encrypted(message, encrypted) + self.is_encrypted(message, encrypted) @parameterized.expand([ (load_message('clear.eml'), @@ -104,9 +107,9 @@ class TestEncryption(PGPWrapperTestCase): ]) def test_encrypt(self, message, keys, **kwargs): if isinstance(keys, tuple): - super().encrypt(message, *keys, **kwargs) + self.encrypt(message, *keys, **kwargs) else: - super().encrypt(message, keys, **kwargs) + self.encrypt(message, keys, **kwargs) @parameterized.expand([ (load_message('clear.eml'), @@ -117,7 +120,7 @@ class TestEncryption(PGPWrapperTestCase): load_key('ecc_p256.priv.asc')) ]) def test_encrypt_decrypt(self, message, pub, priv): - super().encrypt_decrypt(message, pub, priv) + self.encrypt_decrypt(message, pub, priv) @parameterized.expand([ (load_message('inline_encrypted.eml'), @@ -125,9 +128,10 @@ class TestEncryption(PGPWrapperTestCase): 'Some encrypted text.\n\n') ]) def test_decrypt(self, message, key, clear): - super().decrypt(message, key, clear) + self.decrypt(message, key, clear) +@public class TestKeys(PGPWrapperTestCase): @parameterized.expand([ (load_message('inline_privkey.eml'), @@ -146,7 +150,7 @@ class TestKeys(PGPWrapperTestCase): False) ]) def test_has_keys(self, message, has_keys): - super().has_keys(message, has_keys) + self.has_keys(message, has_keys) @parameterized.expand([ (load_message('inline_privkey.eml'), @@ -159,4 +163,24 @@ class TestKeys(PGPWrapperTestCase): [load_key('rsa_1024.pub.asc')]) ]) def test_keys(self, message, keys): - super().keys(message, keys) + self.keys(message, keys) + + +@public +class TestCombined(PGPWrapperTestCase): + @parameterized.expand([ + (load_message('clear.eml'), + load_key('rsa_1024.priv.asc'), + load_key('ecc_p256.priv.asc')) + ]) + def test_sign_encrypt_decrypt_verify(self, message, sign_key, encrypt_key): + self.sign_encrypt_decrypt_verify(message, sign_key, encrypt_key) + + @parameterized.expand([ + (load_message('clear.eml'), + load_key('rsa_1024.priv.asc'), + load_key('ecc_p256.priv.asc')) + ]) + def test_sign_then_encrypt_decrypt_verify(self, message, sign_key, + encrypt_key): + self.sign_then_encrypt_decrypt_verify(message, sign_key, encrypt_key) diff --git a/src/mailman_pgp/pgp/utils.py b/src/mailman_pgp/pgp/utils.py index 0946e3f..a824138 100644 --- a/src/mailman_pgp/pgp/utils.py +++ b/src/mailman_pgp/pgp/utils.py @@ -18,7 +18,7 @@ """Various pgp and email utilities.""" -def copy_headers(from_msg, to_msg): +def copy_headers(from_msg, to_msg, overwrite=False): """ Copy the headers and unixfrom from a message to another one. @@ -28,6 +28,8 @@ def copy_headers(from_msg, to_msg): :type to_msg: email.message.Message """ for key, value in from_msg.items(): + if overwrite: + del to_msg[key] if key not in to_msg: to_msg[key] = value if to_msg.get_unixfrom() is None: diff --git a/src/mailman_pgp/pgp/wrapper.py b/src/mailman_pgp/pgp/wrapper.py index fcbec0e..6e8a8f9 100644 --- a/src/mailman_pgp/pgp/wrapper.py +++ b/src/mailman_pgp/pgp/wrapper.py @@ -80,7 +80,10 @@ class PGPWrapper(): yield from self.inline.verify(key) def verifies(self, key): - return all(bool(verification) for verification in self.verify(key)) + return all(bool(verification) and + all(not sigsubj.signature.is_expired + for sigsubj in verification.good_signatures) for + verification in self.verify(key)) def is_encrypted(self): return self.mime.is_encrypted() or self.inline.is_encrypted() diff --git a/src/mailman_pgp/rest/addresses.py b/src/mailman_pgp/rest/addresses.py index 4f7a04d..6d99a04 100644 --- a/src/mailman_pgp/rest/addresses.py +++ b/src/mailman_pgp/rest/addresses.py @@ -17,19 +17,44 @@ """""" -from mailman.rest.helpers import CollectionMixin +from mailman.rest.helpers import CollectionMixin, etag, not_found, okay from public.public import public +from mailman_pgp.config import config +from mailman_pgp.model.address import PGPAddress + class _EncryptedBase(CollectionMixin): - pass + def _resource_as_dict(self, address): + """See `CollectionMixin`.""" + return dict(email=address.email, + key_fingerprint=address.key_fingerprint, + key_confirmed=address.key_confirmed, + self_link=self.api.path_to( + '/plugins/{}/addresses/{}'.format(config.name, + address.email) + )) + + def _get_collection(self, request): + """See `CollectionMixin`.""" + return PGPAddress.query().all() @public -class AllAddresses: - pass +class AllAddresses(_EncryptedBase): + def on_get(self, request, response): + """/addresses""" + resource = self._make_collection(request) + return okay(response, etag(resource)) @public -class AnAddress: - pass +class AnAddress(_EncryptedBase): + def __init__(self, email): + self._address = PGPAddress.for_email(email) + + def on_get(self, request, response): + if self._address is None: + return not_found(response) + else: + okay(response, self._resource_as_json(self._address)) diff --git a/src/mailman_pgp/rest/lists.py b/src/mailman_pgp/rest/lists.py index 9141bce..37b8d28 100644 --- a/src/mailman_pgp/rest/lists.py +++ b/src/mailman_pgp/rest/lists.py @@ -73,7 +73,7 @@ class AnEncryptedList(_EncryptedBase): okay(response, self._resource_as_json(self._mlist)) @child() - def key(self, context, segments): + def pubkey(self, context, segments): return AListPubkey(self._mlist), [] diff --git a/src/mailman_pgp/rest/tests/test_addresses.py b/src/mailman_pgp/rest/tests/test_addresses.py new file mode 100644 index 0000000..1730618 --- /dev/null +++ b/src/mailman_pgp/rest/tests/test_addresses.py @@ -0,0 +1,60 @@ +# Copyright (C) 2017 Jan Jancar +# +# This file is a part of the Mailman PGP plugin. +# +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) +# any later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. +# +# You should have received a copy of the GNU General Public License along with +# this program. If not, see <http://www.gnu.org/licenses/>. + +"""""" +import unittest +from urllib.error import HTTPError + +from mailman.interfaces.usermanager import IUserManager +from mailman.testing.helpers import call_api +from zope.component import getUtility + +from mailman_pgp.database import transaction +from mailman_pgp.model.address import PGPAddress +from mailman_pgp.testing.layers import PGPRESTLayer + + +class TestAddresses(unittest.TestCase): + layer = PGPRESTLayer + + def setUp(self): + self.mm_address = getUtility(IUserManager).create_address( + 'anne@example.com') + with transaction() as t: + self.address = PGPAddress(self.mm_address) + t.add(self.address) + + def test_missing_address(self): + with self.assertRaises(HTTPError) as cm: + call_api('http://localhost:9001/3.1/plugins/pgp/addresses/' + 'bart@example.com') + self.assertEqual(cm.exception.code, 404) + + def test_all_addresses(self): + json, response = call_api( + 'http://localhost:9001/3.1/plugins/pgp/addresses/') + self.assertEqual(json['total_size'], 1) + self.assertEqual(len(json['entries']), 1) + addresses = json['entries'] + address = addresses[0] + self.assertEqual(address['email'], self.address.email) + + def test_get_address(self): + json, response = call_api( + 'http://localhost:9001/3.1/plugins/pgp/addresses/' + 'anne@example.com') + self.assertEqual(json['email'], self.address.email) diff --git a/src/mailman_pgp/rest/tests/test_lists.py b/src/mailman_pgp/rest/tests/test_lists.py index a674cc9..c5c9854 100644 --- a/src/mailman_pgp/rest/tests/test_lists.py +++ b/src/mailman_pgp/rest/tests/test_lists.py @@ -68,7 +68,7 @@ class TestLists(TestCase): json, response = call_api( 'http://localhost:9001/3.1/plugins/pgp/lists/' - 'another.example.com/key') + 'another.example.com/pubkey') json.pop('http_etag') self.assertEqual(len(json.keys()), 2) diff --git a/src/mailman_pgp/styles/base.py b/src/mailman_pgp/styles/base.py index 6b5271b..a7c3366 100644 --- a/src/mailman_pgp/styles/base.py +++ b/src/mailman_pgp/styles/base.py @@ -19,7 +19,7 @@ from lazr.config import as_boolean from public import public -from mailman_pgp.config import config +from mailman_pgp.config import config, mm_config from mailman_pgp.database import transaction from mailman_pgp.model.list import PGPMailingList @@ -32,6 +32,11 @@ class PGPStyle: """ mailing_list.posting_chain = 'pgp-posting-chain' + old_policy = mailing_list.subscription_policy.name + new_policy_name = 'pgp-' + old_policy[4:] + if new_policy_name in mm_config.workflows: + mailing_list.subscription_policy = new_policy_name + pgp_list = PGPMailingList.for_list(mailing_list) if pgp_list: return diff --git a/src/mailman_pgp/styles/tests/test_base.py b/src/mailman_pgp/styles/tests/test_base.py index 159a242..d343fe4 100644 --- a/src/mailman_pgp/styles/tests/test_base.py +++ b/src/mailman_pgp/styles/tests/test_base.py @@ -33,8 +33,7 @@ class TestBaseStyle(TestCase): base_style = PGPStyle() base_style.apply(mlist) - pgp_list = PGPMailingList.query().filter_by( - list_id=mlist.list_id).first() + pgp_list = PGPMailingList.for_list(mlist) # Test that we have our PGPMailingList self.assertIsNotNone(pgp_list) diff --git a/src/mailman_pgp/testing/layers.py b/src/mailman_pgp/testing/layers.py index 5e0d8b3..fb8a3ec 100644 --- a/src/mailman_pgp/testing/layers.py +++ b/src/mailman_pgp/testing/layers.py @@ -14,6 +14,7 @@ # # You should have received a copy of the GNU General Public License along with # this program. If not, see <http://www.gnu.org/licenses/>. +import contextlib import os from os.path import isfile @@ -24,17 +25,36 @@ from mailman_pgp.database import transaction from mailman_pgp.model.base import Base -def reset_pgp_world(): +def reset_rollback(): + config.db.session.rollback() + + +def reset_pgp_dirs(): for keydir in (config.pgp.keydir_config.values()): for path in os.listdir(keydir): full_path = os.path.join(keydir, path) if isfile(full_path): os.remove(full_path) + + +def reset_pgp_hard(): + reset_rollback() + reset_pgp_dirs() with transaction(): Base.metadata.drop_all(config.db.engine) Base.metadata.create_all(config.db.engine) +def reset_pgp_soft(): + reset_rollback() + reset_pgp_dirs() + with contextlib.closing(config.db.engine.connect()) as con: + trans = con.begin() + for table in reversed(Base.metadata.sorted_tables): + con.execute(table.delete()) + trans.commit() + + # It's weird that ws have to do this, but for some reason nose2 test layers # don't work when ws create a mixin class with the two classmethods # and subclass both it and the respective Mailman Core test layer. @@ -45,28 +65,38 @@ class PGPConfigLayer(ConfigLayer): @classmethod def tearDown(cls): - reset_pgp_world() + reset_pgp_soft() + + @classmethod + def testTearDown(cls): + reset_pgp_soft() + + +class PGPMigrationLayer(ConfigLayer): + @classmethod + def tearDown(cls): + reset_pgp_hard() @classmethod def testTearDown(cls): - reset_pgp_world() + reset_pgp_hard() class PGPSMTPLayer(SMTPLayer): @classmethod def tearDown(cls): - reset_pgp_world() + reset_pgp_soft() @classmethod def testTearDown(cls): - reset_pgp_world() + reset_pgp_soft() class PGPRESTLayer(RESTLayer): @classmethod def tearDown(cls): - reset_pgp_world() + reset_pgp_soft() @classmethod def testTearDown(cls): - reset_pgp_world() + reset_pgp_soft() diff --git a/src/mailman_pgp/testing/start.py b/src/mailman_pgp/testing/start.py index 37f3de4..128855e 100644 --- a/src/mailman_pgp/testing/start.py +++ b/src/mailman_pgp/testing/start.py @@ -25,3 +25,5 @@ from public import public def start_run(plugin): warnings.filterwarnings(action='ignore', category=UserWarning, module='.*mailman_pgp.*') + warnings.filterwarnings(action='ignore', category=UserWarning, + module='.*PGPy.*') diff --git a/src/mailman_pgp/workflows/__init__.py b/src/mailman_pgp/workflows/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/src/mailman_pgp/workflows/__init__.py diff --git a/src/mailman_pgp/workflows/base.py b/src/mailman_pgp/workflows/base.py new file mode 100644 index 0000000..d35e58c --- /dev/null +++ b/src/mailman_pgp/workflows/base.py @@ -0,0 +1,145 @@ +# Copyright (C) 2017 Jan Jancar +# +# This file is a part of the Mailman PGP plugin. +# +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) +# any later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. +# +# You should have received a copy of the GNU General Public License along with +# this program. If not, see <http://www.gnu.org/licenses/>. + +"""""" +from mailman.email.message import UserNotification +from mailman.interfaces.subscriptions import TokenOwner +from pgpy import PGPKey + +from mailman_pgp.database import transaction +from mailman_pgp.model.address import PGPAddress +from mailman_pgp.model.list import PGPMailingList +from mailman_pgp.pgp.utils import copy_headers +from mailman_pgp.pgp.wrapper import PGPWrapper + +KEY_REQUEST = """\ +---------- +TODO: this is a pgp enabled list. +We need your pubkey. +Reply to this message with it as a PGP/MIME(preferred) or inline. +----------""" + +CONFIRM_REQUEST = """\ +---------- +TODO: this is a pgp enabled list. +Reply to this message with this whole text +signed with your supplied key, either inline or PGP/MIME. + +Fingerprint: {} +Token: {} +---------- +""" + + +class PGPMixin: + def _step_pgp_prepare(self): + pgp_address = PGPAddress.for_address(self.address) + if pgp_address is None: + with transaction() as t: + pgp_address = PGPAddress(self.address) + t.add(pgp_address) + + +class SetPubkeyMixin: + def __init__(self, pubkey=None): + self.pubkey = pubkey + + @property + def pubkey_key(self): + if self.pubkey is None: + return None + return str(self.pubkey) + + @pubkey_key.setter + def pubkey_key(self, value): + if value is not None: + self.pubkey, _ = PGPKey.from_blob(value) + else: + self.pubkey = None + + def _step_pubkey_checks(self): + pgp_address = PGPAddress.for_address(self.address) + assert pgp_address is not None + + if self.pubkey is None: + if pgp_address.key is None: + self.push('send_key_request') + else: + with transaction(): + pgp_address.key = self.pubkey + + def _step_send_key_request(self): + self._set_token(TokenOwner.subscriber) + self.push('receive_key') + self.save() + request_address = self.mlist.request_address + email_address = self.address.email + msg = UserNotification(email_address, request_address, + 'key set {}'.format(self.token), + KEY_REQUEST) + msg.send(self.mlist, add_precedence=False) + # Now we wait for the confirmation. + raise StopIteration + + def _step_receive_key(self): + self._restore_subscriber() + self._set_token(TokenOwner.no_one) + + +class ConfirmPubkeyMixin: + def __init__(self, pre_confirmed=False): + self.pubkey_confirmed = pre_confirmed + + def _step_pubkey_confirmation(self): + pgp_address = PGPAddress.for_address(self.address) + assert pgp_address is not None + + if self.pubkey_confirmed: + with transaction(): + pgp_address.key_confirmed = True + else: + if not pgp_address.key_confirmed: + self.push('send_key_confirm_request') + + def _step_send_key_confirm_request(self): + self._set_token(TokenOwner.subscriber) + self.push('receive_key_confirmation') + self.save() + + pgp_address = PGPAddress.for_address(self.address) + request_address = self.mlist.request_address + email_address = self.address.email + msg = UserNotification(email_address, request_address, + 'key confirm {}'.format(self.token), + CONFIRM_REQUEST.format( + pgp_address.key_fingerprint, + self.token)) + pgp_list = PGPMailingList.for_list(self.mlist) + wrapped = PGPWrapper(msg) + encrypted = wrapped.sign_encrypt(pgp_list.key, pgp_address.key) + + msg.set_payload(encrypted.get_payload()) + copy_headers(encrypted, msg, True) + msg.send(self.mlist) + raise StopIteration + + def _step_receive_key_confirmation(self): + self._restore_subscriber() + self._set_token(TokenOwner.no_one) + with transaction(): + pgp_address = PGPAddress.for_address(self.address) + pgp_address.key_confirmed = True diff --git a/src/mailman_pgp/workflows/key_change.py b/src/mailman_pgp/workflows/key_change.py new file mode 100644 index 0000000..cc5b9fc --- /dev/null +++ b/src/mailman_pgp/workflows/key_change.py @@ -0,0 +1,133 @@ +# Copyright (C) 2017 Jan Jancar +# +# This file is a part of the Mailman PGP plugin. +# +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) +# any later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. +# +# You should have received a copy of the GNU General Public License along with +# this program. If not, see <http://www.gnu.org/licenses/>. + +"""""" +from mailman.email.message import UserNotification +from mailman.interfaces.pending import IPendable, IPendings +from mailman.interfaces.subscriptions import TokenOwner +from mailman.interfaces.workflows import IWorkflow +from mailman.workflows.base import Workflow +from pgpy import PGPKey +from public import public +from zope.component import getUtility +from zope.interface import implementer + +from mailman_pgp.database import transaction +from mailman_pgp.model.address import PGPAddress +from mailman_pgp.model.list import PGPMailingList +from mailman_pgp.pgp.utils import copy_headers +from mailman_pgp.pgp.wrapper import PGPWrapper + +CHANGE_CONFIRM_REQUEST = """\ +---------- +TODO: this is a pgp enabled list. +You requested to change your key. +Reply to this message with this whole text +signed with your supplied key, either inline or PGP/MIME. + +Fingerprint: {} +Token: {} +---------- +""" + + +@public +@implementer(IWorkflow) +class KeyChangeWorkflow(Workflow): + name = 'pgp-key-change-workflow' + description = '' + initial_state = 'change_key' + save_attributes = ( + 'address_key', + 'pubkey_key' + ) + + def __init__(self, mlist, pgp_address=None, pubkey=None): + super().__init__() + self.mlist = mlist + self.pgp_list = PGPMailingList.for_list(mlist) + self.pgp_address = pgp_address + self.pubkey = pubkey + + @property + def address_key(self): + return self.pgp_address.email + + @address_key.setter + def address_key(self, value): + self.pgp_address = PGPAddress.for_email(value) + self.member = self.mlist.regular_members.get_member(value) + + @property + def pubkey_key(self): + return str(self.pubkey) + + @pubkey_key.setter + def pubkey_key(self, value): + self.pubkey, _ = PGPKey.from_blob(value) + + def _step_change_key(self): + if self.pgp_address is None or self.pubkey is None: + raise ValueError + + self.push('send_key_confirm_request') + + def _step_send_key_confirm_request(self): + pendings = getUtility(IPendings) + pendable = KeyChangeWorkflow.pendable_class()( + email=self.pgp_address.email, + pubkey=str(self.pubkey), + fingerprint=self.pubkey.fingerprint + ) + self.token = pendings.add(pendable) + self.token_owner = TokenOwner.subscriber + + self.push('receive_confirmation') + self.save() + request_address = self.mlist.request_address + email_address = self.pgp_address.email + msg = UserNotification(email_address, request_address, + 'key confirm {}'.format(self.token), + CHANGE_CONFIRM_REQUEST.format( + self.pubkey.fingerprint, + self.token)) + wrapped = PGPWrapper(msg) + encrypted = wrapped.sign_encrypt(self.pgp_list.key, self.pubkey) + + msg.set_payload(encrypted.get_payload()) + copy_headers(encrypted, msg, True) + msg.send(self.mlist) + raise StopIteration + + def _step_receive_confirmation(self): + with transaction(): + self.pgp_address.key = self.pubkey + self.pgp_address.key_confirmed = True + + pendings = getUtility(IPendings) + if self.token is not None: + pendings.confirm(self.token) + self.token = None + self.token_owner = TokenOwner.no_one + + @classmethod + def pendable_class(cls): + @implementer(IPendable) + class Pendable(dict): + PEND_TYPE = KeyChangeWorkflow.name + + return Pendable diff --git a/src/mailman_pgp/workflows/subscription.py b/src/mailman_pgp/workflows/subscription.py new file mode 100644 index 0000000..ce02013 --- /dev/null +++ b/src/mailman_pgp/workflows/subscription.py @@ -0,0 +1,197 @@ +# Copyright (C) 2017 Jan Jancar +# +# This file is a part of the Mailman PGP plugin. +# +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) +# any later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. +# +# You should have received a copy of the GNU General Public License along with +# this program. If not, see <http://www.gnu.org/licenses/>. + +"""""" + +from mailman.core.i18n import _ +from mailman.interfaces.workflows import ISubscriptionWorkflow +from mailman.workflows.common import (ConfirmationMixin, ModerationMixin, + SubscriptionBase, VerificationMixin) +from public import public +from zope.interface import implementer + +from mailman_pgp.workflows.base import (ConfirmPubkeyMixin, PGPMixin, + SetPubkeyMixin) + + +@public +@implementer(ISubscriptionWorkflow) +class OpenSubscriptionPolicy(SubscriptionBase, VerificationMixin, + SetPubkeyMixin, ConfirmPubkeyMixin, + PGPMixin): + """""" + + name = 'pgp-policy-open' + description = _('An open subscription policy, ' + 'for a PGP-enabled mailing list.') + initial_state = 'prepare' + save_attributes = ( + 'verified', + 'pubkey_key', + 'pubkey_confirmed', + 'address_key', + 'subscriber_key', + 'user_key', + 'token_owner_key', + ) + + def __init__(self, mlist, subscriber=None, *, + pre_verified=False, pubkey=None, + pubkey_pre_confirmed=False): + SubscriptionBase.__init__(self, mlist, subscriber) + VerificationMixin.__init__(self, pre_verified=pre_verified) + SetPubkeyMixin.__init__(self, pubkey=pubkey) + ConfirmPubkeyMixin.__init__(self, pre_confirmed=pubkey_pre_confirmed) + PGPMixin.__init__(self) + + def _step_prepare(self): + self.push('do_subscription') + self.push('pubkey_confirmation') + self.push('pubkey_checks') + self.push('pgp_prepare') + self.push('verification_checks') + self.push('sanity_checks') + + +@public +@implementer(ISubscriptionWorkflow) +class ConfirmSubscriptionPolicy(SubscriptionBase, VerificationMixin, + ConfirmationMixin, SetPubkeyMixin, + ConfirmPubkeyMixin, PGPMixin): + """""" + + name = 'pgp-policy-confirm' + description = _('A subscription policy, for a PGP-enabled mailing list ' + 'that requires confirmation.') + initial_state = 'prepare' + save_attributes = ( + 'verified', + 'confirmed', + 'pubkey_key', + 'pubkey_confirmed', + 'address_key', + 'subscriber_key', + 'user_key', + 'token_owner_key', + ) + + def __init__(self, mlist, subscriber=None, *, + pre_verified=False, pre_confirmed=False, pubkey=None, + pubkey_pre_confirmed=False): + SubscriptionBase.__init__(self, mlist, subscriber) + VerificationMixin.__init__(self, pre_verified=pre_verified) + ConfirmationMixin.__init__(self, pre_confirmed=pre_confirmed) + SetPubkeyMixin.__init__(self, pubkey=pubkey) + ConfirmPubkeyMixin.__init__(self, pre_confirmed=pubkey_pre_confirmed) + PGPMixin.__init__(self) + + def _step_prepare(self): + self.push('do_subscription') + self.push('pubkey_confirmation') + self.push('pubkey_checks') + self.push('pgp_prepare') + self.push('confirmation_checks') + self.push('verification_checks') + self.push('sanity_checks') + + +@public +@implementer(ISubscriptionWorkflow) +class ModerationSubscriptionPolicy(SubscriptionBase, VerificationMixin, + ModerationMixin, SetPubkeyMixin, + ConfirmPubkeyMixin, PGPMixin): + """""" + + name = 'pgp-policy-moderate' + description = _('A subscription policy, for a PGP-enabled mailing list ' + 'that requires moderation.') + initial_state = 'prepare' + save_attributes = ( + 'verified', + 'approved', + 'pubkey_key', + 'pubkey_confirmed', + 'address_key', + 'subscriber_key', + 'user_key', + 'token_owner_key', + ) + + def __init__(self, mlist, subscriber=None, *, + pre_verified=False, pre_approved=False, pubkey=None, + pubkey_pre_confirmed=False): + SubscriptionBase.__init__(self, mlist, subscriber) + VerificationMixin.__init__(self, pre_verified=pre_verified) + ModerationMixin.__init__(self, pre_approved=pre_approved) + SetPubkeyMixin.__init__(self, pubkey=pubkey) + ConfirmPubkeyMixin.__init__(self, pre_confirmed=pubkey_pre_confirmed) + PGPMixin.__init__(self) + + def _step_prepare(self): + self.push('do_subscription') + self.push('moderation_checks') + self.push('pubkey_confirmation') + self.push('pubkey_checks') + self.push('pgp_prepare') + self.push('verification_checks') + self.push('sanity_checks') + + +@public +@implementer(ISubscriptionWorkflow) +class ConfirmModerationSubscriptionPolicy(SubscriptionBase, VerificationMixin, + ConfirmationMixin, ModerationMixin, + SetPubkeyMixin, ConfirmPubkeyMixin, + PGPMixin): + """""" + + name = 'pgp-policy-confirm-moderate' + description = _('A subscription policy, for a PGP-enabled mailing list ' + 'that requires moderation after confirmation.') + initial_state = 'prepare' + save_attributes = ( + 'verified', + 'confirmed', + 'approved', + 'pubkey_key', + 'pubkey_confirmed', + 'address_key', + 'subscriber_key', + 'user_key', + 'token_owner_key', + ) + + def __init__(self, mlist, subscriber=None, *, + pre_verified=False, pre_confirmed=False, pre_approved=False, + pubkey=None, pubkey_pre_confirmed=False): + SubscriptionBase.__init__(self, mlist, subscriber) + VerificationMixin.__init__(self, pre_verified=pre_verified) + ConfirmationMixin.__init__(self, pre_confirmed=pre_confirmed) + ModerationMixin.__init__(self, pre_approved=pre_approved) + SetPubkeyMixin.__init__(self, pubkey=pubkey) + ConfirmPubkeyMixin.__init__(self, pre_confirmed=pubkey_pre_confirmed) + PGPMixin.__init__(self) + + def _step_prepare(self): + self.push('do_subscription') + self.push('moderation_checks') + self.push('pubkey_confirmation') + self.push('pubkey_checks') + self.push('pgp_prepare') + self.push('confirmation_checks') + self.push('verification_checks') + self.push('sanity_checks') diff --git a/src/mailman_pgp/workflows/tests/__init__.py b/src/mailman_pgp/workflows/tests/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/src/mailman_pgp/workflows/tests/__init__.py diff --git a/src/mailman_pgp/workflows/tests/test_base.py b/src/mailman_pgp/workflows/tests/test_base.py new file mode 100644 index 0000000..e3cc833 --- /dev/null +++ b/src/mailman_pgp/workflows/tests/test_base.py @@ -0,0 +1,265 @@ +# Copyright (C) 2017 Jan Jancar +# +# This file is a part of the Mailman PGP plugin. +# +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) +# any later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. +# +# You should have received a copy of the GNU General Public License along with +# this program. If not, see <http://www.gnu.org/licenses/>. + +"""""" + +import unittest +from contextlib import suppress +from unittest.mock import patch + +from mailman.app.lifecycle import create_list +from mailman.interfaces.pending import IPendings +from mailman.interfaces.usermanager import IUserManager +from mailman.interfaces.workflows import IWorkflow +from mailman.testing.helpers import get_queue_messages +from mailman.workflows.common import SubscriptionBase +from public import public +from zope.component import getUtility +from zope.interface import implementer + +from mailman_pgp.database import mm_transaction, transaction +from mailman_pgp.model.address import PGPAddress +from mailman_pgp.model.list import PGPMailingList +from mailman_pgp.pgp.tests.base import load_key +from mailman_pgp.pgp.wrapper import PGPWrapper +from mailman_pgp.testing.layers import PGPConfigLayer +from mailman_pgp.workflows.base import (ConfirmPubkeyMixin, KEY_REQUEST, + PGPMixin, SetPubkeyMixin) + + +class PubkeyMixinTestSetup(): + def setUp(self): + with mm_transaction(): + self.mlist = create_list('test@example.com', + style_name='pgp-default') + + self.list_key = load_key('ecc_p256.priv.asc') + + self.pgp_list = PGPMailingList.for_list(self.mlist) + self.pgp_list.key = self.list_key + + self.um = getUtility(IUserManager) + + self.sender_key = load_key('rsa_1024.priv.asc') + self.sender = self.um.create_address('rsa-1024b@example.org') + + +@implementer(IWorkflow) +class PGPTestWorkflow(SubscriptionBase, PGPMixin, SetPubkeyMixin, + ConfirmPubkeyMixin): + name = 'test-workflow' + description = '' + initial_state = 'prepare' + save_attributes = ( + 'pubkey_key', + 'pubkey_confirmed', + 'address_key', + 'subscriber_key', + 'user_key', + 'token_owner_key' + ) + + def __init__(self, mlist, subscriber=None, *, pubkey=None, + pubkey_pre_confirmed=False): + SubscriptionBase.__init__(self, mlist, subscriber) + SetPubkeyMixin.__init__(self, pubkey=pubkey) + ConfirmPubkeyMixin.__init__(self, pre_confirmed=pubkey_pre_confirmed) + PGPMixin.__init__(self) + + def _step_prepare(self): + self.push('do_subscription') + self.push('pubkey_confirmation') + self.push('pubkey_checks') + self.push('pgp_prepare') + self.push('sanity_checks') + + +@public +class TestPGPMixin(PubkeyMixinTestSetup, unittest.TestCase): + layer = PGPConfigLayer + + def test_create_address(self): + workflow = PGPTestWorkflow(self.mlist, self.sender) + workflow.run_thru('pgp_prepare') + pgp_address = PGPAddress.for_address(self.sender) + self.assertIsNotNone(pgp_address) + + def test_address_existing(self): + workflow = PGPTestWorkflow(self.mlist, self.sender) + with transaction() as t: + pgp_address = PGPAddress(self.sender) + t.add(pgp_address) + workflow.run_thru('pgp_prepare') + still = PGPAddress.for_address(self.sender) + self.assertIsNotNone(still) + + +@public +class TestSetPubkeyMixin(PubkeyMixinTestSetup, unittest.TestCase): + layer = PGPConfigLayer + + def test_key_request_sent(self): + workflow = PGPTestWorkflow(self.mlist, self.sender) + list(workflow) + items = get_queue_messages('virgin', expected_count=1) + message = items[0].msg + token = workflow.token + + self.assertEqual(message['Subject'], 'key set {}'.format(token)) + self.assertEqual(message.get_payload(), KEY_REQUEST) + + def test_receive_key(self): + workflow = PGPTestWorkflow(self.mlist, self.sender) + list(workflow) + with transaction(): + pgp_address = PGPAddress.for_address(self.sender) + pgp_address.key = self.sender_key.pubkey + + receive_workflow = PGPTestWorkflow(self.mlist) + receive_workflow.token = workflow.token + receive_workflow.restore() + receive_workflow.run_thru('receive_key') + + def test_set_pubkey(self): + workflow = PGPTestWorkflow(self.mlist, self.sender, + pubkey=self.sender_key.pubkey) + workflow.run_thru('pubkey_checks') + pgp_address = PGPAddress.for_address(self.sender) + self.assertIsNotNone(pgp_address) + self.assertIsNotNone(pgp_address.key) + self.assertEqual(pgp_address.key_fingerprint, + self.sender_key.fingerprint) + + def test_pubkey_set(self): + workflow = PGPTestWorkflow(self.mlist, self.sender) + with transaction() as t: + pgp_address = PGPAddress(self.sender) + pgp_address.key = self.sender_key.pubkey + t.add(pgp_address) + workflow.run_thru('pubkey_checks') + self.assertEqual(pgp_address.key_fingerprint, + self.sender_key.fingerprint) + + +@public +class TestConfirmPubkeyMixin(PubkeyMixinTestSetup, unittest.TestCase): + layer = PGPConfigLayer + + def test_key_request_pubkey_set(self): + workflow = PGPTestWorkflow(self.mlist, self.sender, + pubkey=self.sender_key.pubkey, + pubkey_pre_confirmed=True) + workflow.run_thru('pubkey_confirmation') + with patch.object(workflow, '_step_do_subscription') as step: + next(workflow) + step.assert_called_once_with() + + def test_send_key_confirm_request(self): + workflow = PGPTestWorkflow(self.mlist, self.sender, + pubkey=self.sender_key.pubkey, + pubkey_pre_confirmed=False) + list(workflow) + items = get_queue_messages('virgin', expected_count=1) + message = items[0].msg + token = workflow.token + + self.assertEqual(message['Subject'], 'key confirm {}'.format(token)) + wrapped = PGPWrapper(message) + self.assertTrue(wrapped.is_encrypted()) + + def test_receive_confirmation(self): + workflow = PGPTestWorkflow(self.mlist, self.sender, + pubkey=self.sender_key.pubkey, + pubkey_pre_confirmed=False) + list(workflow) + + receive_workflow = PGPTestWorkflow(self.mlist) + receive_workflow.token = workflow.token + receive_workflow.restore() + receive_workflow.run_thru('receive_key_confirmation') + with patch.object(receive_workflow, '_step_do_subscription') as step: + next(receive_workflow) + step.assert_called_once_with() + + +@public +class TestBothPubkeyMixins(PubkeyMixinTestSetup, unittest.TestCase): + layer = PGPConfigLayer + + def test_pended_data_key_request(self): + workflow = PGPTestWorkflow(self.mlist, self.sender) + with suppress(StopIteration): + workflow.run_thru('send_key_request') + self.assertIsNotNone(workflow.token) + pendable = getUtility(IPendings).confirm(workflow.token, expunge=False) + self.assertEqual(pendable['list_id'], 'test.example.com') + self.assertEqual(pendable['email'], 'rsa-1024b@example.org') + self.assertEqual(pendable['display_name'], '') + self.assertEqual(pendable['when'], '2005-08-01T07:49:23') + self.assertEqual(pendable['token_owner'], 'subscriber') + + def test_pended_data_key_confirmation(self): + workflow = PGPTestWorkflow(self.mlist, self.sender, + pubkey=self.sender_key.pubkey) + with suppress(StopIteration): + workflow.run_thru('send_key_confirm_request') + self.assertIsNotNone(workflow.token) + pendable = getUtility(IPendings).confirm(workflow.token, expunge=False) + self.assertEqual(pendable['list_id'], 'test.example.com') + self.assertEqual(pendable['email'], 'rsa-1024b@example.org') + self.assertEqual(pendable['display_name'], '') + self.assertEqual(pendable['when'], '2005-08-01T07:49:23') + self.assertEqual(pendable['token_owner'], 'subscriber') + + def test_exisitng_pgp_address(self): + workflow = PGPTestWorkflow(self.mlist, self.sender) + + with transaction() as t: + pgp_address = PGPAddress(self.sender) + pgp_address.key = self.sender_key.pubkey + pgp_address.key_confirmed = True + t.add(pgp_address) + + workflow.run_thru('pubkey_confirmation') + with patch.object(workflow, '_step_do_subscription') as step: + next(workflow) + step.assert_called_once_with() + + def test_exisitng_pgp_address_not_confirmed(self): + workflow = PGPTestWorkflow(self.mlist, self.sender) + + with transaction() as t: + pgp_address = PGPAddress(self.sender) + pgp_address.key = self.sender_key.pubkey + t.add(pgp_address) + + workflow.run_thru('pubkey_confirmation') + with patch.object(workflow, '_step_send_key_confirm_request') as step: + next(workflow) + step.assert_called_once_with() + + def test_exisitng_pgp_address_no_key(self): + workflow = PGPTestWorkflow(self.mlist, self.sender) + + with transaction() as t: + pgp_address = PGPAddress(self.sender) + t.add(pgp_address) + + workflow.run_thru('pubkey_checks') + with patch.object(workflow, '_step_send_key_request') as step: + next(workflow) + step.assert_called_once_with() diff --git a/src/mailman_pgp/workflows/tests/test_key_change.py b/src/mailman_pgp/workflows/tests/test_key_change.py new file mode 100644 index 0000000..5d61efd --- /dev/null +++ b/src/mailman_pgp/workflows/tests/test_key_change.py @@ -0,0 +1,105 @@ +# Copyright (C) 2017 Jan Jancar +# +# This file is a part of the Mailman PGP plugin. +# +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) +# any later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. +# +# You should have received a copy of the GNU General Public License along with +# this program. If not, see <http://www.gnu.org/licenses/>. + +"""""" + +import unittest + +from mailman.app.lifecycle import create_list +from mailman.interfaces.subscriptions import ISubscriptionManager, TokenOwner +from mailman.interfaces.usermanager import IUserManager +from mailman.testing.helpers import get_queue_messages +from public import public +from zope.component import getUtility + +from mailman_pgp.database import mm_transaction, transaction +from mailman_pgp.model.address import PGPAddress +from mailman_pgp.model.list import PGPMailingList +from mailman_pgp.pgp.tests.base import load_key +from mailman_pgp.pgp.wrapper import PGPWrapper +from mailman_pgp.testing.layers import PGPConfigLayer +from mailman_pgp.workflows.key_change import KeyChangeWorkflow + + +@public +class TestKeyChangeWorkflow(unittest.TestCase): + layer = PGPConfigLayer + + def setUp(self): + with mm_transaction(): + self.mlist = create_list('test@example.com', + style_name='pgp-default') + self.pgp_list = PGPMailingList.for_list(self.mlist) + self.pgp_list.key = load_key('ecc_p256.priv.asc') + + self.sender_key = load_key('rsa_1024.priv.asc') + self.sender_new_key = load_key('ecc_p256.priv.asc') + self.sender = getUtility(IUserManager).create_address( + 'rsa-1024b@example.org') + + def test_pgp_address_none(self): + workflow = KeyChangeWorkflow(self.mlist) + with self.assertRaises(ValueError): + list(workflow) + + def test_pubkey_none(self): + with transaction() as t: + pgp_address = PGPAddress(self.sender) + t.add(pgp_address) + + workflow = KeyChangeWorkflow(self.mlist, pgp_address) + with self.assertRaises(ValueError): + list(workflow) + + def test_send_key_confirm_request(self): + with transaction() as t: + pgp_address = PGPAddress(self.sender) + pgp_address.key = self.sender_key.pubkey + pgp_address.key_confirmed = True + t.add(pgp_address) + + workflow = KeyChangeWorkflow(self.mlist, pgp_address, + self.sender_new_key.pubkey) + list(workflow) + items = get_queue_messages('virgin', expected_count=1) + message = items[0].msg + token = workflow.token + + self.assertEqual(message['Subject'], 'key confirm {}'.format(token)) + wrapped = PGPWrapper(message) + self.assertTrue(wrapped.is_encrypted()) + + def test_confirm(self): + with transaction() as t: + pgp_address = PGPAddress(self.sender) + pgp_address.key = self.sender_key.pubkey + pgp_address.key_confirmed = True + t.add(pgp_address) + + workflow = KeyChangeWorkflow(self.mlist, pgp_address, + self.sender_new_key.pubkey) + list(workflow) + + token, token_owner, member = ISubscriptionManager(self.mlist).confirm( + workflow.token) + self.assertIsNone(token) + self.assertEqual(token_owner, TokenOwner.no_one) + + pgp_address = PGPAddress.for_address(self.sender) + self.assertEqual(pgp_address.key_fingerprint, + self.sender_new_key.fingerprint) + self.assertTrue(pgp_address.key_confirmed) diff --git a/src/mailman_pgp/workflows/tests/test_subscription.py b/src/mailman_pgp/workflows/tests/test_subscription.py new file mode 100644 index 0000000..f9fa1e1 --- /dev/null +++ b/src/mailman_pgp/workflows/tests/test_subscription.py @@ -0,0 +1,58 @@ +# Copyright (C) 2017 Jan Jancar +# +# This file is a part of the Mailman PGP plugin. +# +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) +# any later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. +# +# You should have received a copy of the GNU General Public License along with +# this program. If not, see <http://www.gnu.org/licenses/>. + +"""""" +import unittest + +from mailman.app.lifecycle import create_list +from mailman.interfaces.usermanager import IUserManager +from public import public +from zope.component import getUtility + +from mailman_pgp.database import mm_transaction +from mailman_pgp.testing.layers import PGPConfigLayer +from mailman_pgp.workflows.subscription import ( + ConfirmModerationSubscriptionPolicy, ConfirmSubscriptionPolicy, + ModerationSubscriptionPolicy, OpenSubscriptionPolicy) + + +@public +class TestSubscriptionWorkflows(unittest.TestCase): + layer = PGPConfigLayer + + def setUp(self): + with mm_transaction(): + self.mlist = create_list('test@example.com', + style_name='pgp-default') + self.sender = getUtility(IUserManager).create_address( + 'rsa-1024b@example.org') + + def test_open_policy(self): + workflow = OpenSubscriptionPolicy(self.mlist, self.sender) + next(workflow) + + def test_confirm_policy(self): + workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender) + next(workflow) + + def test_moderation_policy(self): + workflow = ModerationSubscriptionPolicy(self.mlist, self.sender) + next(workflow) + + def test_confirm_moderation_policy(self): + workflow = ConfirmModerationSubscriptionPolicy(self.mlist, self.sender) + next(workflow) |
