aboutsummaryrefslogtreecommitdiff
path: root/src/mailman_pgp/pgp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mailman_pgp/pgp')
-rw-r--r--src/mailman_pgp/pgp/inline.py5
-rw-r--r--src/mailman_pgp/pgp/mime.py68
-rw-r--r--src/mailman_pgp/pgp/tests/test_mime.py25
-rw-r--r--src/mailman_pgp/pgp/tests/test_wrapper.py35
-rw-r--r--src/mailman_pgp/pgp/wrapper.py18
5 files changed, 123 insertions, 28 deletions
diff --git a/src/mailman_pgp/pgp/inline.py b/src/mailman_pgp/pgp/inline.py
index 06ecee5..ae731cf 100644
--- a/src/mailman_pgp/pgp/inline.py
+++ b/src/mailman_pgp/pgp/inline.py
@@ -226,10 +226,11 @@ class InlineWrapper:
for part in walk(out):
if not part.is_multipart():
if len(keys) == 1:
- self._encrypt(part, *keys)
+ self._encrypt(part, *keys, cipher=cipher)
else:
session_key = cipher.gen_key()
for key in keys:
- self._encrypt(part, key, session_key=session_key)
+ self._encrypt(part, key, cipher=cipher,
+ session_key=session_key)
del session_key
return out
diff --git a/src/mailman_pgp/pgp/mime.py b/src/mailman_pgp/pgp/mime.py
index 04b2db2..a373a7c 100644
--- a/src/mailman_pgp/pgp/mime.py
+++ b/src/mailman_pgp/pgp/mime.py
@@ -17,6 +17,7 @@
"""RFC1847 and RFC3156 compliant message wrapped."""
import copy
+from email import message_from_string
from email.encoders import encode_7or8bit
from email.iterators import walk
from email.mime.application import MIMEApplication
@@ -34,9 +35,13 @@ from mailman_pgp.pgp.utils import copy_headers
class MIMEWrapper:
"""PGP/MIME (RFC1847 + RFC3156) compliant wrapper."""
- _signed_subtype = 'application/pgp-signature'
- _encrypted_subtype = 'application/pgp-encrypted'
- _keys_subtype = 'application/pgp-keys'
+ _signature_subtype = 'pgp-signature'
+ _encryption_subtype = 'pgp-encrypted'
+ _keys_subtype = 'pgp-keys'
+
+ _signed_type = 'application/' + _signature_subtype
+ _encrypted_type = 'application/' + _encryption_subtype
+ _keys_type = 'application/' + _keys_subtype
def __init__(self, msg):
"""
@@ -66,9 +71,9 @@ class MIMEWrapper:
protocol_param = collapse_rfc2231_value(self.msg.get_param('protocol'))
content_subtype = self.msg.get_content_subtype()
- return (second_type == MIMEWrapper._signed_subtype and
+ return (second_type == MIMEWrapper._signed_type and
content_subtype == 'signed' and
- protocol_param == MIMEWrapper._signed_subtype)
+ protocol_param == MIMEWrapper._signed_type)
def is_encrypted(self):
"""
@@ -86,14 +91,14 @@ class MIMEWrapper:
protocol_param = collapse_rfc2231_value(self.msg.get_param('protocol'))
return ('Version: 1' in first_part and
- first_type == MIMEWrapper._encrypted_subtype and
+ first_type == MIMEWrapper._encrypted_type and
second_type == 'application/octet-stream' and
content_subtype == 'encrypted' and
- protocol_param == MIMEWrapper._encrypted_subtype)
+ protocol_param == MIMEWrapper._encrypted_type)
def is_keys(self):
for part in walk(self.msg):
- if part.get_content_type() != MIMEWrapper._keys_subtype:
+ if part.get_content_type() != MIMEWrapper._keys_type:
return False
return True
@@ -105,7 +110,7 @@ class MIMEWrapper:
:rtype: bool
"""
for part in walk(self.msg):
- if part.get_content_type() == MIMEWrapper._keys_subtype:
+ if part.get_content_type() == MIMEWrapper._keys_type:
return True
return False
@@ -116,7 +121,7 @@ class MIMEWrapper:
:return: A collection of keys.
"""
for part in walk(self.msg):
- if part.get_content_type() == MIMEWrapper._keys_subtype:
+ if part.get_content_type() == MIMEWrapper._keys_type:
key, _ = PGPKey.from_blob(part.get_payload())
yield key
@@ -163,10 +168,10 @@ class MIMEWrapper:
micalg = self._micalg(signature.hash_algorithm)
out = MIMEMultipart('signed', micalg=micalg,
- protocol=MIMEWrapper._signed_subtype)
+ protocol=MIMEWrapper._signed_type)
second_part = MIMEApplication(_data=str(signature),
- _subtype='pgp-signature',
+ _subtype=MIMEWrapper._signature_subtype,
_encoder=encode_7or8bit)
out.attach(copy.deepcopy(self.msg))
out.attach(second_part)
@@ -180,19 +185,44 @@ class MIMEWrapper:
:param key: The key to decrypt with.
:type key: pgpy.PGPKey
:return: The decrypted message.
- :rtype: PGPMessage
+ :rtype: mailman.email.message.Message
"""
msg_text = self.msg.get_payload(1).get_payload()
- msg = PGPMessage.from_blob(msg_text)
- return key.decrypt(msg)
+ pmsg = PGPMessage.from_blob(msg_text)
+ decrypted = key.decrypt(pmsg)
+ out = message_from_string(decrypted.message)
+ copy_headers(self.msg, out)
+ return out
def encrypt(self, *keys, cipher=SymmetricKeyAlgorithm.AES256):
"""
+ Encrypt the message with key/s, using cipher.
- :param keys:
+ :param keys: The key/s to encrypt with.
:type keys: pgpy.PGPKey
- :param cipher:
+ :param cipher: The symmetric cipher to use.
:type cipher: SymmetricKeyAlgorithm
- :return:
+ :return: mailman.email.message.Message
"""
- pass
+ payload = self.msg.as_string()
+ pmsg = PGPMessage.new(payload)
+ if len(keys) == 1:
+ pmsg = keys[0].encrypt(pmsg, cipher=cipher)
+ else:
+ session_key = cipher.gen_key()
+ for key in keys:
+ pmsg = key.encrypt(pmsg, cipher=cipher,
+ session_key=session_key)
+ del session_key
+ out = MIMEMultipart('encrypted',
+ protocol=MIMEWrapper._encrypted_type)
+ first_part = MIMEApplication(_data='Version: 1',
+ _subtype=MIMEWrapper._encryption_subtype,
+ _encoder=encode_7or8bit)
+ second_part = MIMEApplication(_data=str(pmsg),
+ _subtype='octet-stream',
+ _encoder=encode_7or8bit)
+ out.attach(first_part)
+ out.attach(second_part)
+ copy_headers(self.msg, out)
+ return out
diff --git a/src/mailman_pgp/pgp/tests/test_mime.py b/src/mailman_pgp/pgp/tests/test_mime.py
index 2221c8f..16109eb 100644
--- a/src/mailman_pgp/pgp/tests/test_mime.py
+++ b/src/mailman_pgp/pgp/tests/test_mime.py
@@ -75,8 +75,29 @@ class TestEncryption(MIMEWrapperTestCase):
def test_is_encrypted(self):
pass
- def test_encrypt(self):
- pass
+ @parameterized.expand([
+ (load_message('data/clear.eml'),
+ load_key('data/rsa_1024.pub.asc')),
+ (load_message('data/clear.eml'),
+ (load_key('data/rsa_1024.pub.asc'),
+ load_key('data/ecc_p256.pub.asc')))
+ ])
+ def test_encrypt(self, message, keys, **kwargs):
+ if isinstance(keys, tuple):
+ super().encrypt(message, *keys, **kwargs)
+ else:
+ super().encrypt(message, keys, **kwargs)
+
+ @parameterized.expand([
+ (load_message('data/clear.eml'),
+ load_key('data/rsa_1024.pub.asc'),
+ load_key('data/rsa_1024.priv.asc')),
+ (load_message('data/clear.eml'),
+ load_key('data/ecc_p256.pub.asc'),
+ load_key('data/ecc_p256.priv.asc'))
+ ])
+ def test_encrypt_decrypt(self, message, pub, priv):
+ super().encrypt_decrypt(message, pub, priv)
def test_decrypt(self):
pass
diff --git a/src/mailman_pgp/pgp/tests/test_wrapper.py b/src/mailman_pgp/pgp/tests/test_wrapper.py
index 7cd5541..476b5bc 100644
--- a/src/mailman_pgp/pgp/tests/test_wrapper.py
+++ b/src/mailman_pgp/pgp/tests/test_wrapper.py
@@ -48,8 +48,14 @@ class TestSigning(PGPWrapperTestCase):
def test_is_signed(self, message, signed):
super().is_signed(message, signed)
- def test_sign(self):
- pass
+ @parameterized.expand([
+ (load_message('data/clear.eml'),
+ load_key('data/rsa_1024.priv.asc')),
+ (load_message('data/clear.eml'),
+ load_key('data/ecc_p256.priv.asc'))
+ ])
+ def test_sign(self, message, key):
+ super().sign(message, key)
@parameterized.expand([
(load_message('data/inline_cleartext_signed.eml'),
@@ -87,8 +93,29 @@ class TestEncryption(PGPWrapperTestCase):
def test_is_encrypted(self, message, encrypted):
super().is_encrypted(message, encrypted)
- def test_encrypt(self):
- pass
+ @parameterized.expand([
+ (load_message('data/clear.eml'),
+ load_key('data/rsa_1024.pub.asc')),
+ (load_message('data/clear.eml'),
+ (load_key('data/rsa_1024.pub.asc'),
+ load_key('data/ecc_p256.pub.asc')))
+ ])
+ def test_encrypt(self, message, keys, **kwargs):
+ if isinstance(keys, tuple):
+ super().encrypt(message, *keys, **kwargs)
+ else:
+ super().encrypt(message, keys, **kwargs)
+
+ @parameterized.expand([
+ (load_message('data/clear.eml'),
+ load_key('data/rsa_1024.pub.asc'),
+ load_key('data/rsa_1024.priv.asc')),
+ (load_message('data/clear.eml'),
+ load_key('data/ecc_p256.pub.asc'),
+ load_key('data/ecc_p256.priv.asc'))
+ ])
+ def test_encrypt_decrypt(self, message, pub, priv):
+ super().encrypt_decrypt(message, pub, priv)
@parameterized.expand([
(load_message('data/inline_encrypted.eml'),
diff --git a/src/mailman_pgp/pgp/wrapper.py b/src/mailman_pgp/pgp/wrapper.py
index 7049dc4..050dd8b 100644
--- a/src/mailman_pgp/pgp/wrapper.py
+++ b/src/mailman_pgp/pgp/wrapper.py
@@ -26,16 +26,26 @@ from mailman_pgp.pgp.mime import MIMEWrapper
class PGPWrapper():
"""A combined PGP/MIME + inline PGP wrapper."""
- def __init__(self, msg):
+ def __init__(self, msg, default=MIMEWrapper):
"""
Wrap the given message.
:param msg: The message to wrap.
:type msg: mailman.email.message.Message
+ :param default:
+ :type default: MIMEWrapper or InlineWrapper
"""
self.msg = msg
self.mime = MIMEWrapper(msg)
self.inline = InlineWrapper(msg)
+ if default is MIMEWrapper:
+ self.default = self.mime
+ elif default is InlineWrapper:
+ self.default = self.inline
+ else:
+ raise ValueError('Default wrapper must be one of ' +
+ MIMEWrapper.__name__ + ' ' +
+ InlineWrapper.__name__ + '.')
def is_mime_signed(self):
return self.mime.is_signed()
@@ -52,6 +62,9 @@ class PGPWrapper():
def inline_sign(self, key):
return self.inline.sign(key)
+ def sign(self, key):
+ return self.default.sign(key)
+
def verify(self, key):
"""
Verify the signature of this message with key.
@@ -81,6 +94,9 @@ class PGPWrapper():
def inline_encrypt(self, *keys, **kwargs):
return self.inline.encrypt(*keys, **kwargs)
+ def encrypt(self, *keys, **kwargs):
+ return self.default.encrypt(*keys, **kwargs)
+
def decrypt(self, key):
"""
Decrypt this message with key.