aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJ08nY2017-08-23 20:12:10 +0200
committerJ08nY2017-08-23 20:12:10 +0200
commita0997fb8e5893fed2c2275ff0cfbfa892b261601 (patch)
tree926f093cbe087a8c7f69705f159fe85ff8799caa
parent43cc9d3e2c76c82bd00ce46ee7de6d69d07f3bb3 (diff)
downloadmailman-pgp-feature/wrappers-modify.tar.gz
mailman-pgp-feature/wrappers-modify.tar.zst
mailman-pgp-feature/wrappers-modify.zip
-rw-r--r--src/mailman_pgp/archivers/local_maildir.py5
-rw-r--r--src/mailman_pgp/archivers/local_mbox.py5
-rw-r--r--src/mailman_pgp/archivers/tests/test_maildir.py2
-rw-r--r--src/mailman_pgp/archivers/tests/test_mbox.py2
-rw-r--r--src/mailman_pgp/commands/eml_key.py38
-rw-r--r--src/mailman_pgp/commands/tests/test_key.py154
-rw-r--r--src/mailman_pgp/mta/bulk.py11
-rw-r--r--src/mailman_pgp/mta/personalized.py9
-rw-r--r--src/mailman_pgp/mta/tests/test_bulk.py27
-rw-r--r--src/mailman_pgp/mta/tests/test_personalized.py20
-rw-r--r--src/mailman_pgp/pgp/base.py43
-rw-r--r--src/mailman_pgp/pgp/inline.py77
-rw-r--r--src/mailman_pgp/pgp/mime.py195
-rw-r--r--src/mailman_pgp/pgp/mime_multisig.py123
-rw-r--r--src/mailman_pgp/pgp/tests/test_wrapper.py4
-rw-r--r--src/mailman_pgp/pgp/wrapper.py78
-rw-r--r--src/mailman_pgp/rules/tests/test_signature.py12
-rw-r--r--src/mailman_pgp/runners/incoming.py2
-rw-r--r--src/mailman_pgp/runners/tests/test_incoming.py28
-rw-r--r--src/mailman_pgp/testing/pgp.py86
-rw-r--r--src/mailman_pgp/workflows/key_change.py6
-rw-r--r--src/mailman_pgp/workflows/key_confirm.py6
-rw-r--r--src/mailman_pgp/workflows/mod_approval.py9
23 files changed, 431 insertions, 511 deletions
diff --git a/src/mailman_pgp/archivers/local_maildir.py b/src/mailman_pgp/archivers/local_maildir.py
index 4b7e75d..a99966c 100644
--- a/src/mailman_pgp/archivers/local_maildir.py
+++ b/src/mailman_pgp/archivers/local_maildir.py
@@ -65,8 +65,7 @@ class LocalMaildirArchiver:
'{}-{}.lock'.format(mlist.fqdn_listname,
LocalMaildirArchiver.name)
)
- wrapped = MIMEWrapper(msg)
- encrypted = wrapped.encrypt(pgp_list.pubkey)
+ MIMEWrapper(msg).encrypt(pgp_list.pubkey)
with Lock(lock_file):
- maildir.add(encrypted)
+ maildir.add(msg)
return None
diff --git a/src/mailman_pgp/archivers/local_mbox.py b/src/mailman_pgp/archivers/local_mbox.py
index 99264c2..e8abdcf 100644
--- a/src/mailman_pgp/archivers/local_mbox.py
+++ b/src/mailman_pgp/archivers/local_mbox.py
@@ -65,8 +65,7 @@ class LocalMailboxArchiver:
'{}-{}.lock'.format(mlist.fqdn_listname,
LocalMailboxArchiver.name)
)
- wrapped = MIMEWrapper(msg)
- encrypted = wrapped.encrypt(pgp_list.pubkey)
+ MIMEWrapper(msg).encrypt(pgp_list.pubkey)
with Lock(lock_file):
- mailbox.add(encrypted)
+ mailbox.add(msg)
return None
diff --git a/src/mailman_pgp/archivers/tests/test_maildir.py b/src/mailman_pgp/archivers/tests/test_maildir.py
index 876b2c9..b12d383 100644
--- a/src/mailman_pgp/archivers/tests/test_maildir.py
+++ b/src/mailman_pgp/archivers/tests/test_maildir.py
@@ -82,5 +82,5 @@ but the water deserves to be swum.
message = messages[0]
wrapped = MIMEWrapper(message)
self.assertTrue(wrapped.is_encrypted())
- decrypted = wrapped.decrypt(self.list_key)
+ decrypted = wrapped.decrypt(self.list_key).msg
self.assertTrue(self.msg.as_string(), decrypted.as_string())
diff --git a/src/mailman_pgp/archivers/tests/test_mbox.py b/src/mailman_pgp/archivers/tests/test_mbox.py
index 8af2b77..f660344 100644
--- a/src/mailman_pgp/archivers/tests/test_mbox.py
+++ b/src/mailman_pgp/archivers/tests/test_mbox.py
@@ -82,5 +82,5 @@ but the water deserves to be swum.
message = messages[0]
wrapped = MIMEWrapper(message)
self.assertTrue(wrapped.is_encrypted())
- decrypted = wrapped.decrypt(self.list_key)
+ decrypted = wrapped.decrypt(self.list_key).msg
self.assertTrue(self.msg.as_string(), decrypted.as_string())
diff --git a/src/mailman_pgp/commands/eml_key.py b/src/mailman_pgp/commands/eml_key.py
index c42415d..6b3dbd6 100644
--- a/src/mailman_pgp/commands/eml_key.py
+++ b/src/mailman_pgp/commands/eml_key.py
@@ -65,10 +65,9 @@ def _cmd_set(pgp_list, mlist, msg, msgdata, arguments, results):
print('Missing token.', file=results)
return ContinueProcessing.no
- wrapped = PGPWrapper(msg)
+ wrapped = PGPWrapper(msg, True)
if wrapped.is_encrypted():
- decrypted = wrapped.try_decrypt(pgp_list.key)
- wrapped = PGPWrapper(decrypted)
+ wrapped.try_decrypt(pgp_list.key)
if not wrapped.has_keys():
print('No keys attached? Send a key.', file=results)
@@ -154,10 +153,9 @@ def _cmd_confirm(pgp_list, mlist, msg, msgdata, arguments, results):
print('No key set.', file=results)
return ContinueProcessing.no
- wrapped = PGPWrapper(msg)
+ wrapped = PGPWrapper(msg, True)
if wrapped.is_encrypted():
- decrypted = wrapped.try_decrypt(pgp_list.key)
- wrapped = PGPWrapper(decrypted)
+ wrapped.try_decrypt(pgp_list.key)
if not wrapped.is_signed():
print('Message not signed, ignoring.', file=results)
@@ -226,10 +224,9 @@ def _cmd_change(pgp_list, mlist, msg, msgdata, arguments, results):
print('Your key is currently not confirmed.', file=results)
return ContinueProcessing.no
- wrapped = PGPWrapper(msg)
+ wrapped = PGPWrapper(msg, True)
if wrapped.is_encrypted():
- decrypted = wrapped.try_decrypt(pgp_list.key)
- wrapped = PGPWrapper(decrypted)
+ wrapped.try_decrypt(pgp_list.key)
if not wrapped.has_keys():
print('No keys attached? Send a key.', file=results)
@@ -294,10 +291,9 @@ def _cmd_revoke(pgp_list, mlist, msg, msgdata, arguments, results):
print('Your key is currently not confirmed.', file=results)
return ContinueProcessing.no
- wrapped = PGPWrapper(msg)
+ wrapped = PGPWrapper(msg, True)
if wrapped.is_encrypted():
- decrypted = wrapped.try_decrypt(pgp_list.key)
- wrapped = PGPWrapper(decrypted)
+ wrapped.try_decrypt(pgp_list.key)
if not wrapped.has_revocs():
print('No key revocations attached? Send a key revocation.',
@@ -378,10 +374,9 @@ def _cmd_sign(pgp_list, mlist, msg, msgdata, arguments, results):
print('Your key is currently not confirmed.', file=results)
return ContinueProcessing.no
- wrapped = PGPWrapper(msg)
+ wrapped = PGPWrapper(msg, True)
if wrapped.is_encrypted():
- decrypted = wrapped.try_decrypt(pgp_list.key)
- wrapped = PGPWrapper(decrypted)
+ wrapped.try_decrypt(pgp_list.key)
if not wrapped.has_keys():
print('No keys attached? Send a key.', file=results)
@@ -434,15 +429,14 @@ def _cmd_receive(pgp_list, mlist, msg, msgdata, arguments, results):
print('No email to send list public key.', file=results)
return ContinueProcessing.no
- msg = UserNotification(email, mlist.request_address,
+ out = UserNotification(email, mlist.request_address,
'{} public key'.format(mlist.fqdn_listname))
- msg.set_type('multipart/mixed')
- msg['MIME-Version'] = '1.0'
- msg.attach(MIMEText('Here is the public key you requested.'))
- wrapped = MIMEWrapper(msg)
- msg = wrapped.attach_keys(pgp_list.pubkey)
+ out.set_type('multipart/mixed')
+ out['MIME-Version'] = '1.0'
+ out.attach(MIMEText('Here is the public key you requested.'))
+ MIMEWrapper(out).attach_keys(pgp_list.pubkey)
- msg.send(mlist)
+ out.send(mlist)
print('Key sent.', file=results)
return ContinueProcessing.yes
diff --git a/src/mailman_pgp/commands/tests/test_key.py b/src/mailman_pgp/commands/tests/test_key.py
index 8dd6fcc..ad45373 100644
--- a/src/mailman_pgp/commands/tests/test_key.py
+++ b/src/mailman_pgp/commands/tests/test_key.py
@@ -20,7 +20,7 @@ import copy
import unittest
from mailman.app.lifecycle import create_list
-from mailman.email.message import Message
+from mailman.email.message import Message, MultipartDigestMessage
from mailman.interfaces.member import MemberRole
from mailman.interfaces.subscriptions import ISubscriptionManager
from mailman.interfaces.usermanager import IUserManager
@@ -56,11 +56,11 @@ def _create_plain(from_hdr, to_hdr, subject_hdr, payload):
def _create_mixed(from_hdr, to_hdr, subject_hdr):
- message = Message()
+ message = MultipartDigestMessage()
message['From'] = from_hdr
message['To'] = to_hdr
message['Subject'] = subject_hdr
- message.set_type('multipart/mixed')
+ message.set_payload([])
return message
@@ -141,8 +141,7 @@ class TestPreSubscription(unittest.TestCase):
set_message = _create_mixed('bart@example.com', 'test@example.com',
'Re: key set {}'.format(token))
- wrapped_set_message = MIMEWrapper(set_message)
- set_message = wrapped_set_message.attach_keys(self.bart_key.pubkey)
+ MIMEWrapper(set_message).attach_keys(self.bart_key.pubkey)
items = _run_message(set_message, 2)
@@ -179,11 +178,9 @@ class TestPreSubscription(unittest.TestCase):
set_message = _create_mixed('bart@example.com', 'test@example.com',
'Re: key set {}'.format(token))
- wrapped_set_message = MIMEWrapper(set_message)
- set_message = wrapped_set_message.attach_keys(self.bart_key.pubkey)
- wrapped_set_message = MIMEWrapper(set_message)
- set_message = wrapped_set_message.encrypt(self.pgp_list.pubkey,
- self.bart_key.pubkey)
+ MIMEWrapper(set_message).attach_keys(self.bart_key.pubkey).encrypt(
+ self.pgp_list.pubkey,
+ self.bart_key.pubkey)
items = _run_message(set_message, 2)
@@ -230,10 +227,8 @@ class TestPreSubscription(unittest.TestCase):
def test_set_multiple_keys(self):
set_message = _create_mixed('bart@example.com', 'test@example.com',
'Re: key set token')
- wrapped_set_message = MIMEWrapper(set_message)
- set_message = wrapped_set_message.attach_keys(self.bart_key.pubkey)
- wrapped_set_message = MIMEWrapper(set_message)
- set_message = wrapped_set_message.attach_keys(self.anne_key.pubkey)
+ MIMEWrapper(set_message).attach_keys(self.bart_key.pubkey).attach_keys(
+ self.anne_key.pubkey)
items = _run_message(set_message, 1)
results_msg = items[0].msg
@@ -244,8 +239,7 @@ class TestPreSubscription(unittest.TestCase):
def test_set_private_key(self):
set_message = _create_mixed('bart@example.com', 'test@example.com',
'Re: key set token')
- wrapped_set_message = MIMEWrapper(set_message)
- set_message = wrapped_set_message.attach_keys(self.bart_key)
+ MIMEWrapper(set_message).attach_keys(self.bart_key)
items = _run_message(set_message, 1)
results_msg = items[0].msg
@@ -256,8 +250,7 @@ class TestPreSubscription(unittest.TestCase):
def test_set_no_encrypt_key(self):
set_message = _create_mixed('bart@example.com', 'test@example.com',
'Re: key set token')
- wrapped_set_message = MIMEWrapper(set_message)
- set_message = wrapped_set_message.attach_keys(self.unusable_key.pubkey)
+ MIMEWrapper(set_message).attach_keys(self.unusable_key.pubkey)
items = _run_message(set_message, 1)
results_msg = items[0].msg
@@ -268,8 +261,7 @@ class TestPreSubscription(unittest.TestCase):
def test_set_no_email(self):
message = _create_mixed('', 'test@example.com', 'key set token')
- wrapped_message = MIMEWrapper(message)
- message = wrapped_message.attach_keys(self.bart_key.pubkey)
+ MIMEWrapper(message).attach_keys(self.bart_key.pubkey)
items = _run_message(message, 1)
results_msg = items[0].msg
@@ -279,8 +271,7 @@ class TestPreSubscription(unittest.TestCase):
def test_set_no_address(self):
set_message = _create_mixed('bart@example.com', 'test@example.com',
'key set token')
- wrapped_set_message = MIMEWrapper(set_message)
- set_message = wrapped_set_message.attach_keys(self.bart_key.pubkey)
+ MIMEWrapper(set_message).attach_keys(self.bart_key.pubkey)
items = _run_message(set_message, 1)
results_msg = items[0].msg
@@ -295,8 +286,7 @@ class TestPreSubscription(unittest.TestCase):
set_message = _create_mixed('bart@example.com', 'test@example.com',
'key set token')
- wrapped_set_message = MIMEWrapper(set_message)
- set_message = wrapped_set_message.attach_keys(self.bart_key.pubkey)
+ MIMEWrapper(set_message).attach_keys(self.bart_key.pubkey)
items = _run_message(set_message, 1)
results_msg = items[0].msg
@@ -315,8 +305,7 @@ class TestPreSubscription(unittest.TestCase):
set_message = _create_mixed('bart@example.com', 'test@example.com',
'key set token')
- wrapped_set_message = MIMEWrapper(set_message)
- set_message = wrapped_set_message.attach_keys(self.bart_key.pubkey)
+ MIMEWrapper(set_message).attach_keys(self.bart_key.pubkey)
items = _run_message(set_message, 1)
results_msg = items[0].msg
@@ -339,8 +328,7 @@ class TestPreSubscription(unittest.TestCase):
CONFIRM_REQUEST.format(
self.bart_key.fingerprint,
token))
- wrapped_message = MIMEWrapper(message)
- message = wrapped_message.sign(self.bart_key)
+ MIMEWrapper(message).sign(self.bart_key)
_run_message(message)
@@ -364,10 +352,9 @@ class TestPreSubscription(unittest.TestCase):
CONFIRM_REQUEST.format(
self.bart_key.fingerprint,
token))
- wrapped_message = MIMEWrapper(message)
- message = wrapped_message.sign_encrypt(self.bart_key,
- self.pgp_list.pubkey,
- self.bart_key.pubkey)
+ MIMEWrapper(message).sign_encrypt(self.bart_key,
+ self.pgp_list.pubkey,
+ self.bart_key.pubkey)
_run_message(message)
@@ -415,8 +402,7 @@ class TestPreSubscription(unittest.TestCase):
CONFIRM_REQUEST.format(
self.bart_key.fingerprint,
'token'))
- wrapped_message = MIMEWrapper(message)
- message = wrapped_message.sign(self.bart_key)
+ MIMEWrapper(message).sign(self.bart_key)
items = _run_message(message, 1)
results_msg = items[0].msg
@@ -462,8 +448,7 @@ class TestPreSubscription(unittest.TestCase):
CONFIRM_REQUEST.format(
self.bart_key.fingerprint,
token))
- wrapped_message = MIMEWrapper(message)
- message = wrapped_message.sign(self.bart_key)
+ MIMEWrapper(message).sign(self.bart_key)
message.get_payload(0).set_payload(
'Something that was definitely not signed.')
@@ -488,8 +473,7 @@ class TestPreSubscription(unittest.TestCase):
CONFIRM_REQUEST.format(
self.bart_key.fingerprint,
'token'))
- wrapped_message = MIMEWrapper(message)
- message = wrapped_message.sign(self.bart_key)
+ MIMEWrapper(message).sign(self.bart_key)
items = _run_message(message, 1)
results_msg = items[0].msg
@@ -511,8 +495,7 @@ class TestPreSubscription(unittest.TestCase):
'Re: key confirm {}'.format(token),
'Some text, that definitely does not'
'contain the required/expected statement.')
- wrapped_message = MIMEWrapper(message)
- message = wrapped_message.sign(self.bart_key)
+ MIMEWrapper(message).sign(self.bart_key)
items = _run_message(message, 1)
results_msg = items[0].msg
@@ -561,8 +544,7 @@ class TestAfterSubscription(unittest.TestCase):
message = _create_mixed('bart@example.com', 'test@example.com',
'key change')
- wrapped_message = MIMEWrapper(message)
- message = wrapped_message.attach_keys(self.bart_new_key.pubkey)
+ MIMEWrapper(message).attach_keys(self.bart_new_key.pubkey)
items = _run_message(message, 2)
if (items[0].msg['Subject'] ==
@@ -577,8 +559,8 @@ class TestAfterSubscription(unittest.TestCase):
confirm_wrapped = PGPWrapper(confirm_request)
self.assertTrue(confirm_wrapped.is_encrypted())
- decrypted = confirm_wrapped.decrypt(self.bart_new_key)
- self.assertIn('key confirm', decrypted['subject'])
+ decrypted = confirm_wrapped.decrypt(self.bart_new_key).msg
+ self.assertIn('key confirm', str(decrypted['subject']))
def test_change_encrypted(self):
bart = getUtility(IUserManager).create_address('bart@example.com',
@@ -592,10 +574,8 @@ class TestAfterSubscription(unittest.TestCase):
message = _create_mixed('bart@example.com', 'test@example.com',
'key change')
- wrapped_message = MIMEWrapper(message)
- message = wrapped_message.attach_keys(self.bart_new_key.pubkey)
- wrapped_message = MIMEWrapper(message)
- message = wrapped_message.encrypt(self.pgp_list.pubkey)
+ MIMEWrapper(message).attach_keys(self.bart_new_key.pubkey).encrypt(
+ self.pgp_list.pubkey)
items = _run_message(message, 2)
if (items[0].msg['Subject'] ==
@@ -610,8 +590,8 @@ class TestAfterSubscription(unittest.TestCase):
confirm_wrapped = PGPWrapper(confirm_request)
self.assertTrue(confirm_wrapped.is_encrypted())
- decrypted = confirm_wrapped.decrypt(self.bart_new_key)
- self.assertIn('key confirm', decrypted['subject'])
+ decrypted = confirm_wrapped.decrypt(self.bart_new_key).msg
+ self.assertIn('key confirm', str(decrypted['subject']))
def test_change_confirm(self):
bart = getUtility(IUserManager).create_address('bart@example.com',
@@ -625,8 +605,7 @@ class TestAfterSubscription(unittest.TestCase):
message = _create_mixed('bart@example.com', 'test@example.com',
'key change')
- wrapped_message = MIMEWrapper(message)
- message = wrapped_message.attach_keys(self.bart_new_key.pubkey)
+ MIMEWrapper(message).attach_keys(self.bart_new_key.pubkey)
items = _run_message(message, 2)
if (items[0].msg['Subject'] ==
@@ -635,20 +614,19 @@ class TestAfterSubscription(unittest.TestCase):
else:
confirm_request = items[0].msg
request_wrapped = PGPWrapper(confirm_request)
- decrypted = request_wrapped.decrypt(self.bart_new_key)
+ decrypted = request_wrapped.decrypt(self.bart_new_key).msg
subj = decrypted['subject']
- token = subj.split(' ')[-1]
+ token = str(subj).split(' ')[-1]
confirm_message = _create_plain('bart@example.com', 'test@example.com',
decrypted['subject'],
CHANGE_CONFIRM_REQUEST.format(
self.bart_new_key.fingerprint,
token))
- wrapped_confirm = MIMEWrapper(confirm_message)
- confirm = wrapped_confirm.sign(self.bart_key)
+ MIMEWrapper(confirm_message).sign(self.bart_key)
- _run_message(confirm)
+ _run_message(confirm_message)
pgp_address = PGPAddress.for_address(bart)
self.assertEqual(pgp_address.key_fingerprint,
@@ -666,8 +644,7 @@ class TestAfterSubscription(unittest.TestCase):
def test_change_no_email(self):
message = _create_mixed('', 'test@example.com', 'key change')
- wrapped_message = MIMEWrapper(message)
- message = wrapped_message.attach_keys(self.bart_key.pubkey)
+ MIMEWrapper(message).attach_keys(self.bart_key.pubkey)
items = _run_message(message, 1)
results_msg = items[0].msg
@@ -677,8 +654,7 @@ class TestAfterSubscription(unittest.TestCase):
def test_change_no_pgp_address(self):
message = _create_mixed('bart@example.com', 'test@example.com',
'key change')
- wrapped_message = MIMEWrapper(message)
- message = wrapped_message.attach_keys(self.bart_key.pubkey)
+ MIMEWrapper(message).attach_keys(self.bart_key.pubkey)
items = _run_message(message, 1)
results_msg = items[0].msg
@@ -749,10 +725,8 @@ class TestAfterSubscription(unittest.TestCase):
set_message = _create_mixed('bart@example.com', 'test@example.com',
'key change')
- wrapped_set_message = MIMEWrapper(set_message)
- set_message = wrapped_set_message.attach_keys(self.bart_key.pubkey)
- wrapped_set_message = MIMEWrapper(set_message)
- set_message = wrapped_set_message.attach_keys(self.bart_new_key.pubkey)
+ MIMEWrapper(set_message).attach_keys(self.bart_key.pubkey).attach_keys(
+ self.bart_new_key.pubkey)
items = _run_message(set_message, 1)
results_msg = items[0].msg
@@ -771,8 +745,7 @@ class TestAfterSubscription(unittest.TestCase):
message = _create_mixed('bart@example.com', 'test@example.com',
'key change')
- wrapped_message = MIMEWrapper(message)
- message = wrapped_message.attach_keys(self.bart_key)
+ MIMEWrapper(message).attach_keys(self.bart_key)
items = _run_message(message, 1)
results_msg = items[0].msg
@@ -791,8 +764,7 @@ class TestAfterSubscription(unittest.TestCase):
message = _create_mixed('bart@example.com', 'test@example.com',
'key change')
- wrapped_message = MIMEWrapper(message)
- message = wrapped_message.attach_keys(self.unusable_key.pubkey)
+ MIMEWrapper(message).attach_keys(self.unusable_key.pubkey)
items = _run_message(message, 1)
results_msg = items[0].msg
@@ -814,8 +786,7 @@ class TestAfterSubscription(unittest.TestCase):
message = _create_mixed('bart@example.com', 'test@example.com',
'key revoke')
- wrapped_message = MIMEWrapper(message)
- message = wrapped_message.attach_revocs(revoc)
+ MIMEWrapper(message).attach_revocs(revoc)
items = _run_message(message, 2)
if (items[0].msg['Subject'] ==
@@ -856,8 +827,7 @@ class TestAfterSubscription(unittest.TestCase):
message = _create_mixed('bart@example.com', 'test@example.com',
'key revoke')
- wrapped_message = MIMEWrapper(message)
- message = wrapped_message.attach_revocs(revoc)
+ MIMEWrapper(message).attach_revocs(revoc)
items = _run_message(message, 1)
results_msg = items[0].msg
@@ -881,10 +851,7 @@ class TestAfterSubscription(unittest.TestCase):
message = _create_mixed('bart@example.com', 'test@example.com',
'key revoke')
- wrapped_message = MIMEWrapper(message)
- message = wrapped_message.attach_revocs(revoc)
- wrapped_message = MIMEWrapper(message)
- message = wrapped_message.encrypt(self.pgp_list.pubkey)
+ MIMEWrapper(message).attach_revocs(revoc).encrypt(self.pgp_list.pubkey)
items = _run_message(message, 2)
if (items[0].msg['Subject'] ==
@@ -991,12 +958,12 @@ class TestAfterSubscription(unittest.TestCase):
message = _create_mixed('bart@example.com', 'test@example.com',
'key sign')
- wrapped_message = MIMEWrapper(message)
+
new_key = copy.copy(self.pgp_list.pubkey)
uid = next(iter(new_key.userids))
sig = self.bart_key.certify(uid)
uid |= sig
- message = wrapped_message.attach_keys(new_key)
+ MIMEWrapper(message).attach_keys(new_key)
items = _run_message(message, 1)
results_msg = items[0].msg
@@ -1020,14 +987,12 @@ class TestAfterSubscription(unittest.TestCase):
message = _create_mixed('bart@example.com', 'test@example.com',
'key sign')
- wrapped_message = MIMEWrapper(message)
+
new_key = copy.copy(self.pgp_list.pubkey)
uid = next(iter(new_key.userids))
sig = self.bart_key.certify(uid)
uid |= sig
- message = wrapped_message.attach_keys(new_key)
- wrapped_message = MIMEWrapper(message)
- message = wrapped_message.encrypt(self.pgp_list.pubkey)
+ MIMEWrapper(message).attach_keys(new_key).encrypt(self.pgp_list.pubkey)
items = _run_message(message, 1)
results_msg = items[0].msg
@@ -1125,10 +1090,8 @@ class TestAfterSubscription(unittest.TestCase):
message = _create_mixed('bart@example.com', 'test@example.com',
'key sign')
- wrapped_message = MIMEWrapper(message)
- message = wrapped_message.attach_keys(self.bart_key.pubkey)
- wrapped_message = MIMEWrapper(message)
- message = wrapped_message.attach_keys(self.bart_new_key.pubkey)
+ MIMEWrapper(message).attach_keys(self.bart_key.pubkey).attach_keys(
+ self.bart_new_key.pubkey)
items = _run_message(message, 1)
results_msg = items[0].msg
@@ -1149,8 +1112,7 @@ class TestAfterSubscription(unittest.TestCase):
message = _create_mixed('bart@example.com', 'test@example.com',
'key sign')
- wrapped_message = MIMEWrapper(message)
- message = wrapped_message.attach_keys(self.pgp_list.pubkey)
+ MIMEWrapper(message).attach_keys(self.pgp_list.pubkey)
items = _run_message(message, 1)
results_msg = items[0].msg
@@ -1174,8 +1136,7 @@ class TestAfterSubscription(unittest.TestCase):
message = _create_mixed('bart@example.com', 'test@example.com',
'key sign')
- wrapped_message = MIMEWrapper(message)
- message = wrapped_message.attach_keys(self.bart_key.pubkey)
+ MIMEWrapper(message).attach_keys(self.bart_key.pubkey)
items = _run_message(message, 1)
results_msg = items[0].msg
@@ -1199,11 +1160,11 @@ class TestAfterSubscription(unittest.TestCase):
message = _create_mixed('bart@example.com', 'test@example.com',
'key sign')
- wrapped_message = MIMEWrapper(message)
new_key = copy.copy(self.pgp_list.pubkey)
for uid in new_key.userids:
new_key.del_uid(uid.email)
- message = wrapped_message.attach_keys(new_key)
+
+ MIMEWrapper(message).attach_keys(new_key)
items = _run_message(message, 1)
results_msg = items[0].msg
@@ -1227,8 +1188,7 @@ class TestAfterSubscription(unittest.TestCase):
message = _create_mixed('bart@example.com', 'test@example.com',
'key sign')
- wrapped_message = MIMEWrapper(message)
- message = wrapped_message.attach_keys(self.pgp_list.pubkey)
+ MIMEWrapper(message).attach_keys(self.pgp_list.pubkey)
items = _run_message(message, 1)
results_msg = items[0].msg
@@ -1252,12 +1212,12 @@ class TestAfterSubscription(unittest.TestCase):
message = _create_mixed('bart@example.com', 'test@example.com',
'key sign')
- wrapped_message = MIMEWrapper(message)
+
new_key = copy.copy(self.pgp_list.pubkey)
uid = next(iter(new_key.userids))
sig = self.bart_new_key.certify(uid)
uid |= sig
- message = wrapped_message.attach_keys(new_key)
+ MIMEWrapper(message).attach_keys(new_key)
items = _run_message(message, 1)
results_msg = items[0].msg
diff --git a/src/mailman_pgp/mta/bulk.py b/src/mailman_pgp/mta/bulk.py
index a01b5a7..1c4f213 100644
--- a/src/mailman_pgp/mta/bulk.py
+++ b/src/mailman_pgp/mta/bulk.py
@@ -24,7 +24,6 @@ from public import public
from mailman_pgp.model.address import PGPAddress
from mailman_pgp.model.list import PGPMailingList
from mailman_pgp.pgp.mime import MIMEWrapper
-from mailman_pgp.utils.email import overwrite_message
class CallbackBulkDelivery(BulkDelivery):
@@ -92,16 +91,14 @@ class PGPBulkMixin:
wrapped = MIMEWrapper(msg)
if pgp_list.sign_outgoing:
if pgp_list.encrypt_outgoing:
- out = wrapped.sign_encrypt(pgp_list.key, pgp_list.pubkey,
- *keys, throw_keyid=True)
+ wrapped.sign_encrypt(pgp_list.key, pgp_list.pubkey,
+ *keys, throw_keyid=True)
else:
- out = wrapped.sign(pgp_list.key)
+ wrapped.sign(pgp_list.key)
else:
# Definitely encrypt here, the case where we don't encrypt or sign
# is handled above at the start of the func.
- out = wrapped.encrypt(pgp_list.pubkey, *keys, throw_keyid=True)
-
- overwrite_message(out, msg)
+ wrapped.encrypt(pgp_list.pubkey, *keys, throw_keyid=True)
@public
diff --git a/src/mailman_pgp/mta/personalized.py b/src/mailman_pgp/mta/personalized.py
index 9fcc282..5c12a0c 100644
--- a/src/mailman_pgp/mta/personalized.py
+++ b/src/mailman_pgp/mta/personalized.py
@@ -25,7 +25,6 @@ from public import public
from mailman_pgp.model.address import PGPAddress
from mailman_pgp.model.list import PGPMailingList
from mailman_pgp.pgp.mime import MIMEWrapper
-from mailman_pgp.utils.email import overwrite_message
class PGPIndividualMixin:
@@ -63,15 +62,13 @@ class PGPIndividualMixin:
if pgp_list.sign_outgoing:
if pgp_list.encrypt_outgoing:
- out = wrapped.sign_encrypt(pgp_list.key, key, pgp_list.pubkey)
+ wrapped.sign_encrypt(pgp_list.key, key, pgp_list.pubkey)
else:
- out = wrapped.sign(pgp_list.key)
+ wrapped.sign(pgp_list.key)
else:
# Definitely encrypt here, the case where we don't encrypt or sign
# is handled above at the start of the func.
- out = wrapped.encrypt(key, pgp_list.pubkey)
-
- overwrite_message(out, msg)
+ wrapped.encrypt(key, pgp_list.pubkey)
@public
diff --git a/src/mailman_pgp/mta/tests/test_bulk.py b/src/mailman_pgp/mta/tests/test_bulk.py
index 02c5bf2..e9f77d0 100644
--- a/src/mailman_pgp/mta/tests/test_bulk.py
+++ b/src/mailman_pgp/mta/tests/test_bulk.py
@@ -99,20 +99,17 @@ Subject: test
out_wrapped = PGPWrapper(out_msg)
self.assertTrue(out_wrapped.is_encrypted())
- decrypted = out_wrapped.decrypt(self.list_key)
- wrapped = PGPWrapper(decrypted)
- self.assertTrue(wrapped.is_signed())
- self.assertTrue(verifies(wrapped.verify(self.list_key.pubkey)))
+ decrypted = out_wrapped.copy().decrypt(self.list_key)
+ self.assertTrue(decrypted.is_signed())
+ self.assertTrue(verifies(decrypted.verify(self.list_key.pubkey)))
- decrypted = out_wrapped.decrypt(self.anne_key)
- wrapped = PGPWrapper(decrypted)
- self.assertTrue(wrapped.is_signed())
- self.assertTrue(verifies(wrapped.verify(self.list_key.pubkey)))
+ decrypted = out_wrapped.copy().decrypt(self.anne_key)
+ self.assertTrue(decrypted.is_signed())
+ self.assertTrue(verifies(decrypted.verify(self.list_key.pubkey)))
- decrypted = out_wrapped.decrypt(self.bart_key)
- wrapped = PGPWrapper(decrypted)
- self.assertTrue(wrapped.is_signed())
- self.assertTrue(verifies(wrapped.verify(self.list_key.pubkey)))
+ decrypted = out_wrapped.copy().decrypt(self.bart_key)
+ self.assertTrue(decrypted.is_signed())
+ self.assertTrue(verifies(decrypted.verify(self.list_key.pubkey)))
def test_encrypt(self):
with transaction():
@@ -129,9 +126,9 @@ Subject: test
out_msg = agent.deliveries[0][1]
wrapped = PGPWrapper(out_msg)
self.assertTrue(wrapped.is_encrypted())
- wrapped.decrypt(self.list_key)
- wrapped.decrypt(self.anne_key)
- wrapped.decrypt(self.bart_key)
+ wrapped.copy().decrypt(self.list_key)
+ wrapped.copy().decrypt(self.anne_key)
+ wrapped.copy().decrypt(self.bart_key)
def test_sign(self):
with transaction():
diff --git a/src/mailman_pgp/mta/tests/test_personalized.py b/src/mailman_pgp/mta/tests/test_personalized.py
index 7f4caa8..953ef55 100644
--- a/src/mailman_pgp/mta/tests/test_personalized.py
+++ b/src/mailman_pgp/mta/tests/test_personalized.py
@@ -89,13 +89,11 @@ Subject: test
out_wrapped = PGPWrapper(out_msg)
self.assertTrue(out_wrapped.is_encrypted())
- decrypted = out_wrapped.decrypt(self.list_key)
- wrapped = PGPWrapper(decrypted)
- self.assertTrue(wrapped.is_signed())
+ decrypted = out_wrapped.copy().decrypt(self.list_key)
+ self.assertTrue(decrypted.is_signed())
- decrypted = out_wrapped.decrypt(self.anne_key)
- wrapped = PGPWrapper(decrypted)
- self.assertTrue(wrapped.is_signed())
+ decrypted = out_wrapped.copy().decrypt(self.anne_key)
+ self.assertTrue(decrypted.is_signed())
def test_encrypt(self):
with transaction():
@@ -113,13 +111,11 @@ Subject: test
out_wrapped = PGPWrapper(out_msg)
self.assertTrue(out_wrapped.is_encrypted())
- decrypted = out_wrapped.decrypt(self.list_key)
- wrapped = PGPWrapper(decrypted)
- self.assertFalse(wrapped.is_signed())
+ decrypted = out_wrapped.copy().decrypt(self.list_key)
+ self.assertFalse(decrypted.is_signed())
- decrypted = out_wrapped.decrypt(self.anne_key)
- wrapped = PGPWrapper(decrypted)
- self.assertFalse(wrapped.is_signed())
+ decrypted = out_wrapped.copy().decrypt(self.anne_key)
+ self.assertFalse(decrypted.is_signed())
def test_sign(self):
with transaction():
diff --git a/src/mailman_pgp/pgp/base.py b/src/mailman_pgp/pgp/base.py
new file mode 100644
index 0000000..497e9b5
--- /dev/null
+++ b/src/mailman_pgp/pgp/base.py
@@ -0,0 +1,43 @@
+# Copyright (C) 2017 Jan Jancar
+#
+# This file is a part of the Mailman PGP plugin.
+#
+# This program is free software; you can redistribute it and/or modify it under
+# the terms of the GNU General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or (at your option)
+# any later version.
+#
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+# more details.
+#
+# You should have received a copy of the GNU General Public License along with
+# this program. If not, see <http://www.gnu.org/licenses/>.
+""""""
+from copy import deepcopy
+
+from public import public
+
+
+@public
+class BaseWrapper:
+ """"""
+
+ def __init__(self, msg, copy=False):
+ """
+ Wrap the given message.
+
+ :param msg: The message to wrap.
+ :type msg: mailman.email.message.Message
+ """
+ if copy:
+ self.msg = deepcopy(msg)
+ else:
+ self.msg = msg
+
+ def copy(self):
+ return self.__class__(self.msg, True)
+
+ def __copy__(self):
+ return self.copy()
diff --git a/src/mailman_pgp/pgp/inline.py b/src/mailman_pgp/pgp/inline.py
index fa8f878..410078c 100644
--- a/src/mailman_pgp/pgp/inline.py
+++ b/src/mailman_pgp/pgp/inline.py
@@ -24,23 +24,14 @@ from pgpy import PGPMessage
from pgpy.constants import SymmetricKeyAlgorithm
from public import public
-from mailman_pgp.utils.email import make_multipart
+from mailman_pgp.pgp.base import BaseWrapper
from mailman_pgp.utils.pgp import key_from_blob, revoc_from_blob
@public
-class InlineWrapper:
+class InlineWrapper(BaseWrapper):
"""Inline PGP wrapper."""
- def __init__(self, msg):
- """
- Wrap the given message.
-
- :param msg: The message to wrap.
- :type msg: mailman.email.message.Message
- """
- self.msg = msg
-
def get_payload(self):
for part in walk(self.msg):
if not part.is_multipart():
@@ -219,14 +210,21 @@ class InlineWrapper:
:param key_revocations: A key revocation signature to attach.
:type key_revocations: pgpy.PGPSignature
- :return: The message with the signature attached.
- :rtype: mailman.email.message.Message
+ :return:
+ :rtype: InlineWrapper
"""
- out = make_multipart(self.msg)
+ if self.msg.get_content_type() != 'multipart/mixed':
+ # wrap in multipart/mixed
+ payload = copy.deepcopy(self.msg)
+ self.msg.set_payload([])
+ self.msg.set_type('multipart/mixed')
+ self.msg['MIME-Version'] = '1.0'
+ self.msg.attach(payload)
+
for key_revocation in key_revocations:
revoc_part = MIMEText(str(key_revocation))
- out.attach(revoc_part)
- return out
+ self.msg.attach(revoc_part)
+ return self
def verify(self, key):
"""
@@ -250,11 +248,10 @@ class InlineWrapper:
:param key: The key to sign with.
:type key: pgpy.PGPKey
- :return: The signed message.
- :rtype: mailman.email.message.Message
+ :return:
+ :rtype: InlineWrapper
"""
- out = copy.deepcopy(self.msg)
- for part in walk(out):
+ for part in walk(self.msg):
if not part.is_multipart():
if self._is_signed(part):
pmsg = PGPMessage.from_blob(part.get_payload())
@@ -263,16 +260,18 @@ class InlineWrapper:
pmsg = PGPMessage.new(payload, cleartext=True)
smsg = self._sign(pmsg, key, **kwargs)
part.set_payload(str(smsg))
- return out
+ return self
def _decrypt(self, part, key):
message = PGPMessage.from_blob(part.get_payload())
- # TODO: exception safe this.
decrypted = key.decrypt(message)
if decrypted.is_signed:
part.set_payload(str(decrypted))
else:
- part.set_payload(decrypted.message)
+ dmsg = decrypted.message
+ if isinstance(dmsg, bytearray):
+ dmsg = dmsg.decode(decrypted.charset or 'utf-8')
+ part.set_payload(dmsg)
def decrypt(self, key):
"""
@@ -280,14 +279,13 @@ class InlineWrapper:
:param key: The key to decrypt with.
:type key: pgpy.PGPKey
- :return: The decrypted message.
- :rtype: mailman.email.message.Message
+ :return:
+ :rtype: InlineWrapper
"""
- out = copy.deepcopy(self.msg)
- for part in walk(out):
+ for part in walk(self.msg):
if not part.is_multipart() and self._is_encrypted(part):
self._decrypt(part, key)
- return out
+ return self
def _encrypt(self, pmsg, *keys, cipher, **kwargs):
emsg = copy.copy(pmsg)
@@ -311,19 +309,19 @@ class InlineWrapper:
:type keys: pgpy.PGPKey
:param cipher: The symmetric cipher to use.
:type cipher: SymmetricKeyAlgorithm
- :return: mailman.email.message.Message
+ :return:
+ :rtype: InlineWrapper
"""
if len(keys) == 0:
raise ValueError('At least one key necessary.')
- out = copy.deepcopy(self.msg)
- for part in walk(out):
+ for part in walk(self.msg):
if not part.is_multipart():
payload = str(part.get_payload())
pmsg = PGPMessage.new(payload)
emsg = self._encrypt(pmsg, *keys, cipher=cipher, **kwargs)
part.set_payload(str(emsg))
- return out
+ return self
def sign_encrypt(self, key, *keys, hash=None,
cipher=SymmetricKeyAlgorithm.AES256,
@@ -339,14 +337,13 @@ class InlineWrapper:
:type hash: pgpy.constants.HashAlgorithm
:param cipher:
:type cipher: pgpy.constants.SymmetricKeyAlgorithm
- :return: The signed + encrypted message.
- :rtype: mailman.email.message.Message
+ :return:
+ :rtype: InlineWrapper
"""
if len(keys) == 0:
raise ValueError('At least one key necessary.')
- out = copy.deepcopy(self.msg)
- for part in walk(out):
+ for part in walk(self.msg):
if not part.is_multipart():
if self._is_signed(part):
pmsg = PGPMessage.from_blob(part.get_payload())
@@ -356,10 +353,4 @@ class InlineWrapper:
smsg = self._sign(pmsg, key, hash=hash)
emsg = self._encrypt(smsg, *keys, cipher=cipher, **kwargs)
part.set_payload(str(emsg))
- return out
-
- def sign_then_encrypt(self, key, *keys, hash=None,
- cipher=SymmetricKeyAlgorithm.AES256,
- **kwargs):
- return self.sign_encrypt(key, *keys, hash=hash, cipher=cipher,
- **kwargs)
+ return self
diff --git a/src/mailman_pgp/pgp/mime.py b/src/mailman_pgp/pgp/mime.py
index eeeed95..9ea6384 100644
--- a/src/mailman_pgp/pgp/mime.py
+++ b/src/mailman_pgp/pgp/mime.py
@@ -23,17 +23,18 @@ from email.iterators import walk
from email.mime.application import MIMEApplication
from email.utils import collapse_rfc2231_value
-from mailman.email.message import Message, MultipartDigestMessage
+from mailman.email.message import Message
from pgpy import PGPMessage, PGPSignature
from pgpy.constants import HashAlgorithm, SymmetricKeyAlgorithm
from public import public
-from mailman_pgp.utils.email import copy_headers, make_multipart
+from mailman_pgp.pgp.base import BaseWrapper
+from mailman_pgp.utils.email import copy_headers
from mailman_pgp.utils.pgp import key_from_blob, revoc_from_blob
@public
-class MIMEWrapper:
+class MIMEWrapper(BaseWrapper):
"""PGP/MIME (RFC1847 + RFC3156) compliant wrapper."""
_signature_subtype = 'pgp-signature'
@@ -44,20 +45,14 @@ class MIMEWrapper:
_encrypted_type = 'application/' + _encryption_subtype
_keys_type = 'application/' + _keys_subtype
+ _signed_multipart = 'multipart/signed'
+ _encrypted_multipart = 'multipart/encrypted'
+
_signature_preamble = \
'This is an OpenPGP/MIME signed message (RFC 4880 and 3156)'
_encryption_preamble = \
'This is an OpenPGP/MIME encrypted message (RFC 4880 and 3156)'
- def __init__(self, msg):
- """
- Wrap the given message.
-
- :param msg: The message to wrap.
- :type msg: mailman.email.message.Message
- """
- self.msg = msg
-
def get_payload(self):
yield self.msg.as_string()
@@ -186,28 +181,40 @@ class MIMEWrapper:
continue
yield key
+ def _attach_key_part(self, obj, name, description):
+ key_part = MIMEApplication(_data=str(obj),
+ _subtype=MIMEWrapper._keys_subtype,
+ _encoder=encode_7or8bit,
+ name=name)
+ key_part.add_header('Content-Description', description)
+ key_part.add_header('Content-Disposition', 'attachment',
+ filename=name)
+ self.msg.attach(key_part)
+
def attach_keys(self, *keys):
"""
Attach a key to this message, as per RFC3156 section 7.
- :param keys: A key to attach.
+ :param keys: Keys to attach.
:type keys: pgpy.PGPKey
- :return: The message with the key attached.
- :rtype: mailman.email.message.Message
+ :return:
+ :rtype: MIMEWrapper
"""
- out = make_multipart(self.msg)
+ if len(keys) == 0:
+ return self
+
+ if self.msg.get_content_type() != 'multipart/mixed':
+ # wrap in multipart/mixed
+ payload = copy.deepcopy(self.msg)
+ self.msg.set_payload([])
+ self.msg.set_type('multipart/mixed')
+ self.msg['MIME-Version'] = '1.0'
+ self.msg.attach(payload)
+
for key in keys:
filename = '0x' + key.fingerprint.keyid + '.asc'
- key_part = MIMEApplication(_data=str(key),
- _subtype=MIMEWrapper._keys_subtype,
- _encoder=encode_7or8bit,
- name=filename)
- key_part.add_header('Content-Description',
- 'OpenPGP key')
- key_part.add_header('Content-Disposition', 'attachment',
- filename=filename)
- out.attach(key_part)
- return out
+ self._attach_key_part(key, filename, 'OpenPGP key')
+ return self
def _is_revoc(self, part):
if part.get_content_type() != MIMEWrapper._keys_type:
@@ -246,22 +253,25 @@ class MIMEWrapper:
:param key_revocations: A key revocation signature to attach.
:type key_revocations: pgpy.PGPSignature
- :return: The message with the signature attached.
- :rtype: mailman.email.message.Message
+ :return:
+ :rtype: MIMEWrapper
"""
- out = make_multipart(self.msg)
+ if len(key_revocations) == 0:
+ return self
+
+ if self.msg.get_content_type() != 'multipart/mixed':
+ # wrap in multipart/mixed
+ payload = copy.deepcopy(self.msg)
+ self.msg.set_payload([])
+ self.msg.set_type('multipart/mixed')
+ self.msg['MIME-Version'] = '1.0'
+ self.msg.attach(payload)
+
for key_revocation in key_revocations:
filename = '0x' + key_revocation.signer + '.asc'
- revoc_part = MIMEApplication(_data=str(key_revocation),
- _subtype=MIMEWrapper._keys_subtype,
- _encoder=encode_7or8bit,
- name=filename)
- revoc_part.add_header('Content-Description',
- 'OpenPGP key')
- revoc_part.add_header('Content-Disposition', 'attachment',
- filename=filename)
- out.attach(revoc_part)
- return out
+ self._attach_key_part(key_revocation, filename,
+ 'OpenPGP key revocation')
+ return self
def verify(self, key):
"""
@@ -298,12 +308,13 @@ class MIMEWrapper:
:param msg:
:param signature:
- :return:
"""
- micalg = self._micalg(signature.hash_algorithm)
- out = MultipartDigestMessage('signed', micalg=micalg,
- protocol=MIMEWrapper._signed_type)
- out.preamble = MIMEWrapper._signature_preamble
+ self.msg.set_payload([])
+ self.msg.attach(msg)
+ self.msg.set_type(MIMEWrapper._signed_multipart)
+ self.msg.set_param('micalg', self._micalg(signature.hash_algorithm))
+ self.msg.set_param('protocol', MIMEWrapper._signed_type)
+ self.msg.preamble = MIMEWrapper._signature_preamble
second_part = MIMEApplication(_data=str(signature),
_subtype=MIMEWrapper._signature_subtype,
@@ -313,11 +324,7 @@ class MIMEWrapper:
'OpenPGP digital signature')
second_part.add_header('Content-Disposition', 'attachment',
filename='signature.asc')
-
- out.attach(copy.deepcopy(msg))
- out.attach(second_part)
- copy_headers(msg, out)
- return out
+ self.msg.attach(second_part)
def sign(self, key, **kwargs):
"""
@@ -325,12 +332,14 @@ class MIMEWrapper:
:param key: The key to sign with.
:type key: pgpy.PGPKey
- :return: The signed message.
- :rtype: mailman.email.message.Message
+ :rtype: MIMEWrapper
"""
payload = next(iter(self.get_payload()))
signature = key.sign(payload, **kwargs)
- return self._wrap_signed(self.msg, signature)
+ original_msg = copy.deepcopy(self.msg)
+ self._wrap_signed(original_msg, signature)
+
+ return self
def decrypt(self, key):
"""
@@ -342,7 +351,6 @@ class MIMEWrapper:
:rtype: mailman.email.message.Message
"""
pmsg = next(iter(self.get_encrypted()))
- # TODO: exception safe this.
decrypted = key.decrypt(pmsg)
dmsg = decrypted.message
@@ -351,9 +359,17 @@ class MIMEWrapper:
out = message_from_string(dmsg, _class=Message)
if decrypted.is_signed:
- out = self._wrap_signed(out, decrypted.signatures.pop())
- copy_headers(self.msg, out)
- return out
+ # rewrap, self.msg should be multipart/signed,
+ # headers from out should overwrite those from self.msg
+ # self.msg payload should be [out, sig]
+ signature = next(iter(decrypted.signatures))
+ self._wrap_signed(out, signature)
+ else:
+ # self.msg payload should be out.get_payload
+ # headers from out should overwrite those from self.msg
+ self.msg.set_payload(out.get_payload())
+ copy_headers(out, self.msg, True)
+ return self
def _encrypt(self, pmsg, *keys, cipher, **kwargs):
emsg = copy.copy(pmsg)
@@ -369,16 +385,16 @@ class MIMEWrapper:
return emsg
def _wrap_encrypted(self, payload):
- out = MultipartDigestMessage('encrypted',
- protocol=MIMEWrapper._encrypted_type)
- out.preamble = MIMEWrapper._encryption_preamble
-
+ self.msg.set_payload([])
+ self.msg.set_type(MIMEWrapper._encrypted_multipart)
+ self.msg.set_param('protocol', MIMEWrapper._encrypted_type)
+ self.msg.preamble = MIMEWrapper._encryption_preamble
first_part = MIMEApplication(_data='Version: 1',
_subtype=MIMEWrapper._encryption_subtype,
_encoder=encode_7or8bit)
first_part.add_header('Content-Description',
'PGP/MIME version identification')
-
+ self.msg.attach(first_part)
second_part = MIMEApplication(_data=str(payload),
_subtype='octet-stream',
_encoder=encode_7or8bit,
@@ -387,9 +403,7 @@ class MIMEWrapper:
'OpenPGP encrypted message')
second_part.add_header('Content-Disposition', 'inline',
filename='encrypted.asc')
- out.attach(first_part)
- out.attach(second_part)
- return out
+ self.msg.attach(second_part)
def encrypt(self, *keys, cipher=SymmetricKeyAlgorithm.AES256,
**kwargs):
@@ -400,21 +414,25 @@ class MIMEWrapper:
:type keys: pgpy.PGPKey
:param cipher: The symmetric cipher to use.
:type cipher: pgpy.constants.SymmetricKeyAlgorithm
- :return: The encrypted message.
- :rtype: mailman.email.message.Message
+ :return:
+ :rtype: MIMEWrapper
"""
if len(keys) == 0:
raise ValueError('At least one key necessary.')
if self.is_signed():
+ # self.msg payload should be [ version_1, encrypted]
+ # headers should remain the same, except Content-Type
+ # signature should be combined into the PGP blob
pmsg = PGPMessage.new(next(iter(self.get_signed())))
pmsg |= next(iter(self.get_signature()))
else:
+ # self.msg payload should be [ version_1, encrypted]
+ # headers should remain the same, except Content-Type
pmsg = PGPMessage.new(next(iter(self.get_payload())))
pmsg = self._encrypt(pmsg, *keys, cipher=cipher, **kwargs)
- out = self._wrap_encrypted(pmsg)
- copy_headers(self.msg, out)
- return out
+ self._wrap_encrypted(pmsg)
+ return self
def sign_encrypt(self, key, *keys, hash=None,
cipher=SymmetricKeyAlgorithm.AES256,
@@ -432,8 +450,8 @@ class MIMEWrapper:
:type hash: pgpy.constants.HashAlgorithm
:param cipher:
:type cipher: pgpy.constants.SymmetricKeyAlgorithm
- :return: The signed + encrypted message.
- :rtype: mailman.email.message.Message
+ :return:
+ :rtype: MIMEWrapper
"""
if len(keys) == 0:
raise ValueError('At least one key necessary.')
@@ -442,36 +460,5 @@ class MIMEWrapper:
pmsg = PGPMessage.new(payload)
pmsg |= key.sign(pmsg, hash=hash)
pmsg = self._encrypt(pmsg, *keys, cipher=cipher, **kwargs)
- out = self._wrap_encrypted(pmsg)
- copy_headers(self.msg, out)
- return out
-
- def sign_then_encrypt(self, key, *keys, hash=None,
- cipher=SymmetricKeyAlgorithm.AES256,
- **kwargs):
- """
- Sign then encrypt the message.
-
- This is as per RFC 3156 section 6.1 - RFC 1847 Encapsulation.
-
- :param key: The key to sign with.
- :type key: pgpy.PGPKey
- :param keys: The key/s to encrypt with.
- :type keys: pgpy.PGPKey
- :param hash:
- :type hash: pgpy.constants.HashAlgorithm
- :param cipher:
- :type cipher: pgpy.constants.SymmetricKeyAlgorithm
- :return: The signed + encrypted message.
- :rtype: mailman.email.message.Message
- """
- if len(keys) == 0:
- raise ValueError('At least one key necessary.')
-
- out = self.sign(key, hash=hash)
- out_wrapped = MIMEWrapper(out)
- pmsg = PGPMessage.new(next(out_wrapped.get_payload()))
- pmsg = self._encrypt(pmsg, *keys, cipher=cipher, **kwargs)
- out = self._wrap_encrypted(pmsg)
- copy_headers(self.msg, out)
- return out
+ self._wrap_encrypted(pmsg)
+ return self
diff --git a/src/mailman_pgp/pgp/mime_multisig.py b/src/mailman_pgp/pgp/mime_multisig.py
index d7ad00a..1dbb73c 100644
--- a/src/mailman_pgp/pgp/mime_multisig.py
+++ b/src/mailman_pgp/pgp/mime_multisig.py
@@ -23,8 +23,8 @@ from email.mime.application import MIMEApplication
from email.mime.multipart import MIMEMultipart
from email.utils import collapse_rfc2231_value
-from mailman.email.message import Message, MultipartDigestMessage
-from pgpy import PGPDetachedSignature, PGPSignature
+from mailman.email.message import Message
+from pgpy import PGPSignature
from mailman_pgp.pgp.mime import MIMEWrapper
from mailman_pgp.utils.email import copy_headers
@@ -74,30 +74,20 @@ class MIMEMultiSigWrapper(MIMEWrapper):
continue
yield sig
- def _wrap_signed_multiple(self, msg, payload_msg, sig_msgs, signatures,
- signature):
+ def sign(self, key, **kwargs):
"""
- As per draft-ietf-openpgp-multsig-02.
+ Sign a message with key.
- :param msg:
- :param payload_msg:
- :param sig_msgs:
- :param signatures:
- :param signature:
+ :param key: The key to sign with.
+ :type key: pgpy.PGPKey
:return:
+ :rtype: MIMEMultiSigWrapper
"""
- micalg = ', '.join(self._micalg(sig.hash_algorithm)
- for sig in signatures + signature.signatures)
- out = MultipartDigestMessage('signed', micalg=micalg,
- protocol='multipart/mixed')
- out.preamble = MIMEMultiSigWrapper._signature_preamble
- second_part = MIMEMultipart()
- for sig_msg in sig_msgs:
- second_part.attach(copy.deepcopy(sig_msg))
-
- for sig in signature.signatures:
- sig_part = MIMEApplication(_data=str(sig),
+ if self.is_signed():
+ signed = next(iter(self.get_signed()))
+ signature = key.sign(signed, **kwargs)
+ sig_part = MIMEApplication(_data=str(signature),
_subtype=MIMEWrapper._signature_subtype,
_encoder=encode_7or8bit,
name='signature.asc')
@@ -105,36 +95,34 @@ class MIMEMultiSigWrapper(MIMEWrapper):
'OpenPGP digital signature')
sig_part.add_header('Content-Disposition', 'attachment',
filename='signature.asc')
- second_part.attach(sig_part)
-
- out.attach(copy.deepcopy(payload_msg))
- out.attach(second_part)
- copy_headers(msg, out)
- return out
-
- def sign(self, key, **kwargs):
- """
- Sign a message with key.
-
- :param key: The key to sign with.
- :type key: pgpy.PGPKey
- :return: The signed message.
- :rtype: mailman.email.message.Message
- """
-
- if self.is_signed():
- payload_msg = self.msg.get_payload(0)
- sig_msgs = [part for part in self.msg.get_payload(1).get_payload()]
+ micalg = self.msg.get_param('micalg')
+ micalg += ',' + self._micalg(signature.hash_algorithm)
+ self.msg.set_param('micalg', micalg)
+ self.msg.get_payload(1).attach(sig_part)
else:
- payload_msg = self.msg
- sig_msgs = []
- # TODO: exception safe this
- signatures = [PGPSignature.from_blob(sig_msg.get_payload())
- for sig_msg in sig_msgs]
- signature = PGPDetachedSignature()
- signature |= key.sign(payload_msg.as_string(), **kwargs)
- return self._wrap_signed_multiple(self.msg, payload_msg, sig_msgs,
- signatures, signature)
+ original_msg = copy.deepcopy(self.msg)
+ to_sign = next(iter(self.get_payload()))
+ signature = key.sign(to_sign, **kwargs)
+ self.msg.set_payload([])
+ self.msg.attach(original_msg)
+ self.msg.set_type('multipart/signed')
+ self.msg['MIME-Version'] = '1.0'
+ self.msg.set_param('protocol', 'multipart/mixed')
+ self.msg.set_param('micalg',
+ self._micalg(signature.hash_algorithm))
+ sig_part = MIMEApplication(_data=str(signature),
+ _subtype=MIMEWrapper._signature_subtype,
+ _encoder=encode_7or8bit,
+ name='signature.asc')
+ sig_part.add_header('Content-Description',
+ 'OpenPGP digital signature')
+ sig_part.add_header('Content-Disposition', 'attachment',
+ filename='signature.asc')
+ second_part = MIMEMultipart()
+ second_part.attach(sig_part)
+ self.msg.attach(second_part)
+ self.msg.preamble = MIMEMultiSigWrapper._signature_preamble
+ return self
def verify(self, key):
"""
@@ -159,11 +147,10 @@ class MIMEMultiSigWrapper(MIMEWrapper):
:param key: The key to decrypt with.
:type key: pgpy.PGPKey
- :return: The decrypted message.
- :rtype: mailman.email.message.Message
+ :return:
+ :rtype: MIMEMultiSigWrapper
"""
pmsg = next(iter(self.get_encrypted()))
- # TODO: exception safe this
decrypted = key.decrypt(pmsg)
dmsg = decrypted.message
@@ -172,8 +159,30 @@ class MIMEMultiSigWrapper(MIMEWrapper):
out = message_from_string(dmsg, _class=Message)
if decrypted.is_signed:
- out = self._wrap_signed_multiple(self.msg, out, [], [],
- decrypted.detached_signature)
+ # result should be a multisig signed thing, so [out, ]
+ self.msg.set_payload([])
+ self.msg.attach(out)
+ self.msg.set_type('multipart/signed')
+ self.msg['MIME-Version'] = '1.0'
+ self.msg.set_param('protocol', 'multipart/mixed')
+ micalg = ', '.join(self._micalg(sig.hash_algorithm)
+ for sig in decrypted.signatures)
+ self.msg.set_param('micalg', micalg)
+ second_part = MIMEMultipart()
+ for signature in decrypted.signatures:
+ sig_part = MIMEApplication(_data=str(signature),
+ _subtype=MIMEWrapper._signature_subtype,
+ _encoder=encode_7or8bit,
+ name='signature.asc')
+ sig_part.add_header('Content-Description',
+ 'OpenPGP digital signature')
+ sig_part.add_header('Content-Disposition', 'attachment',
+ filename='signature.asc')
+ second_part.attach(sig_part)
+ self.msg.attach(second_part)
else:
- copy_headers(self.msg, out)
- return out
+ # result should be
+ self.msg.set_payload(out.get_payload())
+ copy_headers(out, self.msg, True)
+
+ return self
diff --git a/src/mailman_pgp/pgp/tests/test_wrapper.py b/src/mailman_pgp/pgp/tests/test_wrapper.py
index b9c157a..b49eece 100644
--- a/src/mailman_pgp/pgp/tests/test_wrapper.py
+++ b/src/mailman_pgp/pgp/tests/test_wrapper.py
@@ -242,5 +242,5 @@ class TestPGPWrapper(PGPWrapperTestCase):
def test_defaults(self):
PGPWrapper(self.msg)
for wrap_class in (InlineWrapper, MIMEWrapper, MIMEMultiSigWrapper):
- PGPWrapper(self.msg, wrap_class)
- self.assertRaises(ValueError, PGPWrapper, self.msg, 'Not a class')
+ PGPWrapper(self.msg, default=wrap_class)
+ self.assertRaises(ValueError, PGPWrapper, self.msg, default='Not a class')
diff --git a/src/mailman_pgp/pgp/wrapper.py b/src/mailman_pgp/pgp/wrapper.py
index b8ed264..f746229 100644
--- a/src/mailman_pgp/pgp/wrapper.py
+++ b/src/mailman_pgp/pgp/wrapper.py
@@ -15,11 +15,11 @@
# this program. If not, see <http://www.gnu.org/licenses/>.
"""A combined PGP/MIME + inline PGP wrapper."""
-import copy
from pgpy.errors import PGPError
from public import public
+from mailman_pgp.pgp.base import BaseWrapper
from mailman_pgp.pgp.inline import InlineWrapper
from mailman_pgp.pgp.mime import MIMEWrapper
from mailman_pgp.pgp.mime_multisig import MIMEMultiSigWrapper
@@ -27,22 +27,25 @@ from mailman_pgp.utils.pgp import verifies
@public
-class PGPWrapper():
+class PGPWrapper(BaseWrapper):
"""A combined PGP/MIME + inline PGP wrapper."""
- def __init__(self, msg, default=MIMEWrapper):
+ def __init__(self, msg, copy=False, default=MIMEWrapper):
"""
Wrap the given message.
:param msg: The message to wrap.
:type msg: mailman.email.message.Message
- :param default:
+ :param copy: Whether to copy the message when wrapping.
+ :type copy: bool
+ :param default: The wrapper class used for active operations (sign,
+ encrypt, attach_keys, attach_revocs)
:type default: Type[MIMEWrapper|MIMEMultiSigWrapper|InlineWrapper]
"""
- self.msg = msg
- self.mime = MIMEWrapper(msg)
- self.inline = InlineWrapper(msg)
- self.multisig = MIMEMultiSigWrapper(msg)
+ super().__init__(msg, copy)
+ self.mime = MIMEWrapper(self.msg)
+ self.inline = InlineWrapper(self.msg)
+ self.multisig = MIMEMultiSigWrapper(self.msg)
self.wrappers = (self.mime, self.inline, self.multisig)
if default is MIMEWrapper:
self.default = self.mime
@@ -56,6 +59,10 @@ class PGPWrapper():
MIMEMultiSigWrapper.__name__ + ' ' +
InlineWrapper.__name__ + '.')
+ def _rewrap(self, wrapper):
+ if wrapper is not None:
+ return PGPWrapper(wrapper.msg, default=self.default.__class__)
+
def get_payload(self):
return self.default.get_payload()
@@ -111,10 +118,10 @@ class PGPWrapper():
:param key: The key to sign with.
:type key: pgpy.PGPKey
- :return: The signed message.
- :rtype: mailman.email.message.Message
+ :return:
+ :rtype: PGPWrapper
"""
- return self.default.sign(key, **kwargs)
+ return self._rewrap(self.default.sign(key, **kwargs))
def verify(self, key):
"""
@@ -172,12 +179,10 @@ class PGPWrapper():
:param keys: The key/s to encrypt with.
:type keys: pgpy.PGPKey
- :param cipher: The symmetric cipher to use.
- :type cipher: SymmetricKeyAlgorithm
- :return: The encrypted message.
- :rtype: mailman.email.message.Message
+ :return:
+ :rtype: PGPWrapper
"""
- return self.default.encrypt(*keys, **kwargs)
+ return self._rewrap(self.default.encrypt(*keys, **kwargs))
def decrypt(self, key):
"""
@@ -185,16 +190,18 @@ class PGPWrapper():
:param key: The key to decrypt with.
:type key: pgpy.PGPKey
- :return: The decrypted message.
:raises: pgpy.errors.PGPError
- :rtype: mailman.email.message.Message
+ :return:
+ :rtype: PGPWrapper
"""
+ result = None
if self.mime.is_encrypted():
- return self.mime.decrypt(key)
+ result = self.mime.decrypt(key)
elif self.multisig.is_encrypted():
- return self.multisig.decrypt(key)
+ result = self.multisig.decrypt(key)
elif self.inline.is_encrypted():
- return self.inline.decrypt(key)
+ result = self.inline.decrypt(key)
+ return self._rewrap(result)
def try_decrypt(self, key):
"""
@@ -204,12 +211,12 @@ class PGPWrapper():
:type key: pgpy.PGPKey
:return: The decrypted message, if successfully decrypted,
else original message.
- :rtype: mailman.email.message.Message
+ :rtype: PGPWrapper
"""
try:
- return self.decrypt(key)
+ return self._rewrap(self.decrypt(key))
except PGPError:
- return copy.deepcopy(self.msg)
+ return self
def sign_encrypt(self, key, *keys, **kwargs):
"""
@@ -223,27 +230,10 @@ class PGPWrapper():
:type hash: pgpy.constants.HashAlgorithm
:param cipher:
:type cipher: pgpy.constants.SymmetricKeyAlgorithm
- :return: The signed + encrypted message.
- :rtype: mailman.email.message.Message
- """
- return self.default.sign_encrypt(key, *keys, **kwargs)
-
- def sign_then_encrypt(self, key, *keys, **kwargs):
- """
- Sign then encrypt the message.
-
- :param key: The key to sign with.
- :type key: pgpy.PGPKey
- :param keys: The key/s to encrypt with.
- :type keys: pgpy.PGPKey
- :param hash:
- :type hash: pgpy.constants.HashAlgorithm
- :param cipher:
- :type cipher: pgpy.constants.SymmetricKeyAlgorithm
- :return: The signed + encrypted message.
- :rtype: mailman.email.message.Message
+ :return:
+ :rtype: PGPWrapper
"""
- return self.default.sign_then_encrypt(key, *keys, **kwargs)
+ return self._rewrap(self.default.sign_encrypt(key, *keys, **kwargs))
def is_keys(self):
"""
diff --git a/src/mailman_pgp/rules/tests/test_signature.py b/src/mailman_pgp/rules/tests/test_signature.py
index 827b187..4fd28ac 100644
--- a/src/mailman_pgp/rules/tests/test_signature.py
+++ b/src/mailman_pgp/rules/tests/test_signature.py
@@ -175,8 +175,8 @@ To: test@example.com
self.pgp_list.duplicate_sig_action = Action.defer
msgdata = {}
- wrapped = MIMEWrapper(self.msg_clear)
- msg = wrapped.sign(self.sender_key, expires=timedelta(seconds=1))
+ wrapped = MIMEWrapper(self.msg_clear, True)
+ msg = wrapped.sign(self.sender_key, expires=timedelta(seconds=1)).msg
time.sleep(2)
matches = self.rule.check(self.mlist, msg, msgdata)
@@ -184,11 +184,13 @@ To: test@example.com
self.assertAction(msgdata, Action.hold, ['Signature is expired.'])
msgdata = {}
- wrapped = InlineWrapper(self.msg_clear)
- msg = wrapped.sign(self.sender_key, expires=timedelta(seconds=1))
+ wrapped = InlineWrapper(self.msg_clear, True)
+ msg = wrapped.sign(self.sender_key, expires=timedelta(seconds=1)).msg
time.sleep(2)
matches = self.rule.check(self.mlist, msg, msgdata)
+ # TODO: test when the key is expired
+
self.assertTrue(matches)
self.assertAction(msgdata, Action.hold, ['Signature is expired.'])
@@ -206,7 +208,7 @@ To: test@example.com
msgdata = {}
wrapped = MIMEWrapper(self.msg_clear)
- msg = wrapped.sign(self.sender_key)
+ msg = wrapped.sign(self.sender_key).msg
matches = self.rule.check(self.mlist, msg, msgdata)
self.assertTrue(matches)
diff --git a/src/mailman_pgp/runners/incoming.py b/src/mailman_pgp/runners/incoming.py
index 9931576..abedd32 100644
--- a/src/mailman_pgp/runners/incoming.py
+++ b/src/mailman_pgp/runners/incoming.py
@@ -61,7 +61,7 @@ class PGPIncomingRunner(Runner):
return True
try:
- msg = wrapped.decrypt(list_key)
+ msg = wrapped.decrypt(list_key).msg
except PGPError:
reason = 'Message could not be decrypted.'
log.info('[pgp] {}{}: {}'.format(
diff --git a/src/mailman_pgp/runners/tests/test_incoming.py b/src/mailman_pgp/runners/tests/test_incoming.py
index 05e910a..aaf597c 100644
--- a/src/mailman_pgp/runners/tests/test_incoming.py
+++ b/src/mailman_pgp/runners/tests/test_incoming.py
@@ -87,11 +87,10 @@ To: no-key@example.com
Some text.
""")
- wrapped = PGPWrapper(msg)
- encrypted = wrapped.encrypt(self.pgp_list.pubkey)
+ PGPWrapper(msg).encrypt(self.pgp_list.pubkey)
msgdata = dict(listid='no-key.example.com')
- mm_config.switchboards['in'].enqueue(encrypted, msgdata)
+ mm_config.switchboards['in'].enqueue(msg, msgdata)
runner = make_testable_runner(PGPIncomingRunner, 'in',
lambda runner: True)
runner.run()
@@ -130,12 +129,10 @@ To: test@example.com
{}
""".format(str(payload)))
- wrapped = PGPWrapper(msg)
- encrypted = wrapped.encrypt(self.pgp_list.pubkey)
+ encrypted = PGPWrapper(msg).copy().encrypt(self.pgp_list.pubkey).msg
msgdata = dict(listid='test.example.com')
- mm_config.switchboards['in'].enqueue(encrypted,
- msgdata)
+ mm_config.switchboards['in'].enqueue(encrypted, msgdata)
self.runner.run()
items = get_queue_messages('in_default', expected_count=1)
out_msg = items[0].msg
@@ -149,14 +146,13 @@ To: test@example.com
{}
""".format(str(payload)))
- wrapped = PGPWrapper(msg)
- encrypted_signed = wrapped.sign_encrypt(self.sender_key,
- self.pgp_list.pubkey,
- self.pgp_sender.key)
+ PGPWrapper(msg).sign_encrypt(self.sender_key,
+ self.pgp_list.pubkey,
+ self.pgp_sender.key)
msgdata = dict(listid='test.example.com')
- mm_config.switchboards['in'].enqueue(encrypted_signed,
- msgdata)
+ mm_config.switchboards['in'].enqueue(msg, msgdata)
+
self.runner.run()
items = get_queue_messages('in_default', expected_count=1)
out_msg = items[0].msg
@@ -173,12 +169,10 @@ To: test@example.com
{}
""".format(str(payload)))
- wrapped = PGPWrapper(msg)
- encrypted = wrapped.encrypt(self.sender_key.pubkey)
+ PGPWrapper(msg).encrypt(self.sender_key.pubkey)
msgdata = dict(listid='test.example.com')
- mm_config.switchboards['in'].enqueue(encrypted,
- msgdata)
+ mm_config.switchboards['in'].enqueue(msg, msgdata)
self.runner.run()
items = get_queue_messages('in_default', expected_count=1)
self.assertEqual(items[0].msgdata['pgp_action'],
diff --git a/src/mailman_pgp/testing/pgp.py b/src/mailman_pgp/testing/pgp.py
index afb62c8..a50ae02 100644
--- a/src/mailman_pgp/testing/pgp.py
+++ b/src/mailman_pgp/testing/pgp.py
@@ -88,16 +88,12 @@ class WrapperTestCase(unittest.TestCase):
self.assertEqual(wrapped.has_signature(), has)
def sign(self, message, key):
- wrapped = self.wrap(message)
- signed = wrapped.sign(key)
- signed_wrapped = self.wrap(signed)
- self.assertTrue(signed_wrapped.is_signed())
+ signed = self.wrap(message).sign(key)
+ self.assertTrue(signed.is_signed())
def sign_verify(self, message, priv, pub):
- wrapped = self.wrap(message)
- signed = wrapped.sign(priv)
- signed_wrapped = self.wrap(signed)
- for signature in signed_wrapped.verify(pub):
+ signed = self.wrap(message).sign(priv)
+ for signature in signed.verify(pub):
self.assertTrue(bool(signature))
def verify(self, message, key, valid):
@@ -114,29 +110,20 @@ class WrapperTestCase(unittest.TestCase):
self.assertEqual(wrapped.has_encryption(), has)
def encrypt(self, message, *keys, **kwargs):
- wrapped = self.wrap(message)
- encrypted = wrapped.encrypt(*keys, **kwargs)
- encrypted_wrapped = self.wrap(encrypted)
- self.assertTrue(encrypted_wrapped.is_encrypted())
+ encrypted = self.wrap(message).encrypt(*keys, **kwargs)
+ self.assertTrue(encrypted.is_encrypted())
def encrypt_decrypt(self, message, pub, priv):
- wrapped = self.wrap(message)
- encrypted = wrapped.encrypt(pub)
+ decrypted = self.wrap(message).copy().encrypt(pub).decrypt(priv)
- encrypted_wrapped = self.wrap(encrypted)
- decrypted = encrypted_wrapped.decrypt(priv)
- decrypted_wrapped = self.wrap(decrypted)
-
- self.assertFalse(decrypted_wrapped.is_encrypted())
- self.assertTrue(payload_equal(decrypted, message))
+ self.assertFalse(decrypted.is_encrypted())
+ self.assertTrue(payload_equal(decrypted.msg, message))
def decrypt(self, message, key, clear):
- wrapped = self.wrap(message)
- decrypted = wrapped.decrypt(key)
- decrypted_wrapped = self.wrap(decrypted)
+ decrypted = self.wrap(message).copy().decrypt(key)
- self.assertFalse(decrypted_wrapped.is_encrypted())
- self.assertEqual(decrypted.get_payload(), clear)
+ self.assertFalse(decrypted.is_encrypted())
+ self.assertEqual(decrypted.msg.get_payload(), clear)
def has_keys(self, message, has_keys):
wrapped = self.wrap(message)
@@ -156,9 +143,7 @@ class WrapperTestCase(unittest.TestCase):
self.assertListEqual(loaded_fingerprints, fingerprints)
def attach_keys(self, message, keys):
- wrapped = self.wrap(message)
- attached = wrapped.attach_keys(*keys)
- wrapped = self.wrap(attached)
+ wrapped = self.wrap(message).attach_keys(*keys)
loaded = list(wrapped.keys())
self.assertTrue(wrapped.has_keys())
@@ -184,9 +169,7 @@ class WrapperTestCase(unittest.TestCase):
self.assertListEqual(loaded_issuers, issuers)
def attach_revocs(self, message, revocs):
- wrapped = self.wrap(message)
- attached = wrapped.attach_revocs(*revocs)
- wrapped = self.wrap(attached)
+ wrapped = self.wrap(message).attach_revocs(*revocs)
loaded = list(wrapped.revocs())
self.assertTrue(wrapped.has_revocs())
@@ -196,46 +179,41 @@ class WrapperTestCase(unittest.TestCase):
def sign_encrypt_decrypt_verify(self, message, sign_key, encrypt_key):
wrapped = self.wrap(message)
- encrypted = wrapped.sign_encrypt(sign_key, encrypt_key.pubkey)
- encrypted_wrapped = self.wrap(encrypted)
- self.assertTrue(encrypted_wrapped.is_encrypted())
+ encrypted = wrapped.copy().sign_encrypt(sign_key, encrypt_key.pubkey)
+ self.assertTrue(encrypted.is_encrypted())
- decrypted = encrypted_wrapped.decrypt(encrypt_key)
- decrypted_wrapped = self.wrap(decrypted)
- self.assertTrue(decrypted_wrapped.is_signed())
- self.assertFalse(decrypted_wrapped.is_encrypted())
+ decrypted = encrypted.decrypt(encrypt_key)
+ self.assertTrue(decrypted.is_signed())
+ self.assertFalse(decrypted.is_encrypted())
- verification = decrypted_wrapped.verify(sign_key.pubkey)
+ verification = decrypted.verify(sign_key.pubkey)
for sig in verification:
self.assertTrue(bool(sig))
- self.assertListEqual(list(decrypted_wrapped.get_signed()),
+ self.assertListEqual(list(decrypted.get_signed()),
list(wrapped.get_payload()))
def sign_then_encrypt_decrypt_verify(self, message, sign_key, encrypt_key):
wrapped = self.wrap(message)
- encrypted = wrapped.sign_then_encrypt(sign_key, encrypt_key.pubkey)
- encrypted_wrapped = self.wrap(encrypted)
- self.assertTrue(encrypted_wrapped.is_encrypted())
+ encrypted = wrapped.copy().sign(sign_key).encrypt(encrypt_key.pubkey)
+ self.assertTrue(encrypted.is_encrypted())
- decrypted = encrypted_wrapped.decrypt(encrypt_key)
- decrypted_wrapped = self.wrap(decrypted)
- self.assertTrue(decrypted_wrapped.is_signed())
- self.assertFalse(decrypted_wrapped.is_encrypted())
+ decrypted = encrypted.copy().decrypt(encrypt_key)
+ self.assertTrue(decrypted.is_signed())
+ self.assertFalse(decrypted.is_encrypted())
- verification = decrypted_wrapped.verify(sign_key.pubkey)
+ verification = decrypted.verify(sign_key.pubkey)
for sig in verification:
self.assertTrue(bool(sig))
- self.assertListEqual(list(decrypted_wrapped.get_signed()),
+ self.assertListEqual(list(decrypted.get_signed()),
list(wrapped.get_payload()))
def decrypt_verify(self, message, decrypt_key, verify_key, valid):
wrapped = self.wrap(message)
- decrypted = wrapped.decrypt(decrypt_key)
- decrypted_wrapped = self.wrap(decrypted)
+ decrypted = wrapped.copy().decrypt(decrypt_key)
- self.assertFalse(decrypted_wrapped.is_encrypted())
- self.assertTrue(decrypted_wrapped.is_signed())
+ self.assertFalse(decrypted.is_encrypted())
+ self.assertTrue(decrypted.is_signed())
- verification = decrypted_wrapped.verify(verify_key)
+ verification = decrypted.verify(verify_key)
for sig in verification:
self.assertEqual(bool(sig), valid)
diff --git a/src/mailman_pgp/workflows/key_change.py b/src/mailman_pgp/workflows/key_change.py
index 1d07903..66fb4e9 100644
--- a/src/mailman_pgp/workflows/key_change.py
+++ b/src/mailman_pgp/workflows/key_change.py
@@ -29,7 +29,6 @@ from zope.interface import implementer
from mailman_pgp.config import config
from mailman_pgp.database import transaction
from mailman_pgp.pgp.wrapper import PGPWrapper
-from mailman_pgp.utils.email import copy_headers
from mailman_pgp.workflows.base import PGPMixin
from mailman_pgp.workflows.mod_approval import (
ModeratorKeyChangeApprovalMixin)
@@ -96,11 +95,8 @@ class KeyChangeBase(Workflow, PGPMixin):
CHANGE_CONFIRM_REQUEST.format(
self.pubkey.fingerprint,
self.token))
- wrapped = PGPWrapper(msg)
- encrypted = wrapped.sign_encrypt(self.pgp_list.key, self.pubkey)
+ PGPWrapper(msg).sign_encrypt(self.pgp_list.key, self.pubkey)
- msg.set_payload(encrypted.get_payload())
- copy_headers(encrypted, msg, True)
msg.send(self.mlist)
raise StopIteration
diff --git a/src/mailman_pgp/workflows/key_confirm.py b/src/mailman_pgp/workflows/key_confirm.py
index 0a38551..cf87832 100644
--- a/src/mailman_pgp/workflows/key_confirm.py
+++ b/src/mailman_pgp/workflows/key_confirm.py
@@ -23,7 +23,6 @@ from public import public
from mailman_pgp.database import transaction
from mailman_pgp.model.list import PGPMailingList
from mailman_pgp.pgp.wrapper import PGPWrapper
-from mailman_pgp.utils.email import copy_headers
CONFIRM_REQUEST = """\
----------
@@ -65,11 +64,8 @@ class ConfirmPubkeyMixin:
self.pgp_address.key_fingerprint,
self.token))
pgp_list = PGPMailingList.for_list(self.mlist)
- wrapped = PGPWrapper(msg)
- encrypted = wrapped.sign_encrypt(pgp_list.key, self.pgp_address.key)
+ PGPWrapper(msg).sign_encrypt(pgp_list.key, self.pgp_address.key)
- msg.set_payload(encrypted.get_payload())
- copy_headers(encrypted, msg, True)
msg.send(self.mlist)
raise StopIteration
diff --git a/src/mailman_pgp/workflows/mod_approval.py b/src/mailman_pgp/workflows/mod_approval.py
index 367f773..ea3369b 100644
--- a/src/mailman_pgp/workflows/mod_approval.py
+++ b/src/mailman_pgp/workflows/mod_approval.py
@@ -16,7 +16,6 @@
# this program. If not, see <http://www.gnu.org/licenses/>.
""""""
-import copy
from mailman.email.message import UserNotification
from mailman.interfaces.pending import IPendings
@@ -25,7 +24,6 @@ from public import public
from zope.component import getUtility
from mailman_pgp.pgp.mime import MIMEWrapper
-from mailman_pgp.utils.email import overwrite_message
SUBSCRIPTION_MOD_REQUEST = """\
----------
@@ -83,11 +81,8 @@ class ModeratorApprovalMixin:
msg = UserNotification(
self.mlist.owner_address, self.mlist.owner_address,
subject, body, self.mlist.preferred_language)
- out = copy.deepcopy(msg)
- wrapped = MIMEWrapper(msg)
- msg = wrapped.attach_keys(self.pubkey)
- overwrite_message(msg, out)
- out.send(self.mlist)
+ MIMEWrapper(msg).attach_keys(self.pubkey)
+ msg.send(self.mlist)
raise StopIteration
def _step_receive_mod_confirmation(self):