diff options
| author | J08nY | 2017-06-25 00:51:29 +0200 |
|---|---|---|
| committer | J08nY | 2017-06-25 00:51:29 +0200 |
| commit | a47b61581f6ddd040c25f8e4bb3dd6bd7b51aa30 (patch) | |
| tree | 0fdd8eb0b47021f52e2d0b0258c8e4fae1cf8e31 /src/mailman_pgp/pgp/inline.py | |
| parent | 52ab7fcef755d0adea8a23b5aa77e30119356ac1 (diff) | |
| download | mailman-pgp-a47b61581f6ddd040c25f8e4bb3dd6bd7b51aa30.tar.gz mailman-pgp-a47b61581f6ddd040c25f8e4bb3dd6bd7b51aa30.tar.zst mailman-pgp-a47b61581f6ddd040c25f8e4bb3dd6bd7b51aa30.zip | |
Diffstat (limited to 'src/mailman_pgp/pgp/inline.py')
| -rw-r--r-- | src/mailman_pgp/pgp/inline.py | 178 |
1 files changed, 138 insertions, 40 deletions
diff --git a/src/mailman_pgp/pgp/inline.py b/src/mailman_pgp/pgp/inline.py index 94091cc..a327ef4 100644 --- a/src/mailman_pgp/pgp/inline.py +++ b/src/mailman_pgp/pgp/inline.py @@ -16,8 +16,11 @@ # this program. If not, see <http://www.gnu.org/licenses/>. """Strict inline PGP message wrapper.""" +import copy +from email.iterators import walk from pgpy import PGPKey, PGPMessage +from pgpy.constants import SymmetricKeyAlgorithm from pgpy.types import Armorable from public import public @@ -35,54 +38,80 @@ class InlineWrapper: """ self.msg = msg - def _is_inline(self): - return not self.msg.is_multipart() + def _walk(self, walk_fn, *args, **kwargs): + for part in walk(self.msg): + if not part.is_multipart(): + yield walk_fn(part, *args, **kwargs) - def _as_string(self): - return str(self.msg.get_payload()) - - def _has_signature(self): + def _is_signed(self, part): try: - msg = PGPMessage.from_blob(self._as_string()) + msg = PGPMessage.from_blob(part.get_payload()) return msg.is_signed except: pass return False - def _has_message(self): + def is_signed(self): + """ + Whether the message is inline signed. + + :return: If the message is inline signed. + :rtype: bool + """ + return all(self._walk(self._is_signed)) + + def has_signature(self): + """ + Whether some parts of the message are inline signed. + + :return: If some parts of the message are inline signed. + :rtype: bool + """ + return any(self._walk(self._is_signed)) + + def _is_encrypted(self, part): try: - msg = PGPMessage.from_blob(self._as_string()) + msg = PGPMessage.from_blob(part.get_payload()) return msg.is_encrypted except: pass return False - def _has_armor(self, block_type): - try: - dearm = Armorable.ascii_unarmor(self._as_string()) - if dearm['magic'] == block_type: - return True - except: - pass - return False + def is_encrypted(self): + """ + Whether the message is inline encrypted. - def is_signed(self): + :return: If the message is inline encrypted. + :rtype: bool """ - Whether the message is inline signed (cleartext). + return all(self._walk(self._is_encrypted)) - :return: If the message is inline signed. + def has_encryption(self): + """ + Whether some parts of the message are inline encrypted. + + :return: If some parts of the message are inline encrypted. :rtype: bool """ - return self._is_inline() and self._has_signature() + return any(self._walk(self._is_encrypted)) - def is_encrypted(self): + def _has_keys(self, part): + try: + dearm = Armorable.ascii_unarmor(part.get_payload()) + if dearm['magic'] in ('PUBLIC KEY BLOCK', 'PRIVATE KEY BLOCK'): + return True + except: + pass + return False + + def is_keys(self): """ - Whether the message is inline encrypted. + Whether the message is all keys (all parts). - :return: If the message is inline encrypted. + :return: If the message is keys. :rtype: bool """ - return self._is_inline() and self._has_message() + return all(self._walk(self._has_keys)) def has_keys(self): """ @@ -91,8 +120,15 @@ class InlineWrapper: :return: If the message contains keys. :rtype: bool """ - return self._is_inline() and (self._has_armor('PUBLIC KEY BLOCK') or - self._has_armor('PRIVATE KEY BLOCK')) + return any(self._walk(self._has_keys)) + + def _keys(self, part): + try: + # TODO: potentially return all things returned from from_blob? + key, _ = PGPKey.from_blob(part.get_payload()) + return key + except: + pass def keys(self): """ @@ -100,9 +136,15 @@ class InlineWrapper: :return: A collection of keys. """ - # TODO: potentially return all things returned from from_blob? - key, _ = PGPKey.from_blob(self._as_string()) - yield key + yield from self._walk(self._keys) + + def _verify(self, part, key): + try: + message = PGPMessage.from_blob(part.get_payload()) + return key.verify(message) + except: + pass + return False def verify(self, key): """ @@ -110,14 +152,38 @@ class InlineWrapper: :param key: The key to verify with. :type key: pgpy.PGPKey - :return: The verified signature. + :return: The verified signatures. :rtype: pgpy.types.SignatureVerification """ - message = PGPMessage.from_blob(self._as_string()) - return key.verify(message) + for part in walk(self.msg): + if not part.is_multipart() and self._is_signed(part): + yield self._verify(part, key) + + def _sign(self, part, key): + payload = str(part.get_payload()) + pmsg = PGPMessage.new(payload, cleartext=True) + pmsg |= key.sign(pmsg) + part.set_payload(str(pmsg)) + + def sign(self, key): + """ + Sign a message with key. + + :param key: The key to sign with. + :type key: pgpy.PGPKey + :return: The signed message. + :rtype: mailman.email.message.Message + """ + out = copy.deepcopy(self.msg) + for part in walk(out): + if not part.is_multipart(): + self._sign(part, key) + return out - def sign(self): - pass + def _decrypt(self, part, key): + message = PGPMessage.from_blob(part.get_payload()) + decrypted = key.decrypt(message) + part.set_payload(decrypted.message) def decrypt(self, key): """ @@ -126,10 +192,42 @@ class InlineWrapper: :param key: The key to decrypt with. :type key: pgpy.PGPKey :return: The decrypted message. - :rtype: PGPMessage + :rtype: mailman.email.message.Message + """ + out = copy.deepcopy(self.msg) + for part in walk(out): + if not part.is_multipart() and self._is_encrypted(part): + self._decrypt(part, key) + return out + + def _encrypt(self, part, *keys, **kwargs): + payload = str(part.get_payload()) + pmsg = PGPMessage.new(payload) + for key in keys: + pmsg = key.encrypt(pmsg, **kwargs) + part.set_payload(str(pmsg)) + + def encrypt(self, *keys, cipher=SymmetricKeyAlgorithm.AES256): + """ + Encrypt the message with key/s, using cipher. + + :param keys: The key/s to encrypt with. + :type keys: pgpy.PGPKey + :param cipher: The symmetric cipher to use. + :type cipher: SymmetricKeyAlgorithm + :return: mailman.email.message.Message """ - message = PGPMessage.from_blob(self._as_string()) - return key.decrypt(message) + if len(keys) == 0: + raise ValueError('At least one key necessary.') - def encrypt(self): - pass + out = copy.deepcopy(self.msg) + for part in walk(out): + if not part.is_multipart(): + if len(keys) == 1: + self._encrypt(part, *keys) + else: + session_key = cipher.gen_key() + for key in keys: + self._encrypt(part, key, session_key=session_key) + del session_key + return out |
