diff options
| author | J08nY | 2017-08-10 02:44:31 +0200 |
|---|---|---|
| committer | J08nY | 2017-08-10 02:44:31 +0200 |
| commit | 590d9f8a59d5b3d231955f7781400a97cfd4c8c8 (patch) | |
| tree | feed841fd419cc8997a1a9a06a1e664759b0454a | |
| parent | 22a688b4f0579b1e1e51b4164934fe2afc357671 (diff) | |
| parent | dc30efedb9445dee5a04e53e356d8afde0f61e52 (diff) | |
| download | mailman-pgp-590d9f8a59d5b3d231955f7781400a97cfd4c8c8.tar.gz mailman-pgp-590d9f8a59d5b3d231955f7781400a97cfd4c8c8.tar.zst mailman-pgp-590d9f8a59d5b3d231955f7781400a97cfd4c8c8.zip | |
| -rw-r--r-- | src/mailman_pgp/commands/eml_key.py | 97 | ||||
| -rw-r--r-- | src/mailman_pgp/commands/tests/test_key.py | 293 | ||||
| -rw-r--r-- | src/mailman_pgp/database/types.py | 57 | ||||
| -rw-r--r-- | src/mailman_pgp/model/list.py | 7 | ||||
| -rw-r--r-- | src/mailman_pgp/mta/personalized.py | 2 | ||||
| -rw-r--r-- | src/mailman_pgp/pgp/__init__.py | 1 | ||||
| -rw-r--r-- | src/mailman_pgp/pgp/mime.py | 2 |
7 files changed, 453 insertions, 6 deletions
diff --git a/src/mailman_pgp/commands/eml_key.py b/src/mailman_pgp/commands/eml_key.py index 46303c3..a298ea7 100644 --- a/src/mailman_pgp/commands/eml_key.py +++ b/src/mailman_pgp/commands/eml_key.py @@ -22,6 +22,7 @@ from email.mime.text import MIMEText from mailman.email.message import UserNotification from mailman.interfaces.command import ContinueProcessing, IEmailCommand +from mailman.interfaces.member import MemberRole from mailman.interfaces.pending import IPendings from mailman.interfaces.subscriptions import ISubscriptionManager from mailman.interfaces.usermanager import IUserManager @@ -351,7 +352,101 @@ def _cmd_revoke(pgp_list, mlist, msg, msgdata, arguments, results): def _cmd_sign(pgp_list, mlist, msg, msgdata, arguments, results): # List public key attached, signed by the users current key. - pass + if len(arguments) != 1: + print('Extraneous argument/s: ' + ','.join(arguments[1:]), + file=results) + return ContinueProcessing.no + + email = get_email(msg) + if not email: + print('No email.', file=results) + return ContinueProcessing.no + + pgp_address = PGPAddress.for_email(email) + if pgp_address is None: + print('A pgp enabled address not found.', file=results) + return ContinueProcessing.no + + key = pgp_address.key + if key is None: + print("You currently don't have a key set.", file=results) + return ContinueProcessing.no + + if not pgp_address.key_confirmed: + print('Your key is currently not confirmed.', file=results) + return ContinueProcessing.no + + wrapped = PGPWrapper(msg) + if wrapped.is_encrypted(): + decrypted = wrapped.try_decrypt(pgp_list.key) + wrapped = PGPWrapper(decrypted) + + if not wrapped.has_keys(): + print('No keys attached? Send a key.', file=results) + return ContinueProcessing.no + + keys = list(wrapped.keys()) + if len(keys) != 1: + print('More than one key! Send only one key.', file=results) + return ContinueProcessing.no + key = keys.pop() + + allowed_signers = pgp_list.key_signing_allowed + roster_map = { + MemberRole.member: mlist.members, + MemberRole.owner: mlist.owners, + MemberRole.moderator: mlist.moderators, + MemberRole.nonmember: mlist.nonmembers + } + allowed = False + for allowed_role in allowed_signers: + allowed_roster = roster_map[allowed_role] + if allowed_roster.get_member(email) is not None: + allowed = True + break + + if not allowed: + print('You are not allowed to sign the list key.', file=results) + return ContinueProcessing.no + + if pgp_list.pubkey.key_material != key.key_material: + print('You sent a wrong key.', file=results) + return ContinueProcessing.no + + uid_map = {} + for uid in pgp_list.key.userids: + for uid_other in key.userids: + if uid == uid_other: + uid_map[uid] = uid_other + + if len(uid_map) == 0: + print('No signed UIDs found.', file=results) + return ContinueProcessing.no + + uid_sigs = {} + for uid, uid_other in uid_map.items(): + for sig in uid_other.signatures: + if sig in uid.signatures: + continue + if sig.signer != pgp_address.key.fingerprint.keyid: + continue + # sig is a new signature, not currenctly on uid, ans seems to + # be made by the pgp_address.key + verification = pgp_address.key.verify(uid, sig) + if bool(verification): + uid_sigs.setdefault(uid, []).append(sig) + + if len(uid_sigs) == 0: + print('No new certifications found.', file=results) + return ContinueProcessing.no + + for uid, sigs in uid_sigs.items(): + for sig in sigs: + uid |= sig + pgp_list.fs_key.save() + + print('List key updated with new signatures.', file=results) + return ContinueProcessing.yes def _cmd_receive(pgp_list, mlist, msg, msgdata, arguments, results): diff --git a/src/mailman_pgp/commands/tests/test_key.py b/src/mailman_pgp/commands/tests/test_key.py index 266def6..5a6bb12 100644 --- a/src/mailman_pgp/commands/tests/test_key.py +++ b/src/mailman_pgp/commands/tests/test_key.py @@ -16,11 +16,12 @@ # this program. If not, see <http://www.gnu.org/licenses/>. """""" - +import copy import unittest from mailman.app.lifecycle import create_list from mailman.email.message import Message +from mailman.interfaces.member import MemberRole from mailman.interfaces.subscriptions import ISubscriptionManager from mailman.interfaces.usermanager import IUserManager from mailman.runners.command import CommandRunner @@ -942,6 +943,296 @@ class TestAfterSubscription(unittest.TestCase): self.assertEqual(len(revocs), 1) self.assertEqual(revoc.hash2, revocs[0].hash2) + def test_sign(self): + bart = getUtility(IUserManager).create_address('bart@example.com', + 'Bart Person') + with transaction() as t: + pgp_address = PGPAddress(bart) + pgp_address.key = self.bart_key.pubkey + pgp_address.key_confirmed = True + t.add(pgp_address) + + self.mlist.subscribe(bart) + get_queue_messages('virgin') + + self.pgp_list.key_signing_allowed = {MemberRole.member} + + message = _create_mixed('bart@example.com', 'test@example.com', + 'key sign') + wrapped_message = MIMEWrapper(message) + new_key = copy.copy(self.pgp_list.pubkey) + uid = next(iter(new_key.userids)) + sig = self.bart_key.certify(uid) + uid |= sig + message = wrapped_message.attach_keys(new_key) + + items = _run_message(message, 1) + results_msg = items[0].msg + + self.assertIn('List key updated with new signatures.', + results_msg.get_payload()) + + def test_sign_encrypted(self): + bart = getUtility(IUserManager).create_address('bart@example.com', + 'Bart Person') + with transaction() as t: + pgp_address = PGPAddress(bart) + pgp_address.key = self.bart_key.pubkey + pgp_address.key_confirmed = True + t.add(pgp_address) + + self.mlist.subscribe(bart) + get_queue_messages('virgin') + + self.pgp_list.key_signing_allowed = {MemberRole.member} + + message = _create_mixed('bart@example.com', 'test@example.com', + 'key sign') + wrapped_message = MIMEWrapper(message) + new_key = copy.copy(self.pgp_list.pubkey) + uid = next(iter(new_key.userids)) + sig = self.bart_key.certify(uid) + uid |= sig + message = wrapped_message.attach_keys(new_key) + wrapped_message = MIMEWrapper(message) + message = wrapped_message.encrypt(self.pgp_list.pubkey) + + items = _run_message(message, 1) + results_msg = items[0].msg + + self.assertIn('List key updated with new signatures.', + results_msg.get_payload()) + + def test_sign_extra_arg(self): + message = _create_plain('bart@example.com', 'test@example.com', + 'key sign extra arguments', '') + items = _run_message(message, 1) + results_msg = items[0].msg + + self.assertIn('Extraneous argument/s: extra,arguments', + results_msg.get_payload()) + + def test_sign_no_email(self): + message = _create_mixed('', 'test@example.com', 'key sign') + + items = _run_message(message, 1) + results_msg = items[0].msg + + self.assertIn('No email.', results_msg.get_payload()) + + def test_sign_no_pgp_address(self): + message = _create_mixed('bart@example.com', 'test@example.com', + 'key sign') + + items = _run_message(message, 1) + results_msg = items[0].msg + + self.assertIn('A pgp enabled address not found.', + results_msg.get_payload()) + + def test_sign_no_key_set(self): + bart = getUtility(IUserManager).create_address('bart@example.com', + 'Bart Person') + with transaction() as t: + pgp_address = PGPAddress(bart) + t.add(pgp_address) + + message = _create_mixed('bart@example.com', 'test@example.com', + 'key sign') + + items = _run_message(message, 1) + results_msg = items[0].msg + + self.assertIn("You currently don't have a key set.", + results_msg.get_payload()) + + def test_sign_key_not_confirmed(self): + bart = getUtility(IUserManager).create_address('bart@example.com', + 'Bart Person') + with transaction() as t: + pgp_address = PGPAddress(bart) + pgp_address.key = self.bart_key.pubkey + t.add(pgp_address) + + message = _create_mixed('bart@example.com', 'test@example.com', + 'key sign') + + items = _run_message(message, 1) + results_msg = items[0].msg + + self.assertIn('Your key is currently not confirmed.', + results_msg.get_payload()) + + def test_sign_no_key(self): + bart = getUtility(IUserManager).create_address('bart@example.com', + 'Bart Person') + with transaction() as t: + pgp_address = PGPAddress(bart) + pgp_address.key = self.bart_key.pubkey + pgp_address.key_confirmed = True + t.add(pgp_address) + + message = _create_plain('bart@example.com', 'test@example.com', + 'key sign', '') + + items = _run_message(message, 1) + results_msg = items[0].msg + + self.assertIn('No keys attached? Send a key.', + results_msg.get_payload()) + + def test_sign_multiple_keys(self): + bart = getUtility(IUserManager).create_address('bart@example.com', + 'Bart Person') + with transaction() as t: + pgp_address = PGPAddress(bart) + pgp_address.key = self.bart_key.pubkey + pgp_address.key_confirmed = True + t.add(pgp_address) + + message = _create_mixed('bart@example.com', 'test@example.com', + 'key sign') + + wrapped_message = MIMEWrapper(message) + message = wrapped_message.attach_keys(self.bart_key.pubkey) + wrapped_message = MIMEWrapper(message) + message = wrapped_message.attach_keys(self.bart_new_key.pubkey) + + items = _run_message(message, 1) + results_msg = items[0].msg + + self.assertIn('More than one key! Send only one key.', + results_msg.get_payload()) + + def test_sign_not_allowed(self): + bart = getUtility(IUserManager).create_address('bart@example.com', + 'Bart Person') + with transaction() as t: + pgp_address = PGPAddress(bart) + pgp_address.key = self.bart_key.pubkey + pgp_address.key_confirmed = True + t.add(pgp_address) + + self.pgp_list.key_signing_allowed = {MemberRole.owner} + + message = _create_mixed('bart@example.com', 'test@example.com', + 'key sign') + wrapped_message = MIMEWrapper(message) + message = wrapped_message.attach_keys(self.pgp_list.pubkey) + + items = _run_message(message, 1) + results_msg = items[0].msg + + self.assertIn('You are not allowed to sign the list key.', + results_msg.get_payload()) + + def test_sign_wrong_keymaterial(self): + bart = getUtility(IUserManager).create_address('bart@example.com', + 'Bart Person') + with transaction() as t: + pgp_address = PGPAddress(bart) + pgp_address.key = self.bart_key.pubkey + pgp_address.key_confirmed = True + t.add(pgp_address) + + self.mlist.subscribe(bart) + get_queue_messages('virgin') + + self.pgp_list.key_signing_allowed = {MemberRole.member} + + message = _create_mixed('bart@example.com', 'test@example.com', + 'key sign') + wrapped_message = MIMEWrapper(message) + message = wrapped_message.attach_keys(self.bart_key.pubkey) + + items = _run_message(message, 1) + results_msg = items[0].msg + + self.assertIn('You sent a wrong key.', + results_msg.get_payload()) + + def test_sign_no_uids(self): + bart = getUtility(IUserManager).create_address('bart@example.com', + 'Bart Person') + with transaction() as t: + pgp_address = PGPAddress(bart) + pgp_address.key = self.bart_key.pubkey + pgp_address.key_confirmed = True + t.add(pgp_address) + + self.mlist.subscribe(bart) + get_queue_messages('virgin') + + self.pgp_list.key_signing_allowed = {MemberRole.member} + + message = _create_mixed('bart@example.com', 'test@example.com', + 'key sign') + wrapped_message = MIMEWrapper(message) + new_key = copy.copy(self.pgp_list.pubkey) + for uid in new_key.userids: + new_key.del_uid(uid.email) + message = wrapped_message.attach_keys(new_key) + + items = _run_message(message, 1) + results_msg = items[0].msg + + self.assertIn('No signed UIDs found.', + results_msg.get_payload()) + + def test_sign_no_new_sig(self): + bart = getUtility(IUserManager).create_address('bart@example.com', + 'Bart Person') + with transaction() as t: + pgp_address = PGPAddress(bart) + pgp_address.key = self.bart_key.pubkey + pgp_address.key_confirmed = True + t.add(pgp_address) + + self.mlist.subscribe(bart) + get_queue_messages('virgin') + + self.pgp_list.key_signing_allowed = {MemberRole.member} + + message = _create_mixed('bart@example.com', 'test@example.com', + 'key sign') + wrapped_message = MIMEWrapper(message) + message = wrapped_message.attach_keys(self.pgp_list.pubkey) + + items = _run_message(message, 1) + results_msg = items[0].msg + + self.assertIn('No new certifications found.', + results_msg.get_payload()) + + def test_sign_no_sig_by_key(self): + bart = getUtility(IUserManager).create_address('bart@example.com', + 'Bart Person') + with transaction() as t: + pgp_address = PGPAddress(bart) + pgp_address.key = self.bart_key.pubkey + pgp_address.key_confirmed = True + t.add(pgp_address) + + self.mlist.subscribe(bart) + get_queue_messages('virgin') + + self.pgp_list.key_signing_allowed = {MemberRole.member} + + message = _create_mixed('bart@example.com', 'test@example.com', + 'key sign') + wrapped_message = MIMEWrapper(message) + new_key = copy.copy(self.pgp_list.pubkey) + uid = next(iter(new_key.userids)) + sig = self.bart_new_key.certify(uid) + uid |= sig + message = wrapped_message.attach_keys(new_key) + + items = _run_message(message, 1) + results_msg = items[0].msg + + self.assertIn('No new certifications found.', + results_msg.get_payload()) + class TestGeneral(unittest.TestCase): layer = PGPConfigLayer diff --git a/src/mailman_pgp/database/types.py b/src/mailman_pgp/database/types.py new file mode 100644 index 0000000..e86fb9f --- /dev/null +++ b/src/mailman_pgp/database/types.py @@ -0,0 +1,57 @@ +# Copyright (C) 2017 Jan Jancar +# +# This file is a part of the Mailman PGP plugin. +# +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) +# any later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. +# +# You should have received a copy of the GNU General Public License along with +# this program. If not, see <http://www.gnu.org/licenses/>. +from public import public +from sqlalchemy import Integer, TypeDecorator + + +@public +class EnumFlag(TypeDecorator): + """Simple bitwise flag from an `enum.Enum`, assumes it is unique.""" + impl = Integer + + def __init__(self, enum, *args, **kw): + super().__init__(*args, **kw) + self.enum = enum + + def process_bind_param(self, value, dialect): + # value is a set/frozenset of Enums. + if value is None: + return None + if not isinstance(value, (set, frozenset)): + raise ValueError + + int_value = 0 + for enum in reversed(self.enum): + int_value = int_value << 1 + if enum in value: + int_value += 1 + + return int_value + + def process_result_value(self, value, dialect): + # value is an integer. + if value is None: + return None + if value == 0: + return frozenset() + result = set() + for enum in self.enum: + if value % 2 == 1: + result.add(enum) + value = value >> 1 + + return frozenset(result) diff --git a/src/mailman_pgp/model/list.py b/src/mailman_pgp/model/list.py index c68cb67..13d4430 100644 --- a/src/mailman_pgp/model/list.py +++ b/src/mailman_pgp/model/list.py @@ -18,7 +18,8 @@ """Model for PGP enabled mailing lists.""" from mailman.database.types import Enum, SAUnicode from mailman.interfaces.action import Action -from mailman.interfaces.listmanager import (IListManager, ListDeletingEvent) +from mailman.interfaces.listmanager import IListManager, ListDeletingEvent +from mailman.interfaces.member import MemberRole from public import public from sqlalchemy import Boolean, Column, Integer from sqlalchemy.orm import reconstructor @@ -27,6 +28,7 @@ from zope.event import classhandler from mailman_pgp.config import config from mailman_pgp.database import transaction +from mailman_pgp.database.types import EnumFlag from mailman_pgp.model.base import Base from mailman_pgp.model.fs_key import FSKey @@ -57,6 +59,9 @@ class PGPMailingList(Base): # Key related properties key_change_workflow = Column(SAUnicode, default='pgp-key-change-mod-workflow') + key_signing_allowed = Column(EnumFlag(MemberRole), + default={MemberRole.owner, + MemberRole.moderator}) def __init__(self, mlist): """ diff --git a/src/mailman_pgp/mta/personalized.py b/src/mailman_pgp/mta/personalized.py index bd50c70..9fcc282 100644 --- a/src/mailman_pgp/mta/personalized.py +++ b/src/mailman_pgp/mta/personalized.py @@ -16,8 +16,6 @@ # this program. If not, see <http://www.gnu.org/licenses/>. """PGP enabled IndividualDelivery.""" -import copy - from mailman.mta.base import IndividualDelivery from mailman.mta.decorating import DecoratingMixin from mailman.mta.personalized import PersonalizedMixin diff --git a/src/mailman_pgp/pgp/__init__.py b/src/mailman_pgp/pgp/__init__.py index 6cf0f24..f5d35a6 100644 --- a/src/mailman_pgp/pgp/__init__.py +++ b/src/mailman_pgp/pgp/__init__.py @@ -33,6 +33,7 @@ class PGP: # Make sure the keydir paths are directories and exist. self.keydirs = {keydir_name: config.get_value('keydirs', keydir_name) for keydir_name in config.options('keydirs')} + for keydir_path in self.keydirs.values(): # TODO set a strict mode here keydir_path.mkdir(parents=True, exist_ok=True) diff --git a/src/mailman_pgp/pgp/mime.py b/src/mailman_pgp/pgp/mime.py index 32e2cab..878fe59 100644 --- a/src/mailman_pgp/pgp/mime.py +++ b/src/mailman_pgp/pgp/mime.py @@ -24,7 +24,7 @@ from email.mime.application import MIMEApplication from email.utils import collapse_rfc2231_value from mailman.email.message import Message, MultipartDigestMessage -from pgpy import PGPMessage, PGPSignature, PGPDetachedSignature +from pgpy import PGPDetachedSignature, PGPMessage from pgpy.constants import HashAlgorithm, SymmetricKeyAlgorithm from public import public |
