aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJ08nY2017-07-14 22:04:08 +0200
committerJ08nY2017-07-14 22:04:08 +0200
commit0129c437289ac0501c26184a983eab9ef14419c9 (patch)
treef6550e45575cf9c736d2f2fcdc27e774080cbbd9
parent2b6d4eca265617d1acfbbc790350055af86268dd (diff)
parent59ddda945ecf7de859f7976c0cea32dc6434b98e (diff)
downloadmailman-pgp-0129c437289ac0501c26184a983eab9ef14419c9.tar.gz
mailman-pgp-0129c437289ac0501c26184a983eab9ef14419c9.tar.zst
mailman-pgp-0129c437289ac0501c26184a983eab9ef14419c9.zip
-rw-r--r--src/mailman_pgp/commands/eml_key.py213
-rw-r--r--src/mailman_pgp/commands/tests/__init__.py0
-rw-r--r--src/mailman_pgp/commands/tests/test_key.py708
-rw-r--r--src/mailman_pgp/model/address.py13
-rw-r--r--src/mailman_pgp/pgp/inline.py5
-rw-r--r--src/mailman_pgp/pgp/mime.py38
-rw-r--r--src/mailman_pgp/pgp/tests/base.py58
-rw-r--r--src/mailman_pgp/pgp/tests/test_inline.py46
-rw-r--r--src/mailman_pgp/pgp/tests/test_keygen.py2
-rw-r--r--src/mailman_pgp/pgp/tests/test_mime.py38
-rw-r--r--src/mailman_pgp/pgp/tests/test_wrapper.py44
-rw-r--r--src/mailman_pgp/pgp/utils.py4
-rw-r--r--src/mailman_pgp/pgp/wrapper.py5
-rw-r--r--src/mailman_pgp/rest/addresses.py37
-rw-r--r--src/mailman_pgp/rest/lists.py2
-rw-r--r--src/mailman_pgp/rest/tests/test_addresses.py60
-rw-r--r--src/mailman_pgp/rest/tests/test_lists.py2
-rw-r--r--src/mailman_pgp/styles/base.py7
-rw-r--r--src/mailman_pgp/styles/tests/test_base.py3
-rw-r--r--src/mailman_pgp/testing/layers.py44
-rw-r--r--src/mailman_pgp/testing/start.py2
-rw-r--r--src/mailman_pgp/workflows/__init__.py0
-rw-r--r--src/mailman_pgp/workflows/base.py145
-rw-r--r--src/mailman_pgp/workflows/key_change.py133
-rw-r--r--src/mailman_pgp/workflows/subscription.py197
-rw-r--r--src/mailman_pgp/workflows/tests/__init__.py0
-rw-r--r--src/mailman_pgp/workflows/tests/test_base.py265
-rw-r--r--src/mailman_pgp/workflows/tests/test_key_change.py105
-rw-r--r--src/mailman_pgp/workflows/tests/test_subscription.py58
29 files changed, 2134 insertions, 100 deletions
diff --git a/src/mailman_pgp/commands/eml_key.py b/src/mailman_pgp/commands/eml_key.py
index 9514f48..1b6dc9f 100644
--- a/src/mailman_pgp/commands/eml_key.py
+++ b/src/mailman_pgp/commands/eml_key.py
@@ -16,11 +16,193 @@
# this program. If not, see <http://www.gnu.org/licenses/>.
"""The key email command."""
+from email.utils import parseaddr
from mailman.interfaces.command import ContinueProcessing, IEmailCommand
+from mailman.interfaces.pending import IPendings
+from mailman.interfaces.subscriptions import ISubscriptionManager
+from mailman.interfaces.usermanager import IUserManager
from public import public
+from zope.component import getUtility
from zope.interface import implementer
+from mailman_pgp.database import transaction
+from mailman_pgp.model.address import PGPAddress
+from mailman_pgp.model.list import PGPMailingList
+from mailman_pgp.pgp.wrapper import PGPWrapper
+from mailman_pgp.workflows.base import CONFIRM_REQUEST
+from mailman_pgp.workflows.key_change import (CHANGE_CONFIRM_REQUEST,
+ KeyChangeWorkflow)
+
+
+def _get_email(msg):
+ display_name, email = parseaddr(msg['from'])
+ # Address could be None or the empty string.
+ if not email:
+ email = msg.sender
+ return email
+
+
+def _cmd_set(pgp_list, mlist, msg, msgdata, arguments, results):
+ if len(arguments) != 2:
+ print('Missing token.', file=results)
+ return ContinueProcessing.no
+
+ wrapped = PGPWrapper(msg)
+ if wrapped.is_encrypted():
+ decrypted = wrapped.decrypt(pgp_list.key)
+ wrapped = PGPWrapper(decrypted)
+
+ if not wrapped.has_keys():
+ print('No keys attached? Send a key.', file=results)
+ return ContinueProcessing.no
+
+ keys = list(wrapped.keys())
+ if len(keys) != 1:
+ print('More than one key! Send only one key.', file=results)
+ return ContinueProcessing.no
+
+ email = _get_email(msg)
+ if not email:
+ print('No email to subscribe with.', file=results)
+ return ContinueProcessing.no
+
+ address = getUtility(IUserManager).get_address(email)
+ if not address:
+ print('No adddress to subscribe with.', file=results)
+ return ContinueProcessing.no
+
+ pgp_address = PGPAddress.for_address(address)
+ if pgp_address is None:
+ print('A pgp enabled address not found.', file=results)
+ return ContinueProcessing.no
+
+ token = arguments[1]
+ pendable = getUtility(IPendings).confirm(token, expunge=False)
+ if pendable is None:
+ print('Wrong token.', file=results)
+ return ContinueProcessing.no
+
+ with transaction():
+ pgp_address.key = keys.pop()
+ ISubscriptionManager(mlist).confirm(token)
+
+ print('Key succesfully set.', file=results)
+ print('Key fingerprint: {}'.format(pgp_address.key.fingerprint),
+ file=results)
+
+ return ContinueProcessing.no
+
+
+def _cmd_confirm(pgp_list, mlist, msg, msgdata, arguments, results):
+ if len(arguments) != 2:
+ print('Missing token.', file=results)
+ return ContinueProcessing.no
+
+ email = _get_email(msg)
+ if not email:
+ print('No email to subscribe with.', file=results)
+ return ContinueProcessing.no
+
+ pgp_address = PGPAddress.for_email(email)
+ if pgp_address is None:
+ print('A pgp enabled address not found.', file=results)
+ return ContinueProcessing.no
+
+ if pgp_address.key is None:
+ print('No key set.', file=results)
+ return ContinueProcessing.no
+
+ wrapped = PGPWrapper(msg)
+ if wrapped.is_encrypted():
+ decrypted = wrapped.decrypt(pgp_list.key)
+ wrapped = PGPWrapper(decrypted)
+
+ if not wrapped.is_signed():
+ print('Message not signed, ignoring.', file=results)
+ return ContinueProcessing.no
+
+ if not wrapped.verifies(pgp_address.key):
+ print('Message failed to verify.', file=results)
+ return ContinueProcessing.no
+
+ token = arguments[1]
+
+ pendable = getUtility(IPendings).confirm(token, expunge=False)
+ if pendable is None:
+ print('Wrong token.', file=results)
+ return ContinueProcessing.no
+
+ if pendable.get('type') == KeyChangeWorkflow.pendable_class().PEND_TYPE:
+ expecting = CHANGE_CONFIRM_REQUEST.format(pendable.get('fingerprint'),
+ token)
+ else:
+ expecting = CONFIRM_REQUEST.format(pgp_address.key_fingerprint, token)
+
+ for sig_subject in wrapped.get_signed():
+ if expecting in sig_subject:
+ ISubscriptionManager(mlist).confirm(token)
+ break
+ else:
+ print("Message doesn't contain the expected statement.", file=results)
+ return ContinueProcessing.no
+
+
+def _cmd_change(pgp_list, mlist, msg, msgdata, arguments, results):
+ # New public key in attachment, requires to be signed with current
+ # key
+ if len(arguments) != 1:
+ print('Extraneous argument/s: ' + ','.join(arguments[1:]),
+ file=results)
+ return ContinueProcessing.no
+
+ wrapped = PGPWrapper(msg)
+ if not wrapped.has_keys():
+ print('No keys attached? Send a key.', file=results)
+ return ContinueProcessing.no
+
+ keys = list(wrapped.keys())
+ if len(keys) != 1:
+ print('More than one key! Send only one key.', file=results)
+ return ContinueProcessing.no
+
+ email = _get_email(msg)
+ if not email:
+ print('No email to change key of.', file=results)
+ return ContinueProcessing.no
+
+ pgp_address = PGPAddress.for_email(email)
+ if pgp_address is None:
+ print('A pgp enabled address not found.', file=results)
+ return ContinueProcessing.no
+
+ workflow = KeyChangeWorkflow(mlist, pgp_address, keys.pop())
+ list(workflow)
+ print('Key change request received.', file=results)
+ return ContinueProcessing.no
+
+
+def _cmd_revoke(pgp_list, mlist, msg, msgdata, arguments, results):
+ # Current key revocation certificate in attachment, restarts the
+ # subscription process, or rather only it's key setup part.
+ pass
+
+
+def _cmd_sign(pgp_list, mlist, msg, msgdata, arguments, results):
+ # List public key attached, signed by the users current key.
+ pass
+
+
+SUBCOMMANDS = {
+ 'set': _cmd_set,
+ 'confirm': _cmd_confirm,
+ 'change': _cmd_change,
+ 'revoke': _cmd_revoke,
+ 'sign': _cmd_sign
+}
+
+ARGUMENTS = '<' + '|'.join(SUBCOMMANDS.keys()) + '>'
+
@public
@implementer(IEmailCommand)
@@ -28,28 +210,27 @@ class KeyCommand:
"""The `key` command."""
name = 'key'
- argument_description = '<change|revoke|sign>'
+ argument_description = ARGUMENTS
short_description = ''
- description = ''
+ description = """\
+ """
def process(self, mlist, msg, msgdata, arguments, results):
"""See `IEmailCommand`."""
if len(arguments) == 0:
print('No sub-command specified,'
- ' must be one of <change|revoke|sign>.', file=results)
+ ' must be one of {}.'.format(ARGUMENTS), file=results)
return ContinueProcessing.no
- if arguments[0] == 'change':
- # New public key in attachment, requires to be signed with current
- # key
- pass
- elif arguments[0] == 'revoke':
- # Current key revocation certificate in attachment, restarts the
- # subscription process, or rather only it's key setup part.
- pass
- elif arguments[0] == 'sign':
- # List public key attached, signed by the users current key.
- pass
- else:
+
+ if arguments[0] not in SUBCOMMANDS:
print('Wrong sub-command specified,'
- ' must be one of <change|revoke|sign>.', file=results)
+ ' must be one of {}.'.format(ARGUMENTS), file=results)
return ContinueProcessing.no
+
+ pgp_list = PGPMailingList.for_list(mlist)
+ if pgp_list is None:
+ print("This mailing list doesn't have pgp enabled.", file=results)
+ return ContinueProcessing.no
+
+ command = SUBCOMMANDS[arguments[0]]
+ return command(pgp_list, mlist, msg, msgdata, arguments, results)
diff --git a/src/mailman_pgp/commands/tests/__init__.py b/src/mailman_pgp/commands/tests/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/src/mailman_pgp/commands/tests/__init__.py
diff --git a/src/mailman_pgp/commands/tests/test_key.py b/src/mailman_pgp/commands/tests/test_key.py
new file mode 100644
index 0000000..d0ff7e9
--- /dev/null
+++ b/src/mailman_pgp/commands/tests/test_key.py
@@ -0,0 +1,708 @@
+# Copyright (C) 2017 Jan Jancar
+#
+# This file is a part of the Mailman PGP plugin.
+#
+# This program is free software; you can redistribute it and/or modify it under
+# the terms of the GNU General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or (at your option)
+# any later version.
+#
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+# more details.
+#
+# You should have received a copy of the GNU General Public License along with
+# this program. If not, see <http://www.gnu.org/licenses/>.
+
+""""""
+
+import unittest
+
+from mailman.app.lifecycle import create_list
+from mailman.email.message import Message
+from mailman.interfaces.subscriptions import ISubscriptionManager
+from mailman.interfaces.usermanager import IUserManager
+from mailman.runners.command import CommandRunner
+from mailman.testing.helpers import get_queue_messages, make_testable_runner
+from mailman.utilities.datetime import now
+from public import public
+from zope.component import getUtility
+
+from mailman_pgp.config import mm_config
+from mailman_pgp.database import transaction
+from mailman_pgp.model.address import PGPAddress
+from mailman_pgp.model.list import PGPMailingList
+from mailman_pgp.pgp.mime import MIMEWrapper
+from mailman_pgp.pgp.tests.base import load_key
+from mailman_pgp.pgp.wrapper import PGPWrapper
+from mailman_pgp.testing.layers import PGPConfigLayer
+from mailman_pgp.workflows.base import CONFIRM_REQUEST
+from mailman_pgp.workflows.key_change import CHANGE_CONFIRM_REQUEST
+from mailman_pgp.workflows.subscription import OpenSubscriptionPolicy
+
+
+def _create_plain(from_hdr, to_hdr, subject_hdr, payload):
+ message = Message()
+ message['From'] = from_hdr
+ message['To'] = to_hdr
+ message['Subject'] = subject_hdr
+ message.set_payload(payload)
+ return message
+
+
+def _create_mixed(from_hdr, to_hdr, subject_hdr):
+ message = Message()
+ message['From'] = from_hdr
+ message['To'] = to_hdr
+ message['Subject'] = subject_hdr
+ message.set_type('multipart/mixed')
+ return message
+
+
+@public
+class TestPreDispatch(unittest.TestCase):
+ layer = PGPConfigLayer
+
+ def setUp(self):
+ self.mlist = create_list('test@example.com')
+
+ def test_no_arguments(self):
+ message = _create_plain('bart@example.com', 'test@example.com',
+ 'key', '')
+
+ mm_config.switchboards['command'].enqueue(message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+ items = get_queue_messages('virgin', expected_count=1)
+ results_msg = items[0].msg
+
+ self.assertIn('No sub-command specified', results_msg.get_payload())
+
+ def test_wrong_subcommand(self):
+ message = _create_plain('bart@example.com', 'test@example.com',
+ 'key wrooooooong', '')
+
+ mm_config.switchboards['command'].enqueue(message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+ items = get_queue_messages('virgin', expected_count=1)
+ results_msg = items[0].msg
+
+ self.assertIn('Wrong sub-command specified', results_msg.get_payload())
+
+ def test_no_pgp_list(self):
+ message = _create_plain('bart@example.com', 'test@example.com',
+ 'key set', '')
+
+ mm_config.switchboards['command'].enqueue(message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+ items = get_queue_messages('virgin', expected_count=1)
+ results_msg = items[0].msg
+
+ self.assertIn("This mailing list doesn't have pgp enabled.",
+ results_msg.get_payload())
+
+
+@public
+class TestPreSubscription(unittest.TestCase):
+ layer = PGPConfigLayer
+
+ def setUp(self):
+ self.mlist = create_list('test@example.com', style_name='pgp-default')
+ self.pgp_list = PGPMailingList.for_list(self.mlist)
+ self.pgp_list.key = load_key('ecc_p256.priv.asc')
+
+ self.bart_key = load_key('rsa_1024.priv.asc')
+ self.anne_key = load_key('ecc_p256.priv.asc')
+
+ def test_set(self):
+ self.mlist.subscription_policy = OpenSubscriptionPolicy
+ bart = getUtility(IUserManager).create_address('bart@example.com',
+ 'Bart Person')
+ bart.verified_on = now()
+ token, token_owner, member = ISubscriptionManager(self.mlist).register(
+ bart)
+
+ get_queue_messages('virgin')
+
+ set_message = _create_mixed('bart@example.com', 'test@example.com',
+ 'Re: key set {}'.format(token))
+ wrapped_set_message = MIMEWrapper(set_message)
+ set_message = wrapped_set_message.attach_key(self.bart_key.pubkey)
+
+ mm_config.switchboards['command'].enqueue(set_message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+
+ pgp_address = PGPAddress.for_address(bart)
+ self.assertIsNotNone(pgp_address)
+ self.assertEqual(pgp_address.key.fingerprint,
+ self.bart_key.fingerprint)
+ self.assertFalse(pgp_address.key_confirmed)
+
+ items = get_queue_messages('virgin', expected_count=2)
+ if items[0].msg['Subject'] == 'The results of your email commands':
+ results = items[0].msg
+ confirm_request = items[1].msg
+ else:
+ results = items[1].msg
+ confirm_request = items[0].msg
+
+ self.assertIn('Key succesfully set.', results.get_payload())
+ self.assertIn('Key fingerprint: {}'.format(self.bart_key.fingerprint),
+ results.get_payload())
+
+ confirm_wrapped = PGPWrapper(confirm_request)
+ self.assertTrue(confirm_wrapped.is_encrypted())
+
+ def test_set_encrypted(self):
+ self.mlist.subscription_policy = OpenSubscriptionPolicy
+ bart = getUtility(IUserManager).create_address('bart@example.com',
+ 'Bart Person')
+ bart.verified_on = now()
+ token, token_owner, member = ISubscriptionManager(self.mlist).register(
+ bart)
+
+ get_queue_messages('virgin')
+
+ set_message = _create_mixed('bart@example.com', 'test@example.com',
+ 'Re: key set {}'.format(token))
+ wrapped_set_message = MIMEWrapper(set_message)
+ set_message = wrapped_set_message.attach_key(self.bart_key.pubkey)
+ wrapped_set_message = MIMEWrapper(set_message)
+ set_message = wrapped_set_message.encrypt(self.pgp_list.pubkey,
+ self.bart_key.pubkey)
+
+ mm_config.switchboards['command'].enqueue(set_message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+
+ pgp_address = PGPAddress.for_address(bart)
+ self.assertIsNotNone(pgp_address)
+ self.assertEqual(pgp_address.key.fingerprint,
+ self.bart_key.fingerprint)
+ self.assertFalse(pgp_address.key_confirmed)
+
+ items = get_queue_messages('virgin', expected_count=2)
+ if items[0].msg['Subject'] == 'The results of your email commands':
+ results = items[0].msg
+ confirm_request = items[1].msg
+ else:
+ results = items[1].msg
+ confirm_request = items[0].msg
+
+ self.assertIn('Key succesfully set.', results.get_payload())
+ self.assertIn('Key fingerprint: {}'.format(self.bart_key.fingerprint),
+ results.get_payload())
+
+ confirm_wrapped = PGPWrapper(confirm_request)
+ self.assertTrue(confirm_wrapped.is_encrypted())
+
+ def test_set_no_token(self):
+ message = _create_plain('bart@example.com', 'test@example.com',
+ 'key set', '')
+
+ mm_config.switchboards['command'].enqueue(message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+ items = get_queue_messages('virgin', expected_count=1)
+ results_msg = items[0].msg
+
+ self.assertIn('Missing token.', results_msg.get_payload())
+
+ def test_set_no_key(self):
+ message = _create_plain('bart@example.com', 'test@example.com',
+ 'key set token', '')
+
+ mm_config.switchboards['command'].enqueue(message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+ items = get_queue_messages('virgin', expected_count=1)
+ results_msg = items[0].msg
+
+ self.assertIn('No keys attached? Send a key.',
+ results_msg.get_payload())
+
+ def test_set_multiple_keys(self):
+ set_message = _create_mixed('bart@example.com', 'test@example.com',
+ 'Re: key set token')
+ wrapped_set_message = MIMEWrapper(set_message)
+ set_message = wrapped_set_message.attach_key(self.bart_key.pubkey)
+ wrapped_set_message = MIMEWrapper(set_message)
+ set_message = wrapped_set_message.attach_key(self.anne_key.pubkey)
+
+ mm_config.switchboards['command'].enqueue(set_message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+ items = get_queue_messages('virgin', expected_count=1)
+ results_msg = items[0].msg
+
+ self.assertIn('More than one key! Send only one key.',
+ results_msg.get_payload())
+
+ def test_set_no_email(self):
+ message = _create_mixed('', 'test@example.com', 'key set token')
+ wrapped_message = MIMEWrapper(message)
+ message = wrapped_message.attach_key(self.bart_key.pubkey)
+
+ mm_config.switchboards['command'].enqueue(message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+ items = get_queue_messages('virgin', expected_count=1)
+ results_msg = items[0].msg
+
+ self.assertIn('No email to subscribe with.', results_msg.get_payload())
+
+ def test_set_no_address(self):
+ set_message = _create_mixed('bart@example.com', 'test@example.com',
+ 'key set token')
+ wrapped_set_message = MIMEWrapper(set_message)
+ set_message = wrapped_set_message.attach_key(self.bart_key.pubkey)
+
+ mm_config.switchboards['command'].enqueue(set_message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+ items = get_queue_messages('virgin', expected_count=1)
+ results_msg = items[0].msg
+
+ self.assertIn('No adddress to subscribe with.',
+ results_msg.get_payload())
+
+ def test_set_no_pgp_address(self):
+ bart = getUtility(IUserManager).create_address('bart@example.com',
+ 'Bart Person')
+ bart.verified_on = now()
+
+ set_message = _create_mixed('bart@example.com', 'test@example.com',
+ 'key set token')
+ wrapped_set_message = MIMEWrapper(set_message)
+ set_message = wrapped_set_message.attach_key(self.bart_key.pubkey)
+
+ mm_config.switchboards['command'].enqueue(set_message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+ items = get_queue_messages('virgin', expected_count=1)
+ results_msg = items[0].msg
+
+ self.assertIn('A pgp enabled address not found.',
+ results_msg.get_payload())
+
+ def test_set_wrong_token(self):
+ bart = getUtility(IUserManager).create_address('bart@example.com',
+ 'Bart Person')
+ bart.verified_on = now()
+
+ with transaction() as t:
+ pgp_address = PGPAddress(bart)
+ t.add(pgp_address)
+
+ set_message = _create_mixed('bart@example.com', 'test@example.com',
+ 'key set token')
+ wrapped_set_message = MIMEWrapper(set_message)
+ set_message = wrapped_set_message.attach_key(self.bart_key.pubkey)
+
+ mm_config.switchboards['command'].enqueue(set_message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+ items = get_queue_messages('virgin', expected_count=1)
+ results_msg = items[0].msg
+
+ self.assertIn('Wrong token.', results_msg.get_payload())
+
+ def test_confirm(self):
+ self.mlist.subscription_policy = OpenSubscriptionPolicy
+ bart = getUtility(IUserManager).create_address('bart@example.com',
+ 'Bart Person')
+ bart.verified_on = now()
+
+ token, token_owner, member = ISubscriptionManager(self.mlist).register(
+ bart, pubkey=self.bart_key.pubkey)
+
+ get_queue_messages('virgin')
+
+ message = _create_plain('bart@example.com', 'test@example.com',
+ 'Re: key confirm {}'.format(token),
+ CONFIRM_REQUEST.format(
+ self.bart_key.fingerprint,
+ token))
+ wrapped_message = MIMEWrapper(message)
+ message = wrapped_message.sign(self.bart_key)
+
+ mm_config.switchboards['command'].enqueue(message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+
+ pgp_address = PGPAddress.for_address(bart)
+ self.assertTrue(pgp_address.key_confirmed)
+ self.assertTrue(self.mlist.is_subscribed(bart))
+
+ def test_confirm_encrypted(self):
+ self.mlist.subscription_policy = OpenSubscriptionPolicy
+ bart = getUtility(IUserManager).create_address('bart@example.com',
+ 'Bart Person')
+ bart.verified_on = now()
+
+ token, token_owner, member = ISubscriptionManager(self.mlist).register(
+ bart, pubkey=self.bart_key.pubkey)
+
+ get_queue_messages('virgin')
+
+ message = _create_plain('bart@example.com', 'test@example.com',
+ 'Re: key confirm {}'.format(token),
+ CONFIRM_REQUEST.format(
+ self.bart_key.fingerprint,
+ token))
+ wrapped_message = MIMEWrapper(message)
+ message = wrapped_message.sign_encrypt(self.bart_key,
+ self.pgp_list.pubkey,
+ self.bart_key.pubkey)
+
+ mm_config.switchboards['command'].enqueue(message,
+ listid='test.example.com')
+
+ make_testable_runner(CommandRunner, 'command').run()
+
+ pgp_address = PGPAddress.for_address(bart)
+ self.assertTrue(pgp_address.key_confirmed)
+ self.assertTrue(self.mlist.is_subscribed(bart))
+
+ def test_confirm_no_token(self):
+ message = _create_plain('bart@example.com', 'test@example.com',
+ 'key confirm', '')
+
+ mm_config.switchboards['command'].enqueue(message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+ items = get_queue_messages('virgin', expected_count=1)
+ results_msg = items[0].msg
+
+ self.assertIn('Missing token.', results_msg.get_payload())
+
+ def test_confirm_no_email(self):
+ message = _create_plain('', 'test@example.com',
+ 'key confirm token', '')
+
+ mm_config.switchboards['command'].enqueue(message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+ items = get_queue_messages('virgin', expected_count=1)
+ results_msg = items[0].msg
+
+ self.assertIn('No email to subscribe with.', results_msg.get_payload())
+
+ def test_confirm_no_pgp_address(self):
+ message = _create_plain('bart@example.com', 'test@example.com',
+ 'key confirm token', '')
+
+ mm_config.switchboards['command'].enqueue(message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+ items = get_queue_messages('virgin', expected_count=1)
+ results_msg = items[0].msg
+
+ self.assertIn('A pgp enabled address not found.',
+ results_msg.get_payload())
+
+ def test_confirm_no_key(self):
+ bart = getUtility(IUserManager).create_address('bart@example.com',
+ 'Bart Person')
+ with transaction() as t:
+ pgp_address = PGPAddress(bart)
+ t.add(pgp_address)
+
+ message = _create_plain('bart@example.com', 'test@example.com',
+ 'Re: key confirm token',
+ CONFIRM_REQUEST.format(
+ self.bart_key.fingerprint,
+ 'token'))
+ wrapped_message = MIMEWrapper(message)
+ message = wrapped_message.sign(self.bart_key)
+
+ mm_config.switchboards['command'].enqueue(message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+ items = get_queue_messages('virgin', expected_count=1)
+ results_msg = items[0].msg
+
+ self.assertIn('No key set.', results_msg.get_payload())
+
+ def test_confirm_not_signed(self):
+ self.mlist.subscription_policy = OpenSubscriptionPolicy
+ bart = getUtility(IUserManager).create_address('bart@example.com',
+ 'Bart Person')
+ bart.verified_on = now()
+
+ token, token_owner, member = ISubscriptionManager(self.mlist).register(
+ bart, pubkey=self.bart_key.pubkey)
+
+ get_queue_messages('virgin')
+
+ message = _create_plain('bart@example.com', 'test@example.com',
+ 'Re: key confirm {}'.format(token),
+ CONFIRM_REQUEST.format(
+ self.bart_key.fingerprint,
+ token))
+
+ mm_config.switchboards['command'].enqueue(message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+
+ items = get_queue_messages('virgin', expected_count=1)
+ results_msg = items[0].msg
+
+ self.assertIn('Message not signed, ignoring.',
+ results_msg.get_payload())
+
+ def test_confirm_invalid_sig(self):
+ self.mlist.subscription_policy = OpenSubscriptionPolicy
+ bart = getUtility(IUserManager).create_address('bart@example.com',
+ 'Bart Person')
+ bart.verified_on = now()
+
+ token, token_owner, member = ISubscriptionManager(self.mlist).register(
+ bart, pubkey=self.bart_key.pubkey)
+
+ get_queue_messages('virgin')
+
+ message = _create_plain('bart@example.com', 'test@example.com',
+ 'Re: key confirm {}'.format(token),
+ CONFIRM_REQUEST.format(
+ self.bart_key.fingerprint,
+ token))
+ wrapped_message = MIMEWrapper(message)
+ message = wrapped_message.sign(self.bart_key)
+ message.get_payload(0).set_payload(
+ 'Something that was definitely not signed.')
+
+ mm_config.switchboards['command'].enqueue(message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+
+ items = get_queue_messages('virgin', expected_count=1)
+ results_msg = items[0].msg
+
+ self.assertIn('Message failed to verify.',
+ results_msg.get_payload())
+
+ def test_confirm_wrong_token(self):
+ bart = getUtility(IUserManager).create_address('bart@example.com',
+ 'Bart Person')
+ bart.verified_on = now()
+
+ with transaction() as t:
+ pgp_address = PGPAddress(bart)
+ pgp_address.key = self.bart_key.pubkey
+ t.add(pgp_address)
+
+ message = _create_plain('bart@example.com', 'test@example.com',
+ 'Re: key confirm token',
+ CONFIRM_REQUEST.format(
+ self.bart_key.fingerprint,
+ 'token'))
+ wrapped_message = MIMEWrapper(message)
+ message = wrapped_message.sign(self.bart_key)
+
+ mm_config.switchboards['command'].enqueue(message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+
+ items = get_queue_messages('virgin', expected_count=1)
+ results_msg = items[0].msg
+
+ self.assertIn('Wrong token.', results_msg.get_payload())
+
+ def test_confirm_no_signed_statement(self):
+ self.mlist.subscription_policy = OpenSubscriptionPolicy
+ bart = getUtility(IUserManager).create_address('bart@example.com',
+ 'Bart Person')
+ bart.verified_on = now()
+
+ token, token_owner, member = ISubscriptionManager(self.mlist).register(
+ bart, pubkey=self.bart_key.pubkey)
+
+ get_queue_messages('virgin')
+
+ message = _create_plain('bart@example.com', 'test@example.com',
+ 'Re: key confirm {}'.format(token),
+ 'Some text, that definitely does not'
+ 'contain the required/expected statement.')
+ wrapped_message = MIMEWrapper(message)
+ message = wrapped_message.sign(self.bart_key)
+
+ mm_config.switchboards['command'].enqueue(message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+
+ items = get_queue_messages('virgin', expected_count=1)
+ results_msg = items[0].msg
+
+ self.assertIn("Message doesn't contain the expected statement.",
+ results_msg.get_payload())
+
+
+@public
+class TestAfterSubscription(unittest.TestCase):
+ layer = PGPConfigLayer
+
+ def setUp(self):
+ self.mlist = create_list('test@example.com', style_name='pgp-default')
+ self.pgp_list = PGPMailingList.for_list(self.mlist)
+ self.pgp_list.key = load_key('ecc_p256.priv.asc')
+
+ self.bart_key = load_key('rsa_1024.priv.asc')
+ self.bart_new_key = load_key('ecc_p256.priv.asc')
+
+ def test_change(self):
+ bart = getUtility(IUserManager).create_address('bart@example.com',
+ 'Bart Person')
+
+ with transaction() as t:
+ pgp_address = PGPAddress(bart)
+ pgp_address.key = self.bart_key.pubkey
+ pgp_address.key_confirmed = True
+ t.add(pgp_address)
+
+ message = _create_mixed('bart@example.com', 'test@example.com',
+ 'key change')
+ wrapped_message = MIMEWrapper(message)
+ message = wrapped_message.attach_key(self.bart_new_key.pubkey)
+
+ mm_config.switchboards['command'].enqueue(message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+
+ items = get_queue_messages('virgin', expected_count=2)
+ if items[0].msg['Subject'] == 'The results of your email commands':
+ results = items[0].msg
+ confirm_request = items[1].msg
+ else:
+ results = items[1].msg
+ confirm_request = items[0].msg
+
+ self.assertIn('Key change request received.', results.get_payload())
+
+ confirm_wrapped = PGPWrapper(confirm_request)
+ self.assertTrue(confirm_wrapped.is_encrypted())
+ decrypted = confirm_wrapped.decrypt(self.bart_new_key)
+ self.assertIn('key confirm', decrypted['subject'])
+
+ def test_change_confirm(self):
+ bart = getUtility(IUserManager).create_address('bart@example.com',
+ 'Bart Person')
+
+ with transaction() as t:
+ pgp_address = PGPAddress(bart)
+ pgp_address.key = self.bart_key.pubkey
+ pgp_address.key_confirmed = True
+ t.add(pgp_address)
+
+ message = _create_mixed('bart@example.com', 'test@example.com',
+ 'key change')
+ wrapped_message = MIMEWrapper(message)
+ message = wrapped_message.attach_key(self.bart_new_key.pubkey)
+
+ mm_config.switchboards['command'].enqueue(message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+
+ items = get_queue_messages('virgin', expected_count=2)
+ if items[0].msg['Subject'] == 'The results of your email commands':
+ confirm_request = items[1].msg
+ else:
+ confirm_request = items[0].msg
+ request_wrapped = PGPWrapper(confirm_request)
+ decrypted = request_wrapped.decrypt(self.bart_new_key)
+
+ subj = decrypted['subject']
+ token = subj.split(' ')[-1]
+
+ confirm_message = _create_plain('bart@example.com', 'test@example.com',
+ decrypted['subject'],
+ CHANGE_CONFIRM_REQUEST.format(
+ self.bart_new_key.fingerprint,
+ token))
+ wrapped_confirm = MIMEWrapper(confirm_message)
+ confirm = wrapped_confirm.sign(self.bart_key)
+
+ mm_config.switchboards['command'].enqueue(confirm,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+
+ pgp_address = PGPAddress.for_address(bart)
+ self.assertEqual(pgp_address.key_fingerprint,
+ self.bart_new_key.fingerprint)
+ self.assertTrue(pgp_address.key_confirmed)
+
+ def test_change_extra_arg(self):
+ message = _create_plain('bart@example.com', 'test@example.com',
+ 'key change extra arguments', '')
+ mm_config.switchboards['command'].enqueue(message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+
+ items = get_queue_messages('virgin', expected_count=1)
+ results_msg = items[0].msg
+
+ self.assertIn('Extraneous argument/s: extra,arguments',
+ results_msg.get_payload())
+
+ def test_change_no_key(self):
+ message = _create_plain('bart@example.com', 'test@example.com',
+ 'key change', '')
+
+ mm_config.switchboards['command'].enqueue(message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+ items = get_queue_messages('virgin', expected_count=1)
+ results_msg = items[0].msg
+
+ self.assertIn('No keys attached? Send a key.',
+ results_msg.get_payload())
+
+ def test_change_multiple_keys(self):
+ set_message = _create_mixed('bart@example.com', 'test@example.com',
+ 'key change')
+
+ wrapped_set_message = MIMEWrapper(set_message)
+ set_message = wrapped_set_message.attach_key(self.bart_key.pubkey)
+ wrapped_set_message = MIMEWrapper(set_message)
+ set_message = wrapped_set_message.attach_key(self.bart_new_key.pubkey)
+
+ mm_config.switchboards['command'].enqueue(set_message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+ items = get_queue_messages('virgin', expected_count=1)
+ results_msg = items[0].msg
+
+ self.assertIn('More than one key! Send only one key.',
+ results_msg.get_payload())
+
+ def test_change_no_email(self):
+ message = _create_mixed('', 'test@example.com', 'key change')
+ wrapped_message = MIMEWrapper(message)
+ message = wrapped_message.attach_key(self.bart_key.pubkey)
+
+ mm_config.switchboards['command'].enqueue(message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+ items = get_queue_messages('virgin', expected_count=1)
+ results_msg = items[0].msg
+
+ self.assertIn('No email to change key of.', results_msg.get_payload())
+
+ def test_change_no_pgp_address(self):
+ message = _create_mixed('bart@example.com', 'test@example.com',
+ 'key change')
+ wrapped_message = MIMEWrapper(message)
+ message = wrapped_message.attach_key(self.bart_key.pubkey)
+
+ mm_config.switchboards['command'].enqueue(message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+ items = get_queue_messages('virgin', expected_count=1)
+ results_msg = items[0].msg
+
+ self.assertIn('A pgp enabled address not found.',
+ results_msg.get_payload())
diff --git a/src/mailman_pgp/model/address.py b/src/mailman_pgp/model/address.py
index 1d07486..25a87dc 100644
--- a/src/mailman_pgp/model/address.py
+++ b/src/mailman_pgp/model/address.py
@@ -22,7 +22,7 @@ from os.path import exists, isfile, join
from mailman.database.types import SAUnicode
from mailman.interfaces.usermanager import IUserManager
from pgpy import PGPKey
-from sqlalchemy import Column, Integer
+from sqlalchemy import Boolean, Column, Integer
from sqlalchemy.orm import reconstructor
from zope.component import getUtility
@@ -36,8 +36,9 @@ class PGPAddress(Base):
__tablename__ = 'pgp_addresses'
id = Column(Integer, primary_key=True)
- email = Column(SAUnicode, index=True)
+ email = Column(SAUnicode, index=True, unique=True)
key_fingerprint = Column(SAUnicode)
+ key_confirmed = Column(Boolean, default=False)
def __init__(self, address):
super().__init__()
@@ -118,4 +119,10 @@ class PGPAddress(Base):
"""
if address is None:
return None
- return PGPAddress.query().filter_by(email=address.email).first()
+ return PGPAddress.for_email(address.email)
+
+ @staticmethod
+ def for_email(email):
+ if email is None:
+ return None
+ return PGPAddress.query().filter_by(email=email).first()
diff --git a/src/mailman_pgp/pgp/inline.py b/src/mailman_pgp/pgp/inline.py
index 9c19030..458cc26 100644
--- a/src/mailman_pgp/pgp/inline.py
+++ b/src/mailman_pgp/pgp/inline.py
@@ -201,7 +201,10 @@ class InlineWrapper:
def _decrypt(self, part, key):
message = PGPMessage.from_blob(part.get_payload())
decrypted = key.decrypt(message)
- part.set_payload(decrypted.message)
+ if decrypted.is_signed:
+ part.set_payload(str(decrypted))
+ else:
+ part.set_payload(decrypted.message)
def decrypt(self, key):
"""
diff --git a/src/mailman_pgp/pgp/mime.py b/src/mailman_pgp/pgp/mime.py
index afb1c35..a7e31b8 100644
--- a/src/mailman_pgp/pgp/mime.py
+++ b/src/mailman_pgp/pgp/mime.py
@@ -76,7 +76,8 @@ class MIMEWrapper:
if not self._is_mime():
return False
second_type = self.msg.get_payload(1).get_content_type()
- protocol_param = collapse_rfc2231_value(self.msg.get_param('protocol'))
+ protocol_param = collapse_rfc2231_value(self.msg.get_param('protocol',
+ ''))
content_subtype = self.msg.get_content_subtype()
return (second_type == MIMEWrapper._signed_type and
@@ -87,7 +88,7 @@ class MIMEWrapper:
return self.is_signed()
def get_signed(self):
- yield self.msg.get_payload(0).as_string(0)
+ yield self.msg.get_payload(0).as_string()
def is_encrypted(self):
"""
@@ -102,7 +103,8 @@ class MIMEWrapper:
first_type = self.msg.get_payload(0).get_content_type()
second_type = self.msg.get_payload(1).get_content_type()
content_subtype = self.msg.get_content_subtype()
- protocol_param = collapse_rfc2231_value(self.msg.get_param('protocol'))
+ protocol_param = collapse_rfc2231_value(self.msg.get_param('protocol',
+ ''))
return ('Version: 1' in first_part and
first_type == MIMEWrapper._encrypted_type and
@@ -145,6 +147,26 @@ class MIMEWrapper:
key, _ = PGPKey.from_blob(part.get_payload())
yield key
+ def attach_key(self, key):
+ """
+
+ :param key:
+ :type key: pgpy.PGPKey
+ :return:
+ """
+ filename = '0x' + key.fingerprint.keyid + '.asc'
+ key_part = MIMEApplication(_data=str(key),
+ _subtype=MIMEWrapper._keys_subtype,
+ _encoder=encode_7or8bit,
+ name=filename)
+ key_part.add_header('Content-Description',
+ 'OpenPGP key')
+ key_part.add_header('Content-Disposition', 'attachment',
+ filename=filename)
+ out = copy.deepcopy(self.msg)
+ out.attach(key_part)
+ return out
+
def verify(self, key):
"""
Verify the signature of this message with key.
@@ -181,9 +203,9 @@ class MIMEWrapper:
_subtype=MIMEWrapper._signature_subtype,
_encoder=encode_7or8bit,
name='signature.asc')
- second_part.add_header('content-description',
+ second_part.add_header('Content-Description',
'OpenPGP digital signature')
- second_part.add_header('content-disposition', 'attachment',
+ second_part.add_header('Content-Disposition', 'attachment',
filename='signature.asc')
out.attach(copy.deepcopy(msg))
@@ -249,16 +271,16 @@ class MIMEWrapper:
first_part = MIMEApplication(_data='Version: 1',
_subtype=MIMEWrapper._encryption_subtype,
_encoder=encode_7or8bit)
- first_part.add_header('content-description',
+ first_part.add_header('Content-Description',
'PGP/MIME version identification')
second_part = MIMEApplication(_data=str(payload),
_subtype='octet-stream',
_encoder=encode_7or8bit,
name='encrypted.asc')
- second_part.add_header('content-description',
+ second_part.add_header('Content-Description',
'OpenPGP encrypted message')
- second_part.add_header('content-disposition', 'inline',
+ second_part.add_header('Content-Disposition', 'inline',
filename='encrypted.asc')
out.attach(first_part)
out.attach(second_part)
diff --git a/src/mailman_pgp/pgp/tests/base.py b/src/mailman_pgp/pgp/tests/base.py
index d561fda..52e5ec4 100644
--- a/src/mailman_pgp/pgp/tests/base.py
+++ b/src/mailman_pgp/pgp/tests/base.py
@@ -42,63 +42,66 @@ def load_key(path):
class WrapperTestCase(TestCase):
wrapper = None
+ def wrap(self, message):
+ return self.wrapper(message)
+
def is_signed(self, message, signed):
- wrapped = self.wrapper(message)
+ wrapped = self.wrap(message)
self.assertEqual(wrapped.is_signed(), signed)
def sign(self, message, key):
- wrapped = self.wrapper(message)
+ wrapped = self.wrap(message)
signed = wrapped.sign(key)
- signed_wrapped = self.wrapper(signed)
+ signed_wrapped = self.wrap(signed)
self.assertTrue(signed_wrapped.is_signed())
def sign_verify(self, message, priv, pub):
- wrapped = self.wrapper(message)
+ wrapped = self.wrap(message)
signed = wrapped.sign(priv)
- signed_wrapped = self.wrapper(signed)
+ signed_wrapped = self.wrap(signed)
for signature in signed_wrapped.verify(pub):
self.assertTrue(bool(signature))
def verify(self, message, key, valid):
- wrapped = self.wrapper(message)
+ wrapped = self.wrap(message)
for signature in wrapped.verify(key):
self.assertEqual(bool(signature), valid)
def is_encrypted(self, message, encrypted):
- wrapped = self.wrapper(message)
+ wrapped = self.wrap(message)
self.assertEqual(wrapped.is_encrypted(), encrypted)
def encrypt(self, message, *keys, **kwargs):
- wrapped = self.wrapper(message)
+ wrapped = self.wrap(message)
encrypted = wrapped.encrypt(*keys, **kwargs)
- encrypted_wrapped = self.wrapper(encrypted)
+ encrypted_wrapped = self.wrap(encrypted)
self.assertTrue(encrypted_wrapped.is_encrypted())
def encrypt_decrypt(self, message, pub, priv):
- wrapped = self.wrapper(message)
+ wrapped = self.wrap(message)
encrypted = wrapped.encrypt(pub)
- encrypted_wrapped = self.wrapper(encrypted)
+ encrypted_wrapped = self.wrap(encrypted)
decrypted = encrypted_wrapped.decrypt(priv)
- decrypted_wrapped = self.wrapper(decrypted)
+ decrypted_wrapped = self.wrap(decrypted)
self.assertFalse(decrypted_wrapped.is_encrypted())
self.assertEqual(decrypted.get_payload(), message.get_payload())
def decrypt(self, message, key, clear):
- wrapped = self.wrapper(message)
+ wrapped = self.wrap(message)
decrypted = wrapped.decrypt(key)
- decrypted_wrapped = self.wrapper(decrypted)
+ decrypted_wrapped = self.wrap(decrypted)
self.assertFalse(decrypted_wrapped.is_encrypted())
self.assertEqual(decrypted.get_payload(), clear)
def has_keys(self, message, has_keys):
- wrapped = self.wrapper(message)
+ wrapped = self.wrap(message)
self.assertEqual(wrapped.has_keys(), has_keys)
def keys(self, message, keys):
- wrapped = self.wrapper(message)
+ wrapped = self.wrap(message)
loaded = list(wrapped.keys())
self.assertEqual(len(loaded), len(keys))
@@ -107,13 +110,30 @@ class WrapperTestCase(TestCase):
self.assertListEqual(loaded_fingerprints, fingerprints)
def sign_encrypt_decrypt_verify(self, message, sign_key, encrypt_key):
- wrapped = self.wrapper(message)
+ wrapped = self.wrap(message)
encrypted = wrapped.sign_encrypt(sign_key, encrypt_key.pubkey)
- encrypted_wrapped = self.wrapper(encrypted)
+ encrypted_wrapped = self.wrap(encrypted)
+ self.assertTrue(encrypted_wrapped.is_encrypted())
+
+ decrypted = encrypted_wrapped.decrypt(encrypt_key)
+ decrypted_wrapped = self.wrap(decrypted)
+ self.assertTrue(decrypted_wrapped.is_signed())
+ self.assertFalse(decrypted_wrapped.is_encrypted())
+
+ verification = decrypted_wrapped.verify(sign_key.pubkey)
+ for sig in verification:
+ self.assertTrue(bool(sig))
+ self.assertListEqual(list(decrypted_wrapped.get_signed()),
+ list(wrapped.get_payload()))
+
+ def sign_then_encrypt_decrypt_verify(self, message, sign_key, encrypt_key):
+ wrapped = self.wrap(message)
+ encrypted = wrapped.sign_then_encrypt(sign_key, encrypt_key.pubkey)
+ encrypted_wrapped = self.wrap(encrypted)
self.assertTrue(encrypted_wrapped.is_encrypted())
decrypted = encrypted_wrapped.decrypt(encrypt_key)
- decrypted_wrapped = self.wrapper(decrypted)
+ decrypted_wrapped = self.wrap(decrypted)
self.assertTrue(decrypted_wrapped.is_signed())
self.assertFalse(decrypted_wrapped.is_encrypted())
diff --git a/src/mailman_pgp/pgp/tests/test_inline.py b/src/mailman_pgp/pgp/tests/test_inline.py
index cd18209..a89963a 100644
--- a/src/mailman_pgp/pgp/tests/test_inline.py
+++ b/src/mailman_pgp/pgp/tests/test_inline.py
@@ -18,6 +18,7 @@
"""Tests for the inline wrapper."""
from parameterized import parameterized
+from public import public
from mailman_pgp.pgp.inline import InlineWrapper
from mailman_pgp.pgp.tests.base import load_key, load_message, WrapperTestCase
@@ -27,6 +28,7 @@ class InlineWrapperTestCase(WrapperTestCase):
wrapper = InlineWrapper
+@public
class TestSigning(InlineWrapperTestCase):
@parameterized.expand([
(load_message('inline_cleartext_signed.eml'),
@@ -43,7 +45,7 @@ class TestSigning(InlineWrapperTestCase):
False)
])
def test_is_signed(self, message, signed):
- super().is_signed(message, signed)
+ self.is_signed(message, signed)
@parameterized.expand([
(load_message('clear.eml'),
@@ -52,7 +54,7 @@ class TestSigning(InlineWrapperTestCase):
load_key('ecc_p256.priv.asc'))
])
def test_sign(self, message, key):
- super().sign(message, key)
+ self.sign(message, key)
@parameterized.expand([
(load_message('clear.eml'),
@@ -63,7 +65,7 @@ class TestSigning(InlineWrapperTestCase):
load_key('ecc_p256.pub.asc'))
])
def test_sign_verify(self, message, priv, pub):
- super().sign_verify(message, priv, pub)
+ self.sign_verify(message, priv, pub)
@parameterized.expand([
(load_message('inline_cleartext_signed.eml'),
@@ -80,9 +82,10 @@ class TestSigning(InlineWrapperTestCase):
False),
])
def test_verify(self, message, key, valid):
- super().verify(message, key, valid)
+ self.verify(message, key, valid)
+@public
class TestEncryption(InlineWrapperTestCase):
@parameterized.expand([
(load_message('inline_encrypted.eml'),
@@ -99,7 +102,7 @@ class TestEncryption(InlineWrapperTestCase):
False)
])
def test_is_encrypted(self, message, encrypted):
- super().is_encrypted(message, encrypted)
+ self.is_encrypted(message, encrypted)
@parameterized.expand([
(load_message('clear.eml'),
@@ -110,9 +113,9 @@ class TestEncryption(InlineWrapperTestCase):
])
def test_encrypt(self, message, keys, **kwargs):
if isinstance(keys, tuple):
- super().encrypt(message, *keys, **kwargs)
+ self.encrypt(message, *keys, **kwargs)
else:
- super().encrypt(message, keys, **kwargs)
+ self.encrypt(message, keys, **kwargs)
@parameterized.expand([
(load_message('clear.eml'),
@@ -123,7 +126,7 @@ class TestEncryption(InlineWrapperTestCase):
load_key('ecc_p256.priv.asc'))
])
def test_encrypt_decrypt(self, message, pub, priv):
- super().encrypt_decrypt(message, pub, priv)
+ self.encrypt_decrypt(message, pub, priv)
@parameterized.expand([
(load_message('inline_encrypted.eml'),
@@ -131,9 +134,10 @@ class TestEncryption(InlineWrapperTestCase):
'Some encrypted text.\n\n')
])
def test_decrypt(self, message, key, clear):
- super().decrypt(message, key, clear)
+ self.decrypt(message, key, clear)
+@public
class TestKeys(InlineWrapperTestCase):
@parameterized.expand([
(load_message('inline_privkey.eml'),
@@ -146,7 +150,7 @@ class TestKeys(InlineWrapperTestCase):
False)
])
def test_has_keys(self, message, has_keys):
- super().has_keys(message, has_keys)
+ self.has_keys(message, has_keys)
@parameterized.expand([
(load_message('inline_privkey.eml'),
@@ -155,4 +159,24 @@ class TestKeys(InlineWrapperTestCase):
[load_key('rsa_1024.pub.asc')])
])
def test_keys(self, message, keys):
- super().keys(message, keys)
+ self.keys(message, keys)
+
+
+@public
+class TestCombined(InlineWrapperTestCase):
+ @parameterized.expand([
+ (load_message('clear.eml'),
+ load_key('rsa_1024.priv.asc'),
+ load_key('ecc_p256.priv.asc'))
+ ])
+ def test_sign_encrypt_decrypt_verify(self, message, sign_key, encrypt_key):
+ self.sign_encrypt_decrypt_verify(message, sign_key, encrypt_key)
+
+ @parameterized.expand([
+ (load_message('clear.eml'),
+ load_key('rsa_1024.priv.asc'),
+ load_key('ecc_p256.priv.asc'))
+ ])
+ def test_sign_then_encrypt_decrypt_verify(self, message, sign_key,
+ encrypt_key):
+ self.sign_then_encrypt_decrypt_verify(message, sign_key, encrypt_key)
diff --git a/src/mailman_pgp/pgp/tests/test_keygen.py b/src/mailman_pgp/pgp/tests/test_keygen.py
index dab6801..5c59614 100644
--- a/src/mailman_pgp/pgp/tests/test_keygen.py
+++ b/src/mailman_pgp/pgp/tests/test_keygen.py
@@ -21,10 +21,12 @@ from unittest import TestCase
from pgpy import PGPKey
from pgpy.constants import PubKeyAlgorithm
+from public import public
from mailman_pgp.pgp.keygen import ListKeyGenerator
+@public
class TesKeygen(TestCase):
def setUp(self):
self.keypair_config = {
diff --git a/src/mailman_pgp/pgp/tests/test_mime.py b/src/mailman_pgp/pgp/tests/test_mime.py
index fab50bb..f56c781 100644
--- a/src/mailman_pgp/pgp/tests/test_mime.py
+++ b/src/mailman_pgp/pgp/tests/test_mime.py
@@ -18,6 +18,7 @@
"""Tests for the MIME wrapper."""
from parameterized import parameterized
+from public import public
from mailman_pgp.pgp.mime import MIMEWrapper
from mailman_pgp.pgp.tests.base import load_key, load_message, WrapperTestCase
@@ -27,6 +28,7 @@ class MIMEWrapperTestCase(WrapperTestCase):
wrapper = MIMEWrapper
+@public
class TestSigning(MIMEWrapperTestCase):
@parameterized.expand([
(load_message('mime_signed.eml'),
@@ -37,7 +39,7 @@ class TestSigning(MIMEWrapperTestCase):
False)
])
def test_is_signed(self, message, signed):
- super().is_signed(message, signed)
+ self.is_signed(message, signed)
@parameterized.expand([
(load_message('clear.eml'),
@@ -46,7 +48,7 @@ class TestSigning(MIMEWrapperTestCase):
load_key('ecc_p256.priv.asc'))
])
def test_sign(self, message, key):
- super().sign(message, key)
+ self.sign(message, key)
@parameterized.expand([
(load_message('clear.eml'),
@@ -57,7 +59,7 @@ class TestSigning(MIMEWrapperTestCase):
load_key('ecc_p256.pub.asc'))
])
def test_sign_verify(self, message, priv, pub):
- super().sign_verify(message, priv, pub)
+ self.sign_verify(message, priv, pub)
@parameterized.expand([
(load_message('mime_signed.eml'),
@@ -68,9 +70,10 @@ class TestSigning(MIMEWrapperTestCase):
False)
])
def test_verify(self, message, key, valid):
- super().verify(message, key, valid)
+ self.verify(message, key, valid)
+@public
class TestEncryption(MIMEWrapperTestCase):
@parameterized.expand([
(load_message('mime_encrypted.eml'),
@@ -79,7 +82,7 @@ class TestEncryption(MIMEWrapperTestCase):
True)
])
def test_is_encrypted(self, message, encrypted):
- super().is_encrypted(message, encrypted)
+ self.is_encrypted(message, encrypted)
@parameterized.expand([
(load_message('clear.eml'),
@@ -90,9 +93,9 @@ class TestEncryption(MIMEWrapperTestCase):
])
def test_encrypt(self, message, keys, **kwargs):
if isinstance(keys, tuple):
- super().encrypt(message, *keys, **kwargs)
+ self.encrypt(message, *keys, **kwargs)
else:
- super().encrypt(message, keys, **kwargs)
+ self.encrypt(message, keys, **kwargs)
@parameterized.expand([
(load_message('clear.eml'),
@@ -103,7 +106,7 @@ class TestEncryption(MIMEWrapperTestCase):
load_key('ecc_p256.priv.asc'))
])
def test_encrypt_decrypt(self, message, pub, priv):
- super().encrypt_decrypt(message, pub, priv)
+ self.encrypt_decrypt(message, pub, priv)
@parameterized.expand([
(load_message('mime_encrypted.eml'),
@@ -111,9 +114,10 @@ class TestEncryption(MIMEWrapperTestCase):
'Some encrypted text.\n')
])
def test_decrypt(self, message, key, clear):
- super().decrypt(message, key, clear)
+ self.decrypt(message, key, clear)
+@public
class TestKeys(MIMEWrapperTestCase):
@parameterized.expand([
(load_message('mime_privkey.eml'),
@@ -126,7 +130,7 @@ class TestKeys(MIMEWrapperTestCase):
False)
])
def test_has_keys(self, message, has_keys):
- super().has_keys(message, has_keys)
+ self.has_keys(message, has_keys)
@parameterized.expand([
(load_message('mime_privkey.eml'),
@@ -135,9 +139,10 @@ class TestKeys(MIMEWrapperTestCase):
[load_key('rsa_1024.pub.asc')])
])
def test_keys(self, message, keys):
- super().keys(message, keys)
+ self.keys(message, keys)
+@public
class TestCombined(MIMEWrapperTestCase):
@parameterized.expand([
(load_message('clear.eml'),
@@ -145,4 +150,13 @@ class TestCombined(MIMEWrapperTestCase):
load_key('ecc_p256.priv.asc'))
])
def test_sign_encrypt_decrypt_verify(self, message, sign_key, encrypt_key):
- super().sign_encrypt_decrypt_verify(message, sign_key, encrypt_key)
+ self.sign_encrypt_decrypt_verify(message, sign_key, encrypt_key)
+
+ @parameterized.expand([
+ (load_message('clear.eml'),
+ load_key('rsa_1024.priv.asc'),
+ load_key('ecc_p256.priv.asc'))
+ ])
+ def test_sign_then_encrypt_decrypt_verify(self, message, sign_key,
+ encrypt_key):
+ self.sign_then_encrypt_decrypt_verify(message, sign_key, encrypt_key)
diff --git a/src/mailman_pgp/pgp/tests/test_wrapper.py b/src/mailman_pgp/pgp/tests/test_wrapper.py
index fb7f0bb..14f40fc 100644
--- a/src/mailman_pgp/pgp/tests/test_wrapper.py
+++ b/src/mailman_pgp/pgp/tests/test_wrapper.py
@@ -17,6 +17,7 @@
"""Tests for the combined wrapper."""
from parameterized import parameterized
+from public import public
from mailman_pgp.pgp.tests.base import load_key, load_message, WrapperTestCase
from mailman_pgp.pgp.wrapper import PGPWrapper
@@ -26,6 +27,7 @@ class PGPWrapperTestCase(WrapperTestCase):
wrapper = PGPWrapper
+@public
class TestSigning(PGPWrapperTestCase):
@parameterized.expand([
(load_message('inline_cleartext_signed.eml'),
@@ -46,7 +48,7 @@ class TestSigning(PGPWrapperTestCase):
False)
])
def test_is_signed(self, message, signed):
- super().is_signed(message, signed)
+ self.is_signed(message, signed)
@parameterized.expand([
(load_message('clear.eml'),
@@ -55,7 +57,7 @@ class TestSigning(PGPWrapperTestCase):
load_key('ecc_p256.priv.asc'))
])
def test_sign(self, message, key):
- super().sign(message, key)
+ self.sign(message, key)
@parameterized.expand([
(load_message('inline_cleartext_signed.eml'),
@@ -72,9 +74,10 @@ class TestSigning(PGPWrapperTestCase):
False)
])
def test_verify(self, message, key, valid):
- super().verify(message, key, valid)
+ self.verify(message, key, valid)
+@public
class TestEncryption(PGPWrapperTestCase):
@parameterized.expand([
(load_message('inline_encrypted.eml'),
@@ -93,7 +96,7 @@ class TestEncryption(PGPWrapperTestCase):
False)
])
def test_is_encrypted(self, message, encrypted):
- super().is_encrypted(message, encrypted)
+ self.is_encrypted(message, encrypted)
@parameterized.expand([
(load_message('clear.eml'),
@@ -104,9 +107,9 @@ class TestEncryption(PGPWrapperTestCase):
])
def test_encrypt(self, message, keys, **kwargs):
if isinstance(keys, tuple):
- super().encrypt(message, *keys, **kwargs)
+ self.encrypt(message, *keys, **kwargs)
else:
- super().encrypt(message, keys, **kwargs)
+ self.encrypt(message, keys, **kwargs)
@parameterized.expand([
(load_message('clear.eml'),
@@ -117,7 +120,7 @@ class TestEncryption(PGPWrapperTestCase):
load_key('ecc_p256.priv.asc'))
])
def test_encrypt_decrypt(self, message, pub, priv):
- super().encrypt_decrypt(message, pub, priv)
+ self.encrypt_decrypt(message, pub, priv)
@parameterized.expand([
(load_message('inline_encrypted.eml'),
@@ -125,9 +128,10 @@ class TestEncryption(PGPWrapperTestCase):
'Some encrypted text.\n\n')
])
def test_decrypt(self, message, key, clear):
- super().decrypt(message, key, clear)
+ self.decrypt(message, key, clear)
+@public
class TestKeys(PGPWrapperTestCase):
@parameterized.expand([
(load_message('inline_privkey.eml'),
@@ -146,7 +150,7 @@ class TestKeys(PGPWrapperTestCase):
False)
])
def test_has_keys(self, message, has_keys):
- super().has_keys(message, has_keys)
+ self.has_keys(message, has_keys)
@parameterized.expand([
(load_message('inline_privkey.eml'),
@@ -159,4 +163,24 @@ class TestKeys(PGPWrapperTestCase):
[load_key('rsa_1024.pub.asc')])
])
def test_keys(self, message, keys):
- super().keys(message, keys)
+ self.keys(message, keys)
+
+
+@public
+class TestCombined(PGPWrapperTestCase):
+ @parameterized.expand([
+ (load_message('clear.eml'),
+ load_key('rsa_1024.priv.asc'),
+ load_key('ecc_p256.priv.asc'))
+ ])
+ def test_sign_encrypt_decrypt_verify(self, message, sign_key, encrypt_key):
+ self.sign_encrypt_decrypt_verify(message, sign_key, encrypt_key)
+
+ @parameterized.expand([
+ (load_message('clear.eml'),
+ load_key('rsa_1024.priv.asc'),
+ load_key('ecc_p256.priv.asc'))
+ ])
+ def test_sign_then_encrypt_decrypt_verify(self, message, sign_key,
+ encrypt_key):
+ self.sign_then_encrypt_decrypt_verify(message, sign_key, encrypt_key)
diff --git a/src/mailman_pgp/pgp/utils.py b/src/mailman_pgp/pgp/utils.py
index 0946e3f..a824138 100644
--- a/src/mailman_pgp/pgp/utils.py
+++ b/src/mailman_pgp/pgp/utils.py
@@ -18,7 +18,7 @@
"""Various pgp and email utilities."""
-def copy_headers(from_msg, to_msg):
+def copy_headers(from_msg, to_msg, overwrite=False):
"""
Copy the headers and unixfrom from a message to another one.
@@ -28,6 +28,8 @@ def copy_headers(from_msg, to_msg):
:type to_msg: email.message.Message
"""
for key, value in from_msg.items():
+ if overwrite:
+ del to_msg[key]
if key not in to_msg:
to_msg[key] = value
if to_msg.get_unixfrom() is None:
diff --git a/src/mailman_pgp/pgp/wrapper.py b/src/mailman_pgp/pgp/wrapper.py
index fcbec0e..6e8a8f9 100644
--- a/src/mailman_pgp/pgp/wrapper.py
+++ b/src/mailman_pgp/pgp/wrapper.py
@@ -80,7 +80,10 @@ class PGPWrapper():
yield from self.inline.verify(key)
def verifies(self, key):
- return all(bool(verification) for verification in self.verify(key))
+ return all(bool(verification) and
+ all(not sigsubj.signature.is_expired
+ for sigsubj in verification.good_signatures) for
+ verification in self.verify(key))
def is_encrypted(self):
return self.mime.is_encrypted() or self.inline.is_encrypted()
diff --git a/src/mailman_pgp/rest/addresses.py b/src/mailman_pgp/rest/addresses.py
index 4f7a04d..6d99a04 100644
--- a/src/mailman_pgp/rest/addresses.py
+++ b/src/mailman_pgp/rest/addresses.py
@@ -17,19 +17,44 @@
""""""
-from mailman.rest.helpers import CollectionMixin
+from mailman.rest.helpers import CollectionMixin, etag, not_found, okay
from public.public import public
+from mailman_pgp.config import config
+from mailman_pgp.model.address import PGPAddress
+
class _EncryptedBase(CollectionMixin):
- pass
+ def _resource_as_dict(self, address):
+ """See `CollectionMixin`."""
+ return dict(email=address.email,
+ key_fingerprint=address.key_fingerprint,
+ key_confirmed=address.key_confirmed,
+ self_link=self.api.path_to(
+ '/plugins/{}/addresses/{}'.format(config.name,
+ address.email)
+ ))
+
+ def _get_collection(self, request):
+ """See `CollectionMixin`."""
+ return PGPAddress.query().all()
@public
-class AllAddresses:
- pass
+class AllAddresses(_EncryptedBase):
+ def on_get(self, request, response):
+ """/addresses"""
+ resource = self._make_collection(request)
+ return okay(response, etag(resource))
@public
-class AnAddress:
- pass
+class AnAddress(_EncryptedBase):
+ def __init__(self, email):
+ self._address = PGPAddress.for_email(email)
+
+ def on_get(self, request, response):
+ if self._address is None:
+ return not_found(response)
+ else:
+ okay(response, self._resource_as_json(self._address))
diff --git a/src/mailman_pgp/rest/lists.py b/src/mailman_pgp/rest/lists.py
index 9141bce..37b8d28 100644
--- a/src/mailman_pgp/rest/lists.py
+++ b/src/mailman_pgp/rest/lists.py
@@ -73,7 +73,7 @@ class AnEncryptedList(_EncryptedBase):
okay(response, self._resource_as_json(self._mlist))
@child()
- def key(self, context, segments):
+ def pubkey(self, context, segments):
return AListPubkey(self._mlist), []
diff --git a/src/mailman_pgp/rest/tests/test_addresses.py b/src/mailman_pgp/rest/tests/test_addresses.py
new file mode 100644
index 0000000..1730618
--- /dev/null
+++ b/src/mailman_pgp/rest/tests/test_addresses.py
@@ -0,0 +1,60 @@
+# Copyright (C) 2017 Jan Jancar
+#
+# This file is a part of the Mailman PGP plugin.
+#
+# This program is free software; you can redistribute it and/or modify it under
+# the terms of the GNU General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or (at your option)
+# any later version.
+#
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+# more details.
+#
+# You should have received a copy of the GNU General Public License along with
+# this program. If not, see <http://www.gnu.org/licenses/>.
+
+""""""
+import unittest
+from urllib.error import HTTPError
+
+from mailman.interfaces.usermanager import IUserManager
+from mailman.testing.helpers import call_api
+from zope.component import getUtility
+
+from mailman_pgp.database import transaction
+from mailman_pgp.model.address import PGPAddress
+from mailman_pgp.testing.layers import PGPRESTLayer
+
+
+class TestAddresses(unittest.TestCase):
+ layer = PGPRESTLayer
+
+ def setUp(self):
+ self.mm_address = getUtility(IUserManager).create_address(
+ 'anne@example.com')
+ with transaction() as t:
+ self.address = PGPAddress(self.mm_address)
+ t.add(self.address)
+
+ def test_missing_address(self):
+ with self.assertRaises(HTTPError) as cm:
+ call_api('http://localhost:9001/3.1/plugins/pgp/addresses/'
+ 'bart@example.com')
+ self.assertEqual(cm.exception.code, 404)
+
+ def test_all_addresses(self):
+ json, response = call_api(
+ 'http://localhost:9001/3.1/plugins/pgp/addresses/')
+ self.assertEqual(json['total_size'], 1)
+ self.assertEqual(len(json['entries']), 1)
+ addresses = json['entries']
+ address = addresses[0]
+ self.assertEqual(address['email'], self.address.email)
+
+ def test_get_address(self):
+ json, response = call_api(
+ 'http://localhost:9001/3.1/plugins/pgp/addresses/'
+ 'anne@example.com')
+ self.assertEqual(json['email'], self.address.email)
diff --git a/src/mailman_pgp/rest/tests/test_lists.py b/src/mailman_pgp/rest/tests/test_lists.py
index a674cc9..c5c9854 100644
--- a/src/mailman_pgp/rest/tests/test_lists.py
+++ b/src/mailman_pgp/rest/tests/test_lists.py
@@ -68,7 +68,7 @@ class TestLists(TestCase):
json, response = call_api(
'http://localhost:9001/3.1/plugins/pgp/lists/'
- 'another.example.com/key')
+ 'another.example.com/pubkey')
json.pop('http_etag')
self.assertEqual(len(json.keys()), 2)
diff --git a/src/mailman_pgp/styles/base.py b/src/mailman_pgp/styles/base.py
index 6b5271b..a7c3366 100644
--- a/src/mailman_pgp/styles/base.py
+++ b/src/mailman_pgp/styles/base.py
@@ -19,7 +19,7 @@
from lazr.config import as_boolean
from public import public
-from mailman_pgp.config import config
+from mailman_pgp.config import config, mm_config
from mailman_pgp.database import transaction
from mailman_pgp.model.list import PGPMailingList
@@ -32,6 +32,11 @@ class PGPStyle:
"""
mailing_list.posting_chain = 'pgp-posting-chain'
+ old_policy = mailing_list.subscription_policy.name
+ new_policy_name = 'pgp-' + old_policy[4:]
+ if new_policy_name in mm_config.workflows:
+ mailing_list.subscription_policy = new_policy_name
+
pgp_list = PGPMailingList.for_list(mailing_list)
if pgp_list:
return
diff --git a/src/mailman_pgp/styles/tests/test_base.py b/src/mailman_pgp/styles/tests/test_base.py
index 159a242..d343fe4 100644
--- a/src/mailman_pgp/styles/tests/test_base.py
+++ b/src/mailman_pgp/styles/tests/test_base.py
@@ -33,8 +33,7 @@ class TestBaseStyle(TestCase):
base_style = PGPStyle()
base_style.apply(mlist)
- pgp_list = PGPMailingList.query().filter_by(
- list_id=mlist.list_id).first()
+ pgp_list = PGPMailingList.for_list(mlist)
# Test that we have our PGPMailingList
self.assertIsNotNone(pgp_list)
diff --git a/src/mailman_pgp/testing/layers.py b/src/mailman_pgp/testing/layers.py
index 5e0d8b3..fb8a3ec 100644
--- a/src/mailman_pgp/testing/layers.py
+++ b/src/mailman_pgp/testing/layers.py
@@ -14,6 +14,7 @@
#
# You should have received a copy of the GNU General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.
+import contextlib
import os
from os.path import isfile
@@ -24,17 +25,36 @@ from mailman_pgp.database import transaction
from mailman_pgp.model.base import Base
-def reset_pgp_world():
+def reset_rollback():
+ config.db.session.rollback()
+
+
+def reset_pgp_dirs():
for keydir in (config.pgp.keydir_config.values()):
for path in os.listdir(keydir):
full_path = os.path.join(keydir, path)
if isfile(full_path):
os.remove(full_path)
+
+
+def reset_pgp_hard():
+ reset_rollback()
+ reset_pgp_dirs()
with transaction():
Base.metadata.drop_all(config.db.engine)
Base.metadata.create_all(config.db.engine)
+def reset_pgp_soft():
+ reset_rollback()
+ reset_pgp_dirs()
+ with contextlib.closing(config.db.engine.connect()) as con:
+ trans = con.begin()
+ for table in reversed(Base.metadata.sorted_tables):
+ con.execute(table.delete())
+ trans.commit()
+
+
# It's weird that ws have to do this, but for some reason nose2 test layers
# don't work when ws create a mixin class with the two classmethods
# and subclass both it and the respective Mailman Core test layer.
@@ -45,28 +65,38 @@ class PGPConfigLayer(ConfigLayer):
@classmethod
def tearDown(cls):
- reset_pgp_world()
+ reset_pgp_soft()
+
+ @classmethod
+ def testTearDown(cls):
+ reset_pgp_soft()
+
+
+class PGPMigrationLayer(ConfigLayer):
+ @classmethod
+ def tearDown(cls):
+ reset_pgp_hard()
@classmethod
def testTearDown(cls):
- reset_pgp_world()
+ reset_pgp_hard()
class PGPSMTPLayer(SMTPLayer):
@classmethod
def tearDown(cls):
- reset_pgp_world()
+ reset_pgp_soft()
@classmethod
def testTearDown(cls):
- reset_pgp_world()
+ reset_pgp_soft()
class PGPRESTLayer(RESTLayer):
@classmethod
def tearDown(cls):
- reset_pgp_world()
+ reset_pgp_soft()
@classmethod
def testTearDown(cls):
- reset_pgp_world()
+ reset_pgp_soft()
diff --git a/src/mailman_pgp/testing/start.py b/src/mailman_pgp/testing/start.py
index 37f3de4..128855e 100644
--- a/src/mailman_pgp/testing/start.py
+++ b/src/mailman_pgp/testing/start.py
@@ -25,3 +25,5 @@ from public import public
def start_run(plugin):
warnings.filterwarnings(action='ignore', category=UserWarning,
module='.*mailman_pgp.*')
+ warnings.filterwarnings(action='ignore', category=UserWarning,
+ module='.*PGPy.*')
diff --git a/src/mailman_pgp/workflows/__init__.py b/src/mailman_pgp/workflows/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/src/mailman_pgp/workflows/__init__.py
diff --git a/src/mailman_pgp/workflows/base.py b/src/mailman_pgp/workflows/base.py
new file mode 100644
index 0000000..d35e58c
--- /dev/null
+++ b/src/mailman_pgp/workflows/base.py
@@ -0,0 +1,145 @@
+# Copyright (C) 2017 Jan Jancar
+#
+# This file is a part of the Mailman PGP plugin.
+#
+# This program is free software; you can redistribute it and/or modify it under
+# the terms of the GNU General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or (at your option)
+# any later version.
+#
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+# more details.
+#
+# You should have received a copy of the GNU General Public License along with
+# this program. If not, see <http://www.gnu.org/licenses/>.
+
+""""""
+from mailman.email.message import UserNotification
+from mailman.interfaces.subscriptions import TokenOwner
+from pgpy import PGPKey
+
+from mailman_pgp.database import transaction
+from mailman_pgp.model.address import PGPAddress
+from mailman_pgp.model.list import PGPMailingList
+from mailman_pgp.pgp.utils import copy_headers
+from mailman_pgp.pgp.wrapper import PGPWrapper
+
+KEY_REQUEST = """\
+----------
+TODO: this is a pgp enabled list.
+We need your pubkey.
+Reply to this message with it as a PGP/MIME(preferred) or inline.
+----------"""
+
+CONFIRM_REQUEST = """\
+----------
+TODO: this is a pgp enabled list.
+Reply to this message with this whole text
+signed with your supplied key, either inline or PGP/MIME.
+
+Fingerprint: {}
+Token: {}
+----------
+"""
+
+
+class PGPMixin:
+ def _step_pgp_prepare(self):
+ pgp_address = PGPAddress.for_address(self.address)
+ if pgp_address is None:
+ with transaction() as t:
+ pgp_address = PGPAddress(self.address)
+ t.add(pgp_address)
+
+
+class SetPubkeyMixin:
+ def __init__(self, pubkey=None):
+ self.pubkey = pubkey
+
+ @property
+ def pubkey_key(self):
+ if self.pubkey is None:
+ return None
+ return str(self.pubkey)
+
+ @pubkey_key.setter
+ def pubkey_key(self, value):
+ if value is not None:
+ self.pubkey, _ = PGPKey.from_blob(value)
+ else:
+ self.pubkey = None
+
+ def _step_pubkey_checks(self):
+ pgp_address = PGPAddress.for_address(self.address)
+ assert pgp_address is not None
+
+ if self.pubkey is None:
+ if pgp_address.key is None:
+ self.push('send_key_request')
+ else:
+ with transaction():
+ pgp_address.key = self.pubkey
+
+ def _step_send_key_request(self):
+ self._set_token(TokenOwner.subscriber)
+ self.push('receive_key')
+ self.save()
+ request_address = self.mlist.request_address
+ email_address = self.address.email
+ msg = UserNotification(email_address, request_address,
+ 'key set {}'.format(self.token),
+ KEY_REQUEST)
+ msg.send(self.mlist, add_precedence=False)
+ # Now we wait for the confirmation.
+ raise StopIteration
+
+ def _step_receive_key(self):
+ self._restore_subscriber()
+ self._set_token(TokenOwner.no_one)
+
+
+class ConfirmPubkeyMixin:
+ def __init__(self, pre_confirmed=False):
+ self.pubkey_confirmed = pre_confirmed
+
+ def _step_pubkey_confirmation(self):
+ pgp_address = PGPAddress.for_address(self.address)
+ assert pgp_address is not None
+
+ if self.pubkey_confirmed:
+ with transaction():
+ pgp_address.key_confirmed = True
+ else:
+ if not pgp_address.key_confirmed:
+ self.push('send_key_confirm_request')
+
+ def _step_send_key_confirm_request(self):
+ self._set_token(TokenOwner.subscriber)
+ self.push('receive_key_confirmation')
+ self.save()
+
+ pgp_address = PGPAddress.for_address(self.address)
+ request_address = self.mlist.request_address
+ email_address = self.address.email
+ msg = UserNotification(email_address, request_address,
+ 'key confirm {}'.format(self.token),
+ CONFIRM_REQUEST.format(
+ pgp_address.key_fingerprint,
+ self.token))
+ pgp_list = PGPMailingList.for_list(self.mlist)
+ wrapped = PGPWrapper(msg)
+ encrypted = wrapped.sign_encrypt(pgp_list.key, pgp_address.key)
+
+ msg.set_payload(encrypted.get_payload())
+ copy_headers(encrypted, msg, True)
+ msg.send(self.mlist)
+ raise StopIteration
+
+ def _step_receive_key_confirmation(self):
+ self._restore_subscriber()
+ self._set_token(TokenOwner.no_one)
+ with transaction():
+ pgp_address = PGPAddress.for_address(self.address)
+ pgp_address.key_confirmed = True
diff --git a/src/mailman_pgp/workflows/key_change.py b/src/mailman_pgp/workflows/key_change.py
new file mode 100644
index 0000000..cc5b9fc
--- /dev/null
+++ b/src/mailman_pgp/workflows/key_change.py
@@ -0,0 +1,133 @@
+# Copyright (C) 2017 Jan Jancar
+#
+# This file is a part of the Mailman PGP plugin.
+#
+# This program is free software; you can redistribute it and/or modify it under
+# the terms of the GNU General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or (at your option)
+# any later version.
+#
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+# more details.
+#
+# You should have received a copy of the GNU General Public License along with
+# this program. If not, see <http://www.gnu.org/licenses/>.
+
+""""""
+from mailman.email.message import UserNotification
+from mailman.interfaces.pending import IPendable, IPendings
+from mailman.interfaces.subscriptions import TokenOwner
+from mailman.interfaces.workflows import IWorkflow
+from mailman.workflows.base import Workflow
+from pgpy import PGPKey
+from public import public
+from zope.component import getUtility
+from zope.interface import implementer
+
+from mailman_pgp.database import transaction
+from mailman_pgp.model.address import PGPAddress
+from mailman_pgp.model.list import PGPMailingList
+from mailman_pgp.pgp.utils import copy_headers
+from mailman_pgp.pgp.wrapper import PGPWrapper
+
+CHANGE_CONFIRM_REQUEST = """\
+----------
+TODO: this is a pgp enabled list.
+You requested to change your key.
+Reply to this message with this whole text
+signed with your supplied key, either inline or PGP/MIME.
+
+Fingerprint: {}
+Token: {}
+----------
+"""
+
+
+@public
+@implementer(IWorkflow)
+class KeyChangeWorkflow(Workflow):
+ name = 'pgp-key-change-workflow'
+ description = ''
+ initial_state = 'change_key'
+ save_attributes = (
+ 'address_key',
+ 'pubkey_key'
+ )
+
+ def __init__(self, mlist, pgp_address=None, pubkey=None):
+ super().__init__()
+ self.mlist = mlist
+ self.pgp_list = PGPMailingList.for_list(mlist)
+ self.pgp_address = pgp_address
+ self.pubkey = pubkey
+
+ @property
+ def address_key(self):
+ return self.pgp_address.email
+
+ @address_key.setter
+ def address_key(self, value):
+ self.pgp_address = PGPAddress.for_email(value)
+ self.member = self.mlist.regular_members.get_member(value)
+
+ @property
+ def pubkey_key(self):
+ return str(self.pubkey)
+
+ @pubkey_key.setter
+ def pubkey_key(self, value):
+ self.pubkey, _ = PGPKey.from_blob(value)
+
+ def _step_change_key(self):
+ if self.pgp_address is None or self.pubkey is None:
+ raise ValueError
+
+ self.push('send_key_confirm_request')
+
+ def _step_send_key_confirm_request(self):
+ pendings = getUtility(IPendings)
+ pendable = KeyChangeWorkflow.pendable_class()(
+ email=self.pgp_address.email,
+ pubkey=str(self.pubkey),
+ fingerprint=self.pubkey.fingerprint
+ )
+ self.token = pendings.add(pendable)
+ self.token_owner = TokenOwner.subscriber
+
+ self.push('receive_confirmation')
+ self.save()
+ request_address = self.mlist.request_address
+ email_address = self.pgp_address.email
+ msg = UserNotification(email_address, request_address,
+ 'key confirm {}'.format(self.token),
+ CHANGE_CONFIRM_REQUEST.format(
+ self.pubkey.fingerprint,
+ self.token))
+ wrapped = PGPWrapper(msg)
+ encrypted = wrapped.sign_encrypt(self.pgp_list.key, self.pubkey)
+
+ msg.set_payload(encrypted.get_payload())
+ copy_headers(encrypted, msg, True)
+ msg.send(self.mlist)
+ raise StopIteration
+
+ def _step_receive_confirmation(self):
+ with transaction():
+ self.pgp_address.key = self.pubkey
+ self.pgp_address.key_confirmed = True
+
+ pendings = getUtility(IPendings)
+ if self.token is not None:
+ pendings.confirm(self.token)
+ self.token = None
+ self.token_owner = TokenOwner.no_one
+
+ @classmethod
+ def pendable_class(cls):
+ @implementer(IPendable)
+ class Pendable(dict):
+ PEND_TYPE = KeyChangeWorkflow.name
+
+ return Pendable
diff --git a/src/mailman_pgp/workflows/subscription.py b/src/mailman_pgp/workflows/subscription.py
new file mode 100644
index 0000000..ce02013
--- /dev/null
+++ b/src/mailman_pgp/workflows/subscription.py
@@ -0,0 +1,197 @@
+# Copyright (C) 2017 Jan Jancar
+#
+# This file is a part of the Mailman PGP plugin.
+#
+# This program is free software; you can redistribute it and/or modify it under
+# the terms of the GNU General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or (at your option)
+# any later version.
+#
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+# more details.
+#
+# You should have received a copy of the GNU General Public License along with
+# this program. If not, see <http://www.gnu.org/licenses/>.
+
+""""""
+
+from mailman.core.i18n import _
+from mailman.interfaces.workflows import ISubscriptionWorkflow
+from mailman.workflows.common import (ConfirmationMixin, ModerationMixin,
+ SubscriptionBase, VerificationMixin)
+from public import public
+from zope.interface import implementer
+
+from mailman_pgp.workflows.base import (ConfirmPubkeyMixin, PGPMixin,
+ SetPubkeyMixin)
+
+
+@public
+@implementer(ISubscriptionWorkflow)
+class OpenSubscriptionPolicy(SubscriptionBase, VerificationMixin,
+ SetPubkeyMixin, ConfirmPubkeyMixin,
+ PGPMixin):
+ """"""
+
+ name = 'pgp-policy-open'
+ description = _('An open subscription policy, '
+ 'for a PGP-enabled mailing list.')
+ initial_state = 'prepare'
+ save_attributes = (
+ 'verified',
+ 'pubkey_key',
+ 'pubkey_confirmed',
+ 'address_key',
+ 'subscriber_key',
+ 'user_key',
+ 'token_owner_key',
+ )
+
+ def __init__(self, mlist, subscriber=None, *,
+ pre_verified=False, pubkey=None,
+ pubkey_pre_confirmed=False):
+ SubscriptionBase.__init__(self, mlist, subscriber)
+ VerificationMixin.__init__(self, pre_verified=pre_verified)
+ SetPubkeyMixin.__init__(self, pubkey=pubkey)
+ ConfirmPubkeyMixin.__init__(self, pre_confirmed=pubkey_pre_confirmed)
+ PGPMixin.__init__(self)
+
+ def _step_prepare(self):
+ self.push('do_subscription')
+ self.push('pubkey_confirmation')
+ self.push('pubkey_checks')
+ self.push('pgp_prepare')
+ self.push('verification_checks')
+ self.push('sanity_checks')
+
+
+@public
+@implementer(ISubscriptionWorkflow)
+class ConfirmSubscriptionPolicy(SubscriptionBase, VerificationMixin,
+ ConfirmationMixin, SetPubkeyMixin,
+ ConfirmPubkeyMixin, PGPMixin):
+ """"""
+
+ name = 'pgp-policy-confirm'
+ description = _('A subscription policy, for a PGP-enabled mailing list '
+ 'that requires confirmation.')
+ initial_state = 'prepare'
+ save_attributes = (
+ 'verified',
+ 'confirmed',
+ 'pubkey_key',
+ 'pubkey_confirmed',
+ 'address_key',
+ 'subscriber_key',
+ 'user_key',
+ 'token_owner_key',
+ )
+
+ def __init__(self, mlist, subscriber=None, *,
+ pre_verified=False, pre_confirmed=False, pubkey=None,
+ pubkey_pre_confirmed=False):
+ SubscriptionBase.__init__(self, mlist, subscriber)
+ VerificationMixin.__init__(self, pre_verified=pre_verified)
+ ConfirmationMixin.__init__(self, pre_confirmed=pre_confirmed)
+ SetPubkeyMixin.__init__(self, pubkey=pubkey)
+ ConfirmPubkeyMixin.__init__(self, pre_confirmed=pubkey_pre_confirmed)
+ PGPMixin.__init__(self)
+
+ def _step_prepare(self):
+ self.push('do_subscription')
+ self.push('pubkey_confirmation')
+ self.push('pubkey_checks')
+ self.push('pgp_prepare')
+ self.push('confirmation_checks')
+ self.push('verification_checks')
+ self.push('sanity_checks')
+
+
+@public
+@implementer(ISubscriptionWorkflow)
+class ModerationSubscriptionPolicy(SubscriptionBase, VerificationMixin,
+ ModerationMixin, SetPubkeyMixin,
+ ConfirmPubkeyMixin, PGPMixin):
+ """"""
+
+ name = 'pgp-policy-moderate'
+ description = _('A subscription policy, for a PGP-enabled mailing list '
+ 'that requires moderation.')
+ initial_state = 'prepare'
+ save_attributes = (
+ 'verified',
+ 'approved',
+ 'pubkey_key',
+ 'pubkey_confirmed',
+ 'address_key',
+ 'subscriber_key',
+ 'user_key',
+ 'token_owner_key',
+ )
+
+ def __init__(self, mlist, subscriber=None, *,
+ pre_verified=False, pre_approved=False, pubkey=None,
+ pubkey_pre_confirmed=False):
+ SubscriptionBase.__init__(self, mlist, subscriber)
+ VerificationMixin.__init__(self, pre_verified=pre_verified)
+ ModerationMixin.__init__(self, pre_approved=pre_approved)
+ SetPubkeyMixin.__init__(self, pubkey=pubkey)
+ ConfirmPubkeyMixin.__init__(self, pre_confirmed=pubkey_pre_confirmed)
+ PGPMixin.__init__(self)
+
+ def _step_prepare(self):
+ self.push('do_subscription')
+ self.push('moderation_checks')
+ self.push('pubkey_confirmation')
+ self.push('pubkey_checks')
+ self.push('pgp_prepare')
+ self.push('verification_checks')
+ self.push('sanity_checks')
+
+
+@public
+@implementer(ISubscriptionWorkflow)
+class ConfirmModerationSubscriptionPolicy(SubscriptionBase, VerificationMixin,
+ ConfirmationMixin, ModerationMixin,
+ SetPubkeyMixin, ConfirmPubkeyMixin,
+ PGPMixin):
+ """"""
+
+ name = 'pgp-policy-confirm-moderate'
+ description = _('A subscription policy, for a PGP-enabled mailing list '
+ 'that requires moderation after confirmation.')
+ initial_state = 'prepare'
+ save_attributes = (
+ 'verified',
+ 'confirmed',
+ 'approved',
+ 'pubkey_key',
+ 'pubkey_confirmed',
+ 'address_key',
+ 'subscriber_key',
+ 'user_key',
+ 'token_owner_key',
+ )
+
+ def __init__(self, mlist, subscriber=None, *,
+ pre_verified=False, pre_confirmed=False, pre_approved=False,
+ pubkey=None, pubkey_pre_confirmed=False):
+ SubscriptionBase.__init__(self, mlist, subscriber)
+ VerificationMixin.__init__(self, pre_verified=pre_verified)
+ ConfirmationMixin.__init__(self, pre_confirmed=pre_confirmed)
+ ModerationMixin.__init__(self, pre_approved=pre_approved)
+ SetPubkeyMixin.__init__(self, pubkey=pubkey)
+ ConfirmPubkeyMixin.__init__(self, pre_confirmed=pubkey_pre_confirmed)
+ PGPMixin.__init__(self)
+
+ def _step_prepare(self):
+ self.push('do_subscription')
+ self.push('moderation_checks')
+ self.push('pubkey_confirmation')
+ self.push('pubkey_checks')
+ self.push('pgp_prepare')
+ self.push('confirmation_checks')
+ self.push('verification_checks')
+ self.push('sanity_checks')
diff --git a/src/mailman_pgp/workflows/tests/__init__.py b/src/mailman_pgp/workflows/tests/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/src/mailman_pgp/workflows/tests/__init__.py
diff --git a/src/mailman_pgp/workflows/tests/test_base.py b/src/mailman_pgp/workflows/tests/test_base.py
new file mode 100644
index 0000000..e3cc833
--- /dev/null
+++ b/src/mailman_pgp/workflows/tests/test_base.py
@@ -0,0 +1,265 @@
+# Copyright (C) 2017 Jan Jancar
+#
+# This file is a part of the Mailman PGP plugin.
+#
+# This program is free software; you can redistribute it and/or modify it under
+# the terms of the GNU General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or (at your option)
+# any later version.
+#
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+# more details.
+#
+# You should have received a copy of the GNU General Public License along with
+# this program. If not, see <http://www.gnu.org/licenses/>.
+
+""""""
+
+import unittest
+from contextlib import suppress
+from unittest.mock import patch
+
+from mailman.app.lifecycle import create_list
+from mailman.interfaces.pending import IPendings
+from mailman.interfaces.usermanager import IUserManager
+from mailman.interfaces.workflows import IWorkflow
+from mailman.testing.helpers import get_queue_messages
+from mailman.workflows.common import SubscriptionBase
+from public import public
+from zope.component import getUtility
+from zope.interface import implementer
+
+from mailman_pgp.database import mm_transaction, transaction
+from mailman_pgp.model.address import PGPAddress
+from mailman_pgp.model.list import PGPMailingList
+from mailman_pgp.pgp.tests.base import load_key
+from mailman_pgp.pgp.wrapper import PGPWrapper
+from mailman_pgp.testing.layers import PGPConfigLayer
+from mailman_pgp.workflows.base import (ConfirmPubkeyMixin, KEY_REQUEST,
+ PGPMixin, SetPubkeyMixin)
+
+
+class PubkeyMixinTestSetup():
+ def setUp(self):
+ with mm_transaction():
+ self.mlist = create_list('test@example.com',
+ style_name='pgp-default')
+
+ self.list_key = load_key('ecc_p256.priv.asc')
+
+ self.pgp_list = PGPMailingList.for_list(self.mlist)
+ self.pgp_list.key = self.list_key
+
+ self.um = getUtility(IUserManager)
+
+ self.sender_key = load_key('rsa_1024.priv.asc')
+ self.sender = self.um.create_address('rsa-1024b@example.org')
+
+
+@implementer(IWorkflow)
+class PGPTestWorkflow(SubscriptionBase, PGPMixin, SetPubkeyMixin,
+ ConfirmPubkeyMixin):
+ name = 'test-workflow'
+ description = ''
+ initial_state = 'prepare'
+ save_attributes = (
+ 'pubkey_key',
+ 'pubkey_confirmed',
+ 'address_key',
+ 'subscriber_key',
+ 'user_key',
+ 'token_owner_key'
+ )
+
+ def __init__(self, mlist, subscriber=None, *, pubkey=None,
+ pubkey_pre_confirmed=False):
+ SubscriptionBase.__init__(self, mlist, subscriber)
+ SetPubkeyMixin.__init__(self, pubkey=pubkey)
+ ConfirmPubkeyMixin.__init__(self, pre_confirmed=pubkey_pre_confirmed)
+ PGPMixin.__init__(self)
+
+ def _step_prepare(self):
+ self.push('do_subscription')
+ self.push('pubkey_confirmation')
+ self.push('pubkey_checks')
+ self.push('pgp_prepare')
+ self.push('sanity_checks')
+
+
+@public
+class TestPGPMixin(PubkeyMixinTestSetup, unittest.TestCase):
+ layer = PGPConfigLayer
+
+ def test_create_address(self):
+ workflow = PGPTestWorkflow(self.mlist, self.sender)
+ workflow.run_thru('pgp_prepare')
+ pgp_address = PGPAddress.for_address(self.sender)
+ self.assertIsNotNone(pgp_address)
+
+ def test_address_existing(self):
+ workflow = PGPTestWorkflow(self.mlist, self.sender)
+ with transaction() as t:
+ pgp_address = PGPAddress(self.sender)
+ t.add(pgp_address)
+ workflow.run_thru('pgp_prepare')
+ still = PGPAddress.for_address(self.sender)
+ self.assertIsNotNone(still)
+
+
+@public
+class TestSetPubkeyMixin(PubkeyMixinTestSetup, unittest.TestCase):
+ layer = PGPConfigLayer
+
+ def test_key_request_sent(self):
+ workflow = PGPTestWorkflow(self.mlist, self.sender)
+ list(workflow)
+ items = get_queue_messages('virgin', expected_count=1)
+ message = items[0].msg
+ token = workflow.token
+
+ self.assertEqual(message['Subject'], 'key set {}'.format(token))
+ self.assertEqual(message.get_payload(), KEY_REQUEST)
+
+ def test_receive_key(self):
+ workflow = PGPTestWorkflow(self.mlist, self.sender)
+ list(workflow)
+ with transaction():
+ pgp_address = PGPAddress.for_address(self.sender)
+ pgp_address.key = self.sender_key.pubkey
+
+ receive_workflow = PGPTestWorkflow(self.mlist)
+ receive_workflow.token = workflow.token
+ receive_workflow.restore()
+ receive_workflow.run_thru('receive_key')
+
+ def test_set_pubkey(self):
+ workflow = PGPTestWorkflow(self.mlist, self.sender,
+ pubkey=self.sender_key.pubkey)
+ workflow.run_thru('pubkey_checks')
+ pgp_address = PGPAddress.for_address(self.sender)
+ self.assertIsNotNone(pgp_address)
+ self.assertIsNotNone(pgp_address.key)
+ self.assertEqual(pgp_address.key_fingerprint,
+ self.sender_key.fingerprint)
+
+ def test_pubkey_set(self):
+ workflow = PGPTestWorkflow(self.mlist, self.sender)
+ with transaction() as t:
+ pgp_address = PGPAddress(self.sender)
+ pgp_address.key = self.sender_key.pubkey
+ t.add(pgp_address)
+ workflow.run_thru('pubkey_checks')
+ self.assertEqual(pgp_address.key_fingerprint,
+ self.sender_key.fingerprint)
+
+
+@public
+class TestConfirmPubkeyMixin(PubkeyMixinTestSetup, unittest.TestCase):
+ layer = PGPConfigLayer
+
+ def test_key_request_pubkey_set(self):
+ workflow = PGPTestWorkflow(self.mlist, self.sender,
+ pubkey=self.sender_key.pubkey,
+ pubkey_pre_confirmed=True)
+ workflow.run_thru('pubkey_confirmation')
+ with patch.object(workflow, '_step_do_subscription') as step:
+ next(workflow)
+ step.assert_called_once_with()
+
+ def test_send_key_confirm_request(self):
+ workflow = PGPTestWorkflow(self.mlist, self.sender,
+ pubkey=self.sender_key.pubkey,
+ pubkey_pre_confirmed=False)
+ list(workflow)
+ items = get_queue_messages('virgin', expected_count=1)
+ message = items[0].msg
+ token = workflow.token
+
+ self.assertEqual(message['Subject'], 'key confirm {}'.format(token))
+ wrapped = PGPWrapper(message)
+ self.assertTrue(wrapped.is_encrypted())
+
+ def test_receive_confirmation(self):
+ workflow = PGPTestWorkflow(self.mlist, self.sender,
+ pubkey=self.sender_key.pubkey,
+ pubkey_pre_confirmed=False)
+ list(workflow)
+
+ receive_workflow = PGPTestWorkflow(self.mlist)
+ receive_workflow.token = workflow.token
+ receive_workflow.restore()
+ receive_workflow.run_thru('receive_key_confirmation')
+ with patch.object(receive_workflow, '_step_do_subscription') as step:
+ next(receive_workflow)
+ step.assert_called_once_with()
+
+
+@public
+class TestBothPubkeyMixins(PubkeyMixinTestSetup, unittest.TestCase):
+ layer = PGPConfigLayer
+
+ def test_pended_data_key_request(self):
+ workflow = PGPTestWorkflow(self.mlist, self.sender)
+ with suppress(StopIteration):
+ workflow.run_thru('send_key_request')
+ self.assertIsNotNone(workflow.token)
+ pendable = getUtility(IPendings).confirm(workflow.token, expunge=False)
+ self.assertEqual(pendable['list_id'], 'test.example.com')
+ self.assertEqual(pendable['email'], 'rsa-1024b@example.org')
+ self.assertEqual(pendable['display_name'], '')
+ self.assertEqual(pendable['when'], '2005-08-01T07:49:23')
+ self.assertEqual(pendable['token_owner'], 'subscriber')
+
+ def test_pended_data_key_confirmation(self):
+ workflow = PGPTestWorkflow(self.mlist, self.sender,
+ pubkey=self.sender_key.pubkey)
+ with suppress(StopIteration):
+ workflow.run_thru('send_key_confirm_request')
+ self.assertIsNotNone(workflow.token)
+ pendable = getUtility(IPendings).confirm(workflow.token, expunge=False)
+ self.assertEqual(pendable['list_id'], 'test.example.com')
+ self.assertEqual(pendable['email'], 'rsa-1024b@example.org')
+ self.assertEqual(pendable['display_name'], '')
+ self.assertEqual(pendable['when'], '2005-08-01T07:49:23')
+ self.assertEqual(pendable['token_owner'], 'subscriber')
+
+ def test_exisitng_pgp_address(self):
+ workflow = PGPTestWorkflow(self.mlist, self.sender)
+
+ with transaction() as t:
+ pgp_address = PGPAddress(self.sender)
+ pgp_address.key = self.sender_key.pubkey
+ pgp_address.key_confirmed = True
+ t.add(pgp_address)
+
+ workflow.run_thru('pubkey_confirmation')
+ with patch.object(workflow, '_step_do_subscription') as step:
+ next(workflow)
+ step.assert_called_once_with()
+
+ def test_exisitng_pgp_address_not_confirmed(self):
+ workflow = PGPTestWorkflow(self.mlist, self.sender)
+
+ with transaction() as t:
+ pgp_address = PGPAddress(self.sender)
+ pgp_address.key = self.sender_key.pubkey
+ t.add(pgp_address)
+
+ workflow.run_thru('pubkey_confirmation')
+ with patch.object(workflow, '_step_send_key_confirm_request') as step:
+ next(workflow)
+ step.assert_called_once_with()
+
+ def test_exisitng_pgp_address_no_key(self):
+ workflow = PGPTestWorkflow(self.mlist, self.sender)
+
+ with transaction() as t:
+ pgp_address = PGPAddress(self.sender)
+ t.add(pgp_address)
+
+ workflow.run_thru('pubkey_checks')
+ with patch.object(workflow, '_step_send_key_request') as step:
+ next(workflow)
+ step.assert_called_once_with()
diff --git a/src/mailman_pgp/workflows/tests/test_key_change.py b/src/mailman_pgp/workflows/tests/test_key_change.py
new file mode 100644
index 0000000..5d61efd
--- /dev/null
+++ b/src/mailman_pgp/workflows/tests/test_key_change.py
@@ -0,0 +1,105 @@
+# Copyright (C) 2017 Jan Jancar
+#
+# This file is a part of the Mailman PGP plugin.
+#
+# This program is free software; you can redistribute it and/or modify it under
+# the terms of the GNU General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or (at your option)
+# any later version.
+#
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+# more details.
+#
+# You should have received a copy of the GNU General Public License along with
+# this program. If not, see <http://www.gnu.org/licenses/>.
+
+""""""
+
+import unittest
+
+from mailman.app.lifecycle import create_list
+from mailman.interfaces.subscriptions import ISubscriptionManager, TokenOwner
+from mailman.interfaces.usermanager import IUserManager
+from mailman.testing.helpers import get_queue_messages
+from public import public
+from zope.component import getUtility
+
+from mailman_pgp.database import mm_transaction, transaction
+from mailman_pgp.model.address import PGPAddress
+from mailman_pgp.model.list import PGPMailingList
+from mailman_pgp.pgp.tests.base import load_key
+from mailman_pgp.pgp.wrapper import PGPWrapper
+from mailman_pgp.testing.layers import PGPConfigLayer
+from mailman_pgp.workflows.key_change import KeyChangeWorkflow
+
+
+@public
+class TestKeyChangeWorkflow(unittest.TestCase):
+ layer = PGPConfigLayer
+
+ def setUp(self):
+ with mm_transaction():
+ self.mlist = create_list('test@example.com',
+ style_name='pgp-default')
+ self.pgp_list = PGPMailingList.for_list(self.mlist)
+ self.pgp_list.key = load_key('ecc_p256.priv.asc')
+
+ self.sender_key = load_key('rsa_1024.priv.asc')
+ self.sender_new_key = load_key('ecc_p256.priv.asc')
+ self.sender = getUtility(IUserManager).create_address(
+ 'rsa-1024b@example.org')
+
+ def test_pgp_address_none(self):
+ workflow = KeyChangeWorkflow(self.mlist)
+ with self.assertRaises(ValueError):
+ list(workflow)
+
+ def test_pubkey_none(self):
+ with transaction() as t:
+ pgp_address = PGPAddress(self.sender)
+ t.add(pgp_address)
+
+ workflow = KeyChangeWorkflow(self.mlist, pgp_address)
+ with self.assertRaises(ValueError):
+ list(workflow)
+
+ def test_send_key_confirm_request(self):
+ with transaction() as t:
+ pgp_address = PGPAddress(self.sender)
+ pgp_address.key = self.sender_key.pubkey
+ pgp_address.key_confirmed = True
+ t.add(pgp_address)
+
+ workflow = KeyChangeWorkflow(self.mlist, pgp_address,
+ self.sender_new_key.pubkey)
+ list(workflow)
+ items = get_queue_messages('virgin', expected_count=1)
+ message = items[0].msg
+ token = workflow.token
+
+ self.assertEqual(message['Subject'], 'key confirm {}'.format(token))
+ wrapped = PGPWrapper(message)
+ self.assertTrue(wrapped.is_encrypted())
+
+ def test_confirm(self):
+ with transaction() as t:
+ pgp_address = PGPAddress(self.sender)
+ pgp_address.key = self.sender_key.pubkey
+ pgp_address.key_confirmed = True
+ t.add(pgp_address)
+
+ workflow = KeyChangeWorkflow(self.mlist, pgp_address,
+ self.sender_new_key.pubkey)
+ list(workflow)
+
+ token, token_owner, member = ISubscriptionManager(self.mlist).confirm(
+ workflow.token)
+ self.assertIsNone(token)
+ self.assertEqual(token_owner, TokenOwner.no_one)
+
+ pgp_address = PGPAddress.for_address(self.sender)
+ self.assertEqual(pgp_address.key_fingerprint,
+ self.sender_new_key.fingerprint)
+ self.assertTrue(pgp_address.key_confirmed)
diff --git a/src/mailman_pgp/workflows/tests/test_subscription.py b/src/mailman_pgp/workflows/tests/test_subscription.py
new file mode 100644
index 0000000..f9fa1e1
--- /dev/null
+++ b/src/mailman_pgp/workflows/tests/test_subscription.py
@@ -0,0 +1,58 @@
+# Copyright (C) 2017 Jan Jancar
+#
+# This file is a part of the Mailman PGP plugin.
+#
+# This program is free software; you can redistribute it and/or modify it under
+# the terms of the GNU General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or (at your option)
+# any later version.
+#
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+# more details.
+#
+# You should have received a copy of the GNU General Public License along with
+# this program. If not, see <http://www.gnu.org/licenses/>.
+
+""""""
+import unittest
+
+from mailman.app.lifecycle import create_list
+from mailman.interfaces.usermanager import IUserManager
+from public import public
+from zope.component import getUtility
+
+from mailman_pgp.database import mm_transaction
+from mailman_pgp.testing.layers import PGPConfigLayer
+from mailman_pgp.workflows.subscription import (
+ ConfirmModerationSubscriptionPolicy, ConfirmSubscriptionPolicy,
+ ModerationSubscriptionPolicy, OpenSubscriptionPolicy)
+
+
+@public
+class TestSubscriptionWorkflows(unittest.TestCase):
+ layer = PGPConfigLayer
+
+ def setUp(self):
+ with mm_transaction():
+ self.mlist = create_list('test@example.com',
+ style_name='pgp-default')
+ self.sender = getUtility(IUserManager).create_address(
+ 'rsa-1024b@example.org')
+
+ def test_open_policy(self):
+ workflow = OpenSubscriptionPolicy(self.mlist, self.sender)
+ next(workflow)
+
+ def test_confirm_policy(self):
+ workflow = ConfirmSubscriptionPolicy(self.mlist, self.sender)
+ next(workflow)
+
+ def test_moderation_policy(self):
+ workflow = ModerationSubscriptionPolicy(self.mlist, self.sender)
+ next(workflow)
+
+ def test_confirm_moderation_policy(self):
+ workflow = ConfirmModerationSubscriptionPolicy(self.mlist, self.sender)
+ next(workflow)