diff options
Diffstat (limited to 'src/mailman_pgp/utils')
| -rw-r--r-- | src/mailman_pgp/utils/email.py | 21 | ||||
| -rw-r--r-- | src/mailman_pgp/utils/pgp.py | 75 | ||||
| -rw-r--r-- | src/mailman_pgp/utils/tests/__init__.py | 0 | ||||
| -rw-r--r-- | src/mailman_pgp/utils/tests/test_pgp.py | 86 |
4 files changed, 181 insertions, 1 deletions
diff --git a/src/mailman_pgp/utils/email.py b/src/mailman_pgp/utils/email.py index a936458..ac1ab66 100644 --- a/src/mailman_pgp/utils/email.py +++ b/src/mailman_pgp/utils/email.py @@ -16,8 +16,10 @@ # this program. If not, see <http://www.gnu.org/licenses/>. """""" +import copy from email.utils import parseaddr +from mailman.email.message import MultipartDigestMessage from public import public @@ -62,6 +64,25 @@ def overwrite_message(from_msg, to_msg): @public +def make_multipart(msg): + """ + + :param msg: + :type msg: email.message.Message + :return: + :rtype: email.message.MIMEMultipart| + mailman.email.message.MultipartDigestMessage + """ + if msg.is_multipart(): + out = copy.deepcopy(msg) + else: + out = MultipartDigestMessage() + out.attach(msg) + copy_headers(msg, out) + return out + + +@public def get_email(msg): display_name, email = parseaddr(msg['from']) # Address could be None or the empty string. diff --git a/src/mailman_pgp/utils/pgp.py b/src/mailman_pgp/utils/pgp.py index 4251693..a8f06f2 100644 --- a/src/mailman_pgp/utils/pgp.py +++ b/src/mailman_pgp/utils/pgp.py @@ -16,7 +16,11 @@ # this program. If not, see <http://www.gnu.org/licenses/>. """Miscellaneous PGP utilities.""" -from pgpy import PGPKey +from pgpy import PGPKey, PGPSignature +from pgpy.constants import SignatureType +from pgpy.errors import PGPError +from pgpy.packet import Packet, Signature +from pgpy.types import Armorable from public import public @@ -72,3 +76,72 @@ def key_from_file(file): """ key, _ = PGPKey.from_file(file) return key + + +@public +def revoc_from_blob(blob): + """ + Load a key revocation signature from an ASCII-Armored blob. + + :param blob: + :return: + :rtype: pgpy.PGPSignature + """ + dearm = Armorable.ascii_unarmor(blob) + p = Packet(dearm['body']) + + if not isinstance(p, Signature): + raise ValueError('Not a key revocation signature.') + if p.sigtype not in (SignatureType.KeyRevocation, + SignatureType.SubkeyRevocation): + raise ValueError('Not a key revocation.') + + sig = PGPSignature() + sig |= p + return sig + + +@public +def key_usable(key, flags_required): + """ + Check that the `key` has the `flags_required` set of KeyFlags. + + Checks only non-expired, non-revoked key/subkeys. Validates revocations it + can, so not those made with some other designated revocation key. + + :param key: The key to check. + :type key: pgpy.PGPKey + :param flags_required: The set of flags required. + :type flags_required: set + :return: Whether the key has the flags_required. + :rtype: bool + """ + if key.is_expired: + return False + for revoc in key.revocation_signatures: + try: + verified = key.verify(key, revoc) + except PGPError: + continue + if bool(verified): + return False + + usage_flags = key.usage_flags() + for subkey in key.subkeys.values(): + if subkey.is_expired: + continue + + valid = True + for revoc in subkey.revocation_signatures: + try: + verified = key.verify(subkey, revoc) + except PGPError: + continue + if bool(verified): + valid = False + break + + if valid: + usage_flags |= subkey.usage_flags() + + return flags_required.issubset(usage_flags) diff --git a/src/mailman_pgp/utils/tests/__init__.py b/src/mailman_pgp/utils/tests/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/src/mailman_pgp/utils/tests/__init__.py diff --git a/src/mailman_pgp/utils/tests/test_pgp.py b/src/mailman_pgp/utils/tests/test_pgp.py new file mode 100644 index 0000000..b6433d4 --- /dev/null +++ b/src/mailman_pgp/utils/tests/test_pgp.py @@ -0,0 +1,86 @@ +# Copyright (C) 2017 Jan Jancar +# +# This file is a part of the Mailman PGP plugin. +# +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) +# any later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. +# +# You should have received a copy of the GNU General Public License along with +# this program. If not, see <http://www.gnu.org/licenses/>. + +"""""" +import datetime +import time +from unittest import TestCase + +from parameterized import parameterized +from pgpy import PGPKey, PGPUID +from pgpy.constants import ( + CompressionAlgorithm, EllipticCurveOID, HashAlgorithm, KeyFlags, + PubKeyAlgorithm, SymmetricKeyAlgorithm) + +from mailman_pgp.testing.layers import PGPLayer +from mailman_pgp.testing.pgp import load_blob, load_key +from mailman_pgp.utils.pgp import key_usable, revoc_from_blob + + +class TestPGPUtils(TestCase): + layer = PGPLayer + + @parameterized.expand([ + (load_blob('revocs', 'rsa_1024.revoc.asc'), + load_key('rsa_1024.pub.asc')), + (load_blob('revocs', 'ecc_secp256k1.revoc.asc'), + load_key('ecc_secp256k1.pub.asc')), + (load_blob('revocs', 'ecc_p256.revoc.asc'), + load_key('ecc_p256.pub.asc')) + ]) + def test_revoc_from_blob_valid(self, blob, key): + revoc = revoc_from_blob(blob) + verifies = key.verify(key, revoc) + self.assertTrue(bool(verifies)) + + @parameterized.expand([ + ('Not an ASCII-Armored blob',), + (load_blob('keys', 'rsa_1024.pub.asc'),), + ]) + def test_revoc_from_blob_invalid(self, blob): + self.assertRaises(ValueError, revoc_from_blob, blob) + + def test_key_usable_expired(self): + key = PGPKey.new(PubKeyAlgorithm.ECDSA, EllipticCurveOID.SECP256K1) + uid = PGPUID.new('Some Name', email='anne@example.org') + key.add_uid(uid, key_expiration=datetime.timedelta(seconds=1), + usage={KeyFlags.Certify, + KeyFlags.Authentication, + KeyFlags.Sign}, + hashes=[HashAlgorithm.SHA256, + HashAlgorithm.SHA512], + ciphers=[SymmetricKeyAlgorithm.AES256], + compression=[CompressionAlgorithm.ZLIB]) + + time.sleep(2) + + self.assertFalse(key_usable(key, set())) + + def test_key_usable_revoked(self): + key = load_key('ecc_p256.priv.asc') + rsig = key.revoke(key) + key |= rsig + + self.assertFalse(key_usable(key, set())) + + def test_key_usable_subkey_revoked(self): + key = load_key('ecc_p256.priv.asc') + sub = next(iter(key.subkeys.values())) + rsig = key.revoke(sub) + sub |= rsig + + self.assertFalse(key_usable(key, {KeyFlags.EncryptCommunications})) |
