diff options
| author | J08nY | 2017-07-11 20:57:29 +0200 |
|---|---|---|
| committer | J08nY | 2017-07-11 20:57:29 +0200 |
| commit | 77b63eaae46697b24fe4cc604a36b869619b4638 (patch) | |
| tree | b91c61103b588f822313abb0716bae76f168ca44 /src | |
| parent | 97abc15b2e92fcf9109997043e4297f16d0bf5c7 (diff) | |
| download | mailman-pgp-77b63eaae46697b24fe4cc604a36b869619b4638.tar.gz mailman-pgp-77b63eaae46697b24fe4cc604a36b869619b4638.tar.zst mailman-pgp-77b63eaae46697b24fe4cc604a36b869619b4638.zip | |
Diffstat (limited to 'src')
| -rw-r--r-- | src/mailman_pgp/commands/eml_key.py | 216 | ||||
| -rw-r--r-- | src/mailman_pgp/commands/tests/__init__.py | 0 | ||||
| -rw-r--r-- | src/mailman_pgp/commands/tests/test_key.py | 123 | ||||
| -rw-r--r-- | src/mailman_pgp/model/address.py | 10 | ||||
| -rw-r--r-- | src/mailman_pgp/pgp/mime.py | 22 |
5 files changed, 285 insertions, 86 deletions
diff --git a/src/mailman_pgp/commands/eml_key.py b/src/mailman_pgp/commands/eml_key.py index b280603..e9b5e19 100644 --- a/src/mailman_pgp/commands/eml_key.py +++ b/src/mailman_pgp/commands/eml_key.py @@ -32,106 +32,154 @@ from mailman_pgp.pgp.wrapper import PGPWrapper from mailman_pgp.workflows.base import CONFIRM_REQUEST -@public -@implementer(IEmailCommand) -class KeyCommand: - """The `key` command.""" +def _get_email(msg): + display_name, email = parseaddr(msg['from']) + # Address could be None or the empty string. + if not email: + email = msg.sender + return email - name = 'key' - argument_description = '<set|confirm|change|revoke|sign>' - short_description = '' - description = '' - def process(self, mlist, msg, msgdata, arguments, results): - """See `IEmailCommand`.""" - if len(arguments) == 0: - print('No sub-command specified,' - ' must be one of <change|revoke|sign>.', file=results) - return ContinueProcessing.no +def _cmd_set(pgp_list, mlist, msg, msgdata, arguments, results): + if len(arguments) != 2: + print('Missing token.', file=results) + return ContinueProcessing.no - pgp_list = PGPMailingList.for_list(mlist) - if pgp_list is None: - print('This mailing list doesnt have pgp enabled.', file=results) - return ContinueProcessing.yes # XXX: What does this even mean? + wrapped = PGPWrapper(msg) + if not wrapped.has_keys(): + print('No keys attached? Send a key.', file=results) + return ContinueProcessing.no + + keys = list(wrapped.keys()) + if len(keys) != 1: + print('More than one key! Send only one key.', file=results) + return ContinueProcessing.no + + email = _get_email(msg) + if not email: + print('No email to subscribe with.', file=results) + return ContinueProcessing.no + + address = getUtility(IUserManager).get_address(email) + if not address: + print('No adddress to subscribe with.', file=results) + return ContinueProcessing.no - if arguments[0] == 'set': - if len(arguments) != 2: - print('Missing token', file=results) - return ContinueProcessing.no + with transaction() as t: + pgp_address = PGPAddress(address) + pgp_address.key = keys.pop() + t.add(pgp_address) - wrapped = PGPWrapper(msg) - if not wrapped.has_keys(): - print('No keys here?') - return ContinueProcessing.no + token = arguments[1] + try: + ISubscriptionManager(mlist).confirm(token) + print('Key succesfully set.', file=results) + print('Key fingerprint: {}'.format(pgp_address.key.fingerprint), file=results) + except LookupError: + print('Wrong token.', file=results) - keys = list(wrapped.keys()) - if len(keys) != 1: - print('More than one key! Which one is yours?') - return ContinueProcessing.no + return ContinueProcessing.no - with transaction() as t: - pgp_address = PGPAddress(self.address) - pgp_address.key = keys.pop() - t.add(pgp_address) - token = arguments[1] - ISubscriptionManager(mlist).confirm(token) - # XXX finish this - elif arguments[0] == 'confirm': - if len(arguments) != 2: - print('Missing token', file=results) - return ContinueProcessing.no +def _cmd_confirm(pgp_list, mlist, msg, msgdata, arguments, results): + if len(arguments) != 2: + print('Missing token', file=results) + return ContinueProcessing.no - display_name, email = parseaddr(msg['from']) - # Address could be None or the empty string. - if not email: - email = msg.sender - if not email: - print('No email to subscribe with.', file=results) - return ContinueProcessing.no - um = getUtility(IUserManager) + email = _get_email(msg) + if not email: + print('No email to subscribe with.', file=results) + return ContinueProcessing.no - pgp_address = PGPAddress.for_address(um.get_address(email)) - if pgp_address is None: - # TODO - pass + pgp_address = PGPAddress.for_email(email) + if pgp_address is None: + print('A pgp enabled address not found.', file=results) + return ContinueProcessing.no - wrapped = PGPWrapper(msg) - if wrapped.is_encrypted(): - decrypted = wrapped.decrypt(pgp_list.key) - wrapped = PGPWrapper(decrypted) + wrapped = PGPWrapper(msg) + if wrapped.is_encrypted(): + decrypted = wrapped.decrypt(pgp_list.key) + wrapped = PGPWrapper(decrypted) - if not wrapped.is_signed(): - print('Message not signed, ignoring.', file=results) - return ContinueProcessing.no + if not wrapped.is_signed(): + print('Message not signed, ignoring.', file=results) + return ContinueProcessing.no - if not wrapped.verifies(pgp_address.key): - print('Message failed to verify.', file=results) - return ContinueProcessing.no + if not wrapped.verifies(pgp_address.key): + print('Message failed to verify.', file=results) + return ContinueProcessing.no - token = arguments[1] + token = arguments[1] - expecting = CONFIRM_REQUEST.format(pgp_address.key.pubkey, - token) - if wrapped.get_signed() == expecting: + expecting = CONFIRM_REQUEST.format(pgp_address.key_fingerprint, + token) + for sig_subject in wrapped.get_signed(): + if expecting in sig_subject: + with transaction(): + pgp_address.key_confirmed = True ISubscriptionManager(mlist).confirm(token) - else: - # TODO - pass + break + else: + print("Message doesn't contain the expected statement.", file=results) + return ContinueProcessing.no + + +def _cmd_change(pgp_list, mlist, msg, msgdata, arguments, results): + # New public key in attachment, requires to be signed with current + # key + pass - elif arguments[0] == 'change': - # New public key in attachment, requires to be signed with current - # key - pass - elif arguments[0] == 'revoke': - # Current key revocation certificate in attachment, restarts the - # subscription process, or rather only it's key setup part. - pass - elif arguments[0] == 'sign': - # List public key attached, signed by the users current key. - pass - else: + +def _cmd_revoke(pgp_list, mlist, msg, msgdata, arguments, results): + # Current key revocation certificate in attachment, restarts the + # subscription process, or rather only it's key setup part. + pass + + +def _cmd_sign(pgp_list, mlist, msg, msgdata, arguments, results): + # List public key attached, signed by the users current key. + pass + + +SUBCOMMANDS = { + 'set': _cmd_set, + 'confirm': _cmd_confirm, + 'change': _cmd_change, + 'revoke': _cmd_revoke, + 'sign': _cmd_sign +} + +ARGUMENTS = '<' + '|'.join(SUBCOMMANDS.keys()) + '>' + + +@public +@implementer(IEmailCommand) +class KeyCommand: + """The `key` command.""" + + name = 'key' + argument_description = ARGUMENTS + short_description = '' + description = """\ + + """ + + def process(self, mlist, msg, msgdata, arguments, results): + """See `IEmailCommand`.""" + if len(arguments) == 0: + print('No sub-command specified,' + ' must be one of {}.'.format(ARGUMENTS), file=results) + return ContinueProcessing.no + + if arguments[0] not in SUBCOMMANDS: print('Wrong sub-command specified,' - ' must be one of <change|revoke|sign>.', file=results) + ' must be one of {}.'.format(ARGUMENTS), file=results) + return ContinueProcessing.no - return ContinueProcessing.no + pgp_list = PGPMailingList.for_list(mlist) + if pgp_list is None: + print("This mailing list doesn't have pgp enabled.", file=results) + return ContinueProcessing.no + + command = SUBCOMMANDS[arguments[0]] + return command(pgp_list, mlist, msg, msgdata, arguments, results) diff --git a/src/mailman_pgp/commands/tests/__init__.py b/src/mailman_pgp/commands/tests/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/src/mailman_pgp/commands/tests/__init__.py diff --git a/src/mailman_pgp/commands/tests/test_key.py b/src/mailman_pgp/commands/tests/test_key.py new file mode 100644 index 0000000..8b44011 --- /dev/null +++ b/src/mailman_pgp/commands/tests/test_key.py @@ -0,0 +1,123 @@ +# Copyright (C) 2017 Jan Jancar +# +# This file is a part of the Mailman PGP plugin. +# +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) +# any later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. +# +# You should have received a copy of the GNU General Public License along with +# this program. If not, see <http://www.gnu.org/licenses/>. + +"""""" + +import unittest + +from mailman.app.lifecycle import create_list +from mailman.email.message import Message +from mailman.interfaces.subscriptions import ISubscriptionManager +from mailman.interfaces.usermanager import IUserManager +from mailman.runners.command import CommandRunner +from mailman.testing.helpers import make_testable_runner, get_queue_messages +from mailman.utilities.datetime import now +from zope.component import getUtility + +from mailman_pgp.config import mm_config +from mailman_pgp.database import transaction +from mailman_pgp.model.address import PGPAddress +from mailman_pgp.model.list import PGPMailingList +from mailman_pgp.pgp.mime import MIMEWrapper +from mailman_pgp.pgp.tests.base import load_key +from mailman_pgp.pgp.wrapper import PGPWrapper +from mailman_pgp.testing.layers import PGPConfigLayer +from mailman_pgp.workflows.base import CONFIRM_REQUEST +from mailman_pgp.workflows.subscription import OpenSubscriptionPolicy + + +class TestPreSubscription(unittest.TestCase): + layer = PGPConfigLayer + + def setUp(self): + self.mlist = create_list('test@example.com', style_name='pgp-default') + self.pgp_list = PGPMailingList.for_list(self.mlist) + self.pgp_list.generate_key(True) + + def test_set(self): + self.mlist.subscription_policy = OpenSubscriptionPolicy + bart = getUtility(IUserManager).create_address('bart@example.com', + 'Bart Person') + bart.verified_on = now() + token, token_owner, member = ISubscriptionManager(self.mlist).register( + bart) + + get_queue_messages('virgin') + + bart_key = load_key('rsa_1024.priv.asc') + + set_message = Message() + set_message['From'] = 'bart@example.com' + set_message['To'] = 'test@example.com' + set_message['Subject'] = 'Re: key set {}'.format(token) + set_message.set_type('multipart/mixed') + wrapped_set_message = MIMEWrapper(set_message) + set_message = wrapped_set_message.attach_key(bart_key.pubkey) + + mm_config.switchboards['command'].enqueue(set_message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + + pgp_address = PGPAddress.for_address(bart) + self.assertIsNotNone(pgp_address) + self.assertEqual(pgp_address.key.fingerprint, bart_key.fingerprint) + self.assertEqual(pgp_address.key_fingerprint, bart_key.fingerprint) + self.assertFalse(pgp_address.key_confirmed) + + items = get_queue_messages('virgin', expected_count=2) + if items[0].msg['Subject'] == 'The results of your email commands': + confirm_request = items[1].msg + else: + confirm_request = items[0].msg + + confirm_wrapped = PGPWrapper(confirm_request) + self.assertTrue(confirm_wrapped.is_encrypted()) + + def test_confirm(self): + self.mlist.subscription_policy = OpenSubscriptionPolicy + bart = getUtility(IUserManager).create_address('bart@example.com', + 'Bart Person') + bart.verified_on = now() + + bart_key = load_key('rsa_1024.priv.asc') + + token, token_owner, member = ISubscriptionManager(self.mlist).register( + bart, pubkey=bart_key.pubkey) + + get_queue_messages('virgin') + + with transaction() as t: + pgp_address = PGPAddress(bart) + pgp_address.key =bart_key.pubkey + t.add(pgp_address) + + confirm_message = Message() + confirm_message['From'] = 'bart@example.com' + confirm_message['To'] = 'test@example.com' + confirm_message['Subject'] = 'Re: key confirm {}'.format(token) + confirm_message.set_payload( + CONFIRM_REQUEST.format(bart_key.fingerprint, token)) + wrapped_confirm_message = MIMEWrapper(confirm_message) + confirm_message = wrapped_confirm_message.sign(bart_key) + + mm_config.switchboards['command'].enqueue(confirm_message, + listid='test.example.com') + make_testable_runner(CommandRunner, 'command').run() + + pgp_address = PGPAddress.for_address(bart) + self.assertTrue(pgp_address.key_confirmed) + self.assertTrue(self.mlist.is_subscribed(bart)) diff --git a/src/mailman_pgp/model/address.py b/src/mailman_pgp/model/address.py index 1d07486..6fc3050 100644 --- a/src/mailman_pgp/model/address.py +++ b/src/mailman_pgp/model/address.py @@ -22,7 +22,7 @@ from os.path import exists, isfile, join from mailman.database.types import SAUnicode from mailman.interfaces.usermanager import IUserManager from pgpy import PGPKey -from sqlalchemy import Column, Integer +from sqlalchemy import Column, Integer, Boolean from sqlalchemy.orm import reconstructor from zope.component import getUtility @@ -38,6 +38,7 @@ class PGPAddress(Base): id = Column(Integer, primary_key=True) email = Column(SAUnicode, index=True) key_fingerprint = Column(SAUnicode) + key_confirmed = Column(Boolean) def __init__(self, address): super().__init__() @@ -49,6 +50,7 @@ class PGPAddress(Base): def _init(self): self._address = None self._key = None + self.key_confirmed = False @property def key(self): @@ -119,3 +121,9 @@ class PGPAddress(Base): if address is None: return None return PGPAddress.query().filter_by(email=address.email).first() + + @staticmethod + def for_email(email): + if email is None: + return None + return PGPAddress.query().filter_by(email=email).first() diff --git a/src/mailman_pgp/pgp/mime.py b/src/mailman_pgp/pgp/mime.py index 1ecbe76..975b8bd 100644 --- a/src/mailman_pgp/pgp/mime.py +++ b/src/mailman_pgp/pgp/mime.py @@ -87,7 +87,7 @@ class MIMEWrapper: return self.is_signed() def get_signed(self): - yield self.msg.get_payload(0).as_string(0) + yield self.msg.get_payload(0).as_string() def is_encrypted(self): """ @@ -145,6 +145,26 @@ class MIMEWrapper: key, _ = PGPKey.from_blob(part.get_payload()) yield key + def attach_key(self, key): + """ + + :param key: + :type key: pgpy.PGPKey + :return: + """ + filename = '0x' + key.fingerprint.keyid + '.asc' + key_part = MIMEApplication(_data=str(key), + _subtype=MIMEWrapper._keys_subtype, + _encoder=encode_7or8bit, + name=filename) + key_part.add_header('Content-Description', + 'OpenPGP key') + key_part.add_header('Content-Disposition', 'attachment', + filename=filename) + out = copy.deepcopy(self.msg) + out.attach(key_part) + return out + def verify(self, key): """ Verify the signature of this message with key. |
