aboutsummaryrefslogtreecommitdiff
path: root/src/mailman_pgp/pgp/inline.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/mailman_pgp/pgp/inline.py')
-rw-r--r--src/mailman_pgp/pgp/inline.py178
1 files changed, 138 insertions, 40 deletions
diff --git a/src/mailman_pgp/pgp/inline.py b/src/mailman_pgp/pgp/inline.py
index 94091cc..a327ef4 100644
--- a/src/mailman_pgp/pgp/inline.py
+++ b/src/mailman_pgp/pgp/inline.py
@@ -16,8 +16,11 @@
# this program. If not, see <http://www.gnu.org/licenses/>.
"""Strict inline PGP message wrapper."""
+import copy
+from email.iterators import walk
from pgpy import PGPKey, PGPMessage
+from pgpy.constants import SymmetricKeyAlgorithm
from pgpy.types import Armorable
from public import public
@@ -35,54 +38,80 @@ class InlineWrapper:
"""
self.msg = msg
- def _is_inline(self):
- return not self.msg.is_multipart()
+ def _walk(self, walk_fn, *args, **kwargs):
+ for part in walk(self.msg):
+ if not part.is_multipart():
+ yield walk_fn(part, *args, **kwargs)
- def _as_string(self):
- return str(self.msg.get_payload())
-
- def _has_signature(self):
+ def _is_signed(self, part):
try:
- msg = PGPMessage.from_blob(self._as_string())
+ msg = PGPMessage.from_blob(part.get_payload())
return msg.is_signed
except:
pass
return False
- def _has_message(self):
+ def is_signed(self):
+ """
+ Whether the message is inline signed.
+
+ :return: If the message is inline signed.
+ :rtype: bool
+ """
+ return all(self._walk(self._is_signed))
+
+ def has_signature(self):
+ """
+ Whether some parts of the message are inline signed.
+
+ :return: If some parts of the message are inline signed.
+ :rtype: bool
+ """
+ return any(self._walk(self._is_signed))
+
+ def _is_encrypted(self, part):
try:
- msg = PGPMessage.from_blob(self._as_string())
+ msg = PGPMessage.from_blob(part.get_payload())
return msg.is_encrypted
except:
pass
return False
- def _has_armor(self, block_type):
- try:
- dearm = Armorable.ascii_unarmor(self._as_string())
- if dearm['magic'] == block_type:
- return True
- except:
- pass
- return False
+ def is_encrypted(self):
+ """
+ Whether the message is inline encrypted.
- def is_signed(self):
+ :return: If the message is inline encrypted.
+ :rtype: bool
"""
- Whether the message is inline signed (cleartext).
+ return all(self._walk(self._is_encrypted))
- :return: If the message is inline signed.
+ def has_encryption(self):
+ """
+ Whether some parts of the message are inline encrypted.
+
+ :return: If some parts of the message are inline encrypted.
:rtype: bool
"""
- return self._is_inline() and self._has_signature()
+ return any(self._walk(self._is_encrypted))
- def is_encrypted(self):
+ def _has_keys(self, part):
+ try:
+ dearm = Armorable.ascii_unarmor(part.get_payload())
+ if dearm['magic'] in ('PUBLIC KEY BLOCK', 'PRIVATE KEY BLOCK'):
+ return True
+ except:
+ pass
+ return False
+
+ def is_keys(self):
"""
- Whether the message is inline encrypted.
+ Whether the message is all keys (all parts).
- :return: If the message is inline encrypted.
+ :return: If the message is keys.
:rtype: bool
"""
- return self._is_inline() and self._has_message()
+ return all(self._walk(self._has_keys))
def has_keys(self):
"""
@@ -91,8 +120,15 @@ class InlineWrapper:
:return: If the message contains keys.
:rtype: bool
"""
- return self._is_inline() and (self._has_armor('PUBLIC KEY BLOCK') or
- self._has_armor('PRIVATE KEY BLOCK'))
+ return any(self._walk(self._has_keys))
+
+ def _keys(self, part):
+ try:
+ # TODO: potentially return all things returned from from_blob?
+ key, _ = PGPKey.from_blob(part.get_payload())
+ return key
+ except:
+ pass
def keys(self):
"""
@@ -100,9 +136,15 @@ class InlineWrapper:
:return: A collection of keys.
"""
- # TODO: potentially return all things returned from from_blob?
- key, _ = PGPKey.from_blob(self._as_string())
- yield key
+ yield from self._walk(self._keys)
+
+ def _verify(self, part, key):
+ try:
+ message = PGPMessage.from_blob(part.get_payload())
+ return key.verify(message)
+ except:
+ pass
+ return False
def verify(self, key):
"""
@@ -110,14 +152,38 @@ class InlineWrapper:
:param key: The key to verify with.
:type key: pgpy.PGPKey
- :return: The verified signature.
+ :return: The verified signatures.
:rtype: pgpy.types.SignatureVerification
"""
- message = PGPMessage.from_blob(self._as_string())
- return key.verify(message)
+ for part in walk(self.msg):
+ if not part.is_multipart() and self._is_signed(part):
+ yield self._verify(part, key)
+
+ def _sign(self, part, key):
+ payload = str(part.get_payload())
+ pmsg = PGPMessage.new(payload, cleartext=True)
+ pmsg |= key.sign(pmsg)
+ part.set_payload(str(pmsg))
+
+ def sign(self, key):
+ """
+ Sign a message with key.
+
+ :param key: The key to sign with.
+ :type key: pgpy.PGPKey
+ :return: The signed message.
+ :rtype: mailman.email.message.Message
+ """
+ out = copy.deepcopy(self.msg)
+ for part in walk(out):
+ if not part.is_multipart():
+ self._sign(part, key)
+ return out
- def sign(self):
- pass
+ def _decrypt(self, part, key):
+ message = PGPMessage.from_blob(part.get_payload())
+ decrypted = key.decrypt(message)
+ part.set_payload(decrypted.message)
def decrypt(self, key):
"""
@@ -126,10 +192,42 @@ class InlineWrapper:
:param key: The key to decrypt with.
:type key: pgpy.PGPKey
:return: The decrypted message.
- :rtype: PGPMessage
+ :rtype: mailman.email.message.Message
+ """
+ out = copy.deepcopy(self.msg)
+ for part in walk(out):
+ if not part.is_multipart() and self._is_encrypted(part):
+ self._decrypt(part, key)
+ return out
+
+ def _encrypt(self, part, *keys, **kwargs):
+ payload = str(part.get_payload())
+ pmsg = PGPMessage.new(payload)
+ for key in keys:
+ pmsg = key.encrypt(pmsg, **kwargs)
+ part.set_payload(str(pmsg))
+
+ def encrypt(self, *keys, cipher=SymmetricKeyAlgorithm.AES256):
+ """
+ Encrypt the message with key/s, using cipher.
+
+ :param keys: The key/s to encrypt with.
+ :type keys: pgpy.PGPKey
+ :param cipher: The symmetric cipher to use.
+ :type cipher: SymmetricKeyAlgorithm
+ :return: mailman.email.message.Message
"""
- message = PGPMessage.from_blob(self._as_string())
- return key.decrypt(message)
+ if len(keys) == 0:
+ raise ValueError('At least one key necessary.')
- def encrypt(self):
- pass
+ out = copy.deepcopy(self.msg)
+ for part in walk(out):
+ if not part.is_multipart():
+ if len(keys) == 1:
+ self._encrypt(part, *keys)
+ else:
+ session_key = cipher.gen_key()
+ for key in keys:
+ self._encrypt(part, key, session_key=session_key)
+ del session_key
+ return out