# Copyright (C) 2017 Jan Jancar # # This file is a part of the Mailman PGP plugin. # # This program is free software; you can redistribute it and/or modify it under # the terms of the GNU General Public License as published by the Free # Software Foundation, either version 3 of the License, or (at your option) # any later version. # # This program is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for # more details. # # You should have received a copy of the GNU General Public License along with # this program. If not, see . """Strict inline PGP message wrapper.""" import copy from email.iterators import walk from pgpy import PGPKey, PGPMessage from pgpy.constants import SymmetricKeyAlgorithm from pgpy.types import Armorable from public import public @public class InlineWrapper: """Inline PGP wrapper.""" def __init__(self, msg): """ Wrap the given message. :param msg: The message to wrap. :type msg: mailman.email.message.Message """ self.msg = msg def _walk(self, walk_fn, *args, **kwargs): for part in walk(self.msg): if not part.is_multipart(): yield walk_fn(part, *args, **kwargs) def _is_signed(self, part): try: msg = PGPMessage.from_blob(part.get_payload()) return msg.is_signed except: pass return False def is_signed(self): """ Whether the message is inline signed. :return: If the message is inline signed. :rtype: bool """ return all(self._walk(self._is_signed)) def has_signature(self): """ Whether some parts of the message are inline signed. :return: If some parts of the message are inline signed. :rtype: bool """ return any(self._walk(self._is_signed)) def _is_encrypted(self, part): try: msg = PGPMessage.from_blob(part.get_payload()) return msg.is_encrypted except: pass return False def is_encrypted(self): """ Whether the message is inline encrypted. :return: If the message is inline encrypted. :rtype: bool """ return all(self._walk(self._is_encrypted)) def has_encryption(self): """ Whether some parts of the message are inline encrypted. :return: If some parts of the message are inline encrypted. :rtype: bool """ return any(self._walk(self._is_encrypted)) def _has_keys(self, part): try: dearm = Armorable.ascii_unarmor(part.get_payload()) if dearm['magic'] in ('PUBLIC KEY BLOCK', 'PRIVATE KEY BLOCK'): return True except: pass return False def is_keys(self): """ Whether the message is all keys (all parts). :return: If the message is keys. :rtype: bool """ return all(self._walk(self._has_keys)) def has_keys(self): """ Whether the message contains public or private keys. :return: If the message contains keys. :rtype: bool """ return any(self._walk(self._has_keys)) def _keys(self, part): try: # TODO: potentially return all things returned from from_blob? key, _ = PGPKey.from_blob(part.get_payload()) return key except: pass def keys(self): """ Get the collection of keys in this message. :return: A collection of keys. """ yield from self._walk(self._keys) def _verify(self, part, key): try: message = PGPMessage.from_blob(part.get_payload()) return key.verify(message) except: pass return False def verify(self, key): """ Verify the signature of this message with key. :param key: The key to verify with. :type key: pgpy.PGPKey :return: The verified signatures. :rtype: pgpy.types.SignatureVerification """ for part in walk(self.msg): if not part.is_multipart() and self._is_signed(part): yield self._verify(part, key) def _sign(self, part, key): payload = str(part.get_payload()) pmsg = PGPMessage.new(payload, cleartext=True) pmsg |= key.sign(pmsg) part.set_payload(str(pmsg)) def sign(self, key): """ Sign a message with key. :param key: The key to sign with. :type key: pgpy.PGPKey :return: The signed message. :rtype: mailman.email.message.Message """ out = copy.deepcopy(self.msg) for part in walk(out): if not part.is_multipart(): self._sign(part, key) return out def _decrypt(self, part, key): message = PGPMessage.from_blob(part.get_payload()) decrypted = key.decrypt(message) part.set_payload(decrypted.message) def decrypt(self, key): """ Decrypt this message with key. :param key: The key to decrypt with. :type key: pgpy.PGPKey :return: The decrypted message. :rtype: mailman.email.message.Message """ out = copy.deepcopy(self.msg) for part in walk(out): if not part.is_multipart() and self._is_encrypted(part): self._decrypt(part, key) return out def _encrypt(self, part, *keys, **kwargs): payload = str(part.get_payload()) pmsg = PGPMessage.new(payload) for key in keys: pmsg = key.encrypt(pmsg, **kwargs) part.set_payload(str(pmsg)) def encrypt(self, *keys, cipher=SymmetricKeyAlgorithm.AES256): """ Encrypt the message with key/s, using cipher. :param keys: The key/s to encrypt with. :type keys: pgpy.PGPKey :param cipher: The symmetric cipher to use. :type cipher: SymmetricKeyAlgorithm :return: mailman.email.message.Message """ if len(keys) == 0: raise ValueError('At least one key necessary.') out = copy.deepcopy(self.msg) for part in walk(out): if not part.is_multipart(): if len(keys) == 1: self._encrypt(part, *keys) else: session_key = cipher.gen_key() for key in keys: self._encrypt(part, key, session_key=session_key) del session_key return out