# Copyright (C) 2017 Jan Jancar # # This file is a part of the Mailman PGP plugin. # # This program is free software; you can redistribute it and/or modify it under # the terms of the GNU General Public License as published by the Free # Software Foundation, either version 3 of the License, or (at your option) # any later version. # # This program is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for # more details. # # You should have received a copy of the GNU General Public License along with # this program. If not, see . """Signature checking rule for the pgp-posting-chain.""" from operator import attrgetter from mailman.core.i18n import _ from mailman.interfaces.action import Action from mailman.interfaces.chain import AcceptEvent from mailman.interfaces.rules import IRule from mailman.interfaces.usermanager import IUserManager from public import public from zope.component import getUtility from zope.event import classhandler from zope.interface import implementer from mailman_pgp.database import transaction from mailman_pgp.model.address import PGPAddress from mailman_pgp.model.list import PGPMailingList from mailman_pgp.model.sighash import PGPSigHash from mailman_pgp.pgp.wrapper import PGPWrapper from mailman_pgp.utils.email import get_email from mailman_pgp.utils.moderation import record_action from mailman_pgp.utils.pgp import hashes, verifies @public @implementer(IRule) class Signature: """The signature checking rule.""" name = 'pgp-signature' description = _( 'A rule which enforces PGP enabled list signature configuration.') record = True def check(self, mlist, msg, msgdata): """See `IRule`.""" # Find the `PGPMailingList` this is for. pgp_list = PGPMailingList.for_list(mlist) if pgp_list is None: return False email = get_email(msg) # Wrap the message to work with it. wrapped = PGPWrapper(msg) # Take unsigned_msg_action if unsigned. if not wrapped.is_signed(): action = pgp_list.unsigned_msg_action if action != Action.defer: record_action(msg, msgdata, action, email, 'The message is unsigned.') return True # Take `inline_pgp_action` if inline signed. if wrapped.inline.is_signed(): action = pgp_list.inline_pgp_action if action != Action.defer: record_action(msg, msgdata, action, email, 'Inline PGP is not allowed.') return True # Lookup the address by sender, and its corresponding `PGPAddress`. user_manager = getUtility(IUserManager) address = user_manager.get_address(email) pgp_address = PGPAddress.for_address(address) if pgp_address is None: # Just let it continue. return False # See if we have a key. key = pgp_address.key if key is None: record_action(msg, msgdata, Action.reject, email, 'No key set for address {}.'.format(email)) return True if not pgp_address.key_confirmed: record_action(msg, msgdata, Action.reject, email, 'Key not confirmed.') return True verifications = list(wrapped.verify(key)) # Take the `invalid_sig_action` if the verification failed. if not verifies(verifications): action = pgp_list.invalid_sig_action if action != Action.defer: record_action(msg, msgdata, action, email, 'Signature did not verify.') return True sig_hashes = set(hashes(verifications)) duplicates = set(PGPSigHash.hashes(sig_hashes)) if duplicates: fingerprints = map(attrgetter('fingerprint'), duplicates) if key.fingerprint in fingerprints: action = pgp_list.duplicate_sig_action if action != Action.defer: record_action(msg, msgdata, action, email, 'Signature duplicate.') return True msgdata['pgp_sig_hashes'] = sig_hashes # XXX: we need to track key revocation separately to use it here # TODO: check key revocation here return False @classhandler.handler(AcceptEvent) def on_message_posting(event): """ Add sig hashes to sighash table. :param event: :type event: AcceptEvent """ pgp_list = PGPMailingList.for_list(event.mlist) if pgp_list is None: return pgp_address = PGPAddress.for_email(get_email(event.msg)) if pgp_address is None or pgp_address.key_fingerprint is None: return for sig_hash in event.msgdata['pgp_sig_hashes']: with transaction() as t: pgp_hash = PGPSigHash(hash=sig_hash, fingerprint=pgp_address.key_fingerprint) t.add(pgp_hash)