diff options
Diffstat (limited to 'src/mailman_pgp')
24 files changed, 987 insertions, 82 deletions
diff --git a/src/mailman_pgp/config/mailman.cfg b/src/mailman_pgp/config/mailman.cfg index bf470b0..24dc3bc 100644 --- a/src/mailman_pgp/config/mailman.cfg +++ b/src/mailman_pgp/config/mailman.cfg @@ -23,14 +23,11 @@ path: mailman_pgp enable: yes configuration: python:mailman_pgp.config.mailman_pgp +[mta] +outgoing: mailman_pgp.mta.deliver.deliver + [runner.in] class: mailman_pgp.runners.incoming.PGPIncomingRunner [runner.in_default] class: mailman.runners.incoming.IncomingRunner - -[runner.out] -class: mailman_pgp.runners.outgoing.OutgoingRunner - -[runner.out_default] -class: mailman.runners.outgoing.OutgoingRunner diff --git a/src/mailman_pgp/config/mailman_pgp.cfg b/src/mailman_pgp/config/mailman_pgp.cfg index e4166a7..1f0999a 100644 --- a/src/mailman_pgp/config/mailman_pgp.cfg +++ b/src/mailman_pgp/config/mailman_pgp.cfg @@ -24,7 +24,7 @@ url = sqlite:////$DATA_DIR/pgp.db [keydirs] # Key directory used to store user public keys. -user_keydir= $DATA_DIR/pgp/user_keydir/ +user_keydir = $DATA_DIR/pgp/user_keydir/ # Key directory used to store list keypairs. list_keydir = $DATA_DIR/pgp/list_keydir/ @@ -34,27 +34,28 @@ archive_keydir = $DATA_DIR/pgp/archive_keydir/ [keypairs] -# Whether to autogenerate +# Whether to autogenerate the list key on list creation. autogenerate = yes -# Length of primary list key. -key_length = 4096 +# Type of primary list key and its size. +# Format: type:size +# type is one of: +# RSA, DSA, ECDSA. +# size is the key size or curve name for ECDSA, which can be one of: +# nistp256, nistp384, nistp521, brainpoolP256r1, brainpoolP384r1, +# brainpoolP512r1, secp256k1 +primary_key = RSA:4096 -# Type of primary list key. -# One of RSA, DSA, ECDSA. -key_type = RSA - -# Length of list encryption subkey. -subkey_length = 4096 - -# Type of list encryption subkey. -# One of RSA, ECDH. -subkey_type = RSA +# Type of list encryption subkey and its size. +# Format: type:size +# type is one of: +# RSA, ECDH +# size is the key size or curve name for ECDH, which can be one of: +# nistp256, nistp384, nistp521, brainpoolP256r1, brainpoolP384r1, +# brainpoolP512r1, secp256k1 +sub_key = RSA:4096 [queues] # The queue to which processed incoming messages are passed. -in = in_default - -# The queue to which processed outgoing messages are passed. -out = out_default
\ No newline at end of file +in = in_default
\ No newline at end of file diff --git a/src/mailman_pgp/model/list.py b/src/mailman_pgp/model/list.py index da542de..0163026 100644 --- a/src/mailman_pgp/model/list.py +++ b/src/mailman_pgp/model/list.py @@ -57,6 +57,7 @@ class PGPMailingList(Base): # Encryption related properties nonencrypted_msg_action = Column(Enum(Action), default=Action.reject) + encrypt_outgoing = Column(Boolean, default=True) def __init__(self, mlist): super().__init__() @@ -103,7 +104,8 @@ class PGPMailingList(Base): def generate_key(self, block=False): self._key = None - self._key_generator = ListKeyGenerator(config.pgp.keypair_config, + self._key_generator = ListKeyGenerator(config.pgp.primary_key_args, + config.pgp.sub_key_args, self.mlist.display_name, self.mlist.posting_address, self.mlist.request_address, diff --git a/src/mailman_pgp/mta/__init__.py b/src/mailman_pgp/mta/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/src/mailman_pgp/mta/__init__.py diff --git a/src/mailman_pgp/mta/bulk.py b/src/mailman_pgp/mta/bulk.py new file mode 100644 index 0000000..a2cc8c6 --- /dev/null +++ b/src/mailman_pgp/mta/bulk.py @@ -0,0 +1,111 @@ +# Copyright (C) 2017 Jan Jancar +# +# This file is a part of the Mailman PGP plugin. +# +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) +# any later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. +# +# You should have received a copy of the GNU General Public License along with +# this program. If not, see <http://www.gnu.org/licenses/>. + +"""PGP enabled BulkDelivery.""" +import copy + +from mailman.mta.bulk import BulkDelivery +from public import public + +from mailman_pgp.model.address import PGPAddress +from mailman_pgp.model.list import PGPMailingList +from mailman_pgp.pgp.mime import MIMEWrapper +from mailman_pgp.utils.email import overwrite_message + + +class CallbackBulkDelivery(BulkDelivery): + """Bulk delivery that has a list of callbacks to run for each chunk.""" + + def __init__(self, max_recipients=None): + super().__init__(max_recipients=max_recipients) + self.callbacks = [] + + def deliver(self, mlist, msg, msgdata): + """See `IMailTransportAgentDelivery`.""" + refused = {} + for recipients in self.chunkify(msgdata.get('recipients', set())): + message_copy = copy.deepcopy(msg) + msgdata_copy = msgdata.copy() + recipients_copy = set(recipients) + + for callback in self.callbacks: + callback(mlist, message_copy, msgdata_copy, recipients_copy) + callback_refused = dict( + (recipient, (444, BaseException)) + for recipient in recipients - recipients_copy) + refused.update(callback_refused) + + chunk_refused = self._deliver_to_recipients( + mlist, message_copy, msgdata_copy, recipients_copy) + refused.update(chunk_refused) + return refused + + +class PGPBulkMixin: + """Bulk encryption and signing Delivery mixin.""" + + def sign_encrypt(self, mlist, msg, msgdata, recipients): + """ + Sign and encrypt the outgoing message to the recipients. + + :param mlist: + :type mlist: mailman.model.mailinglist.MailingList + :param msg: + :type msg: mailman.email.message.Message + :param msgdata: + :type msgdata: dict + :param recipients: + :type recipients: set + """ + pgp_list = PGPMailingList.for_list(mlist) + if not pgp_list: + return + if not pgp_list.encrypt_outgoing and not pgp_list.sign_outgoing: + # nothing to do + return + + keys = [] + for recipient in set(recipients): + pgp_address = PGPAddress.for_email(recipient) + if pgp_address is None: + recipients.remove(recipient) + continue + if pgp_address.key is None or not pgp_address.key_confirmed: + recipients.remove(recipient) + continue + keys.append(pgp_address.key) + + wrapped = MIMEWrapper(msg) + if pgp_list.sign_outgoing: + if pgp_list.encrypt_outgoing: + out = wrapped.sign_encrypt(pgp_list.key, pgp_list.pubkey, + *keys, throw_keyid=True) + else: + out = wrapped.sign(pgp_list.key) + else: + out = wrapped.encrypt(pgp_list.pubkey, *keys, throw_keyid=True) + + overwrite_message(out, msg) + + +@public +class PGPBulkDelivery(CallbackBulkDelivery, PGPBulkMixin): + """Bulk PGP enabled delivery.""" + + def __init__(self, max_recipients=None): + super().__init__(max_recipients=max_recipients) + self.callbacks.append(self.sign_encrypt) diff --git a/src/mailman_pgp/mta/deliver.py b/src/mailman_pgp/mta/deliver.py new file mode 100644 index 0000000..050a740 --- /dev/null +++ b/src/mailman_pgp/mta/deliver.py @@ -0,0 +1,136 @@ +# Copyright (C) 2017 Jan Jancar +# +# This file is a part of the Mailman PGP plugin. +# +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) +# any later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. +# +# You should have received a copy of the GNU General Public License along with +# this program. If not, see <http://www.gnu.org/licenses/>. + +"""""" +import logging +import time + +from mailman.config import config +from mailman.interfaces.mailinglist import Personalization +from mailman.interfaces.mta import SomeRecipientsFailed +from mailman.mta.bulk import BulkDelivery +from mailman.mta.deliver import Deliver +from mailman.utilities.string import expand +from public import public + +from mailman_pgp.model.list import PGPMailingList +from mailman_pgp.mta.bulk import PGPBulkDelivery +from mailman_pgp.mta.personalized import PGPPersonalizedDelivery + +COMMA = ',' +log = logging.getLogger('mailman.smtp') + + +@public +def deliver(mlist, msg, msgdata): + """Deliver a message to the outgoing mail server.""" + # If there are no recipients, there's nothing to do. + recipients = msgdata.get('recipients') + if not recipients: + # Could be None, could be an empty sequence. + return + # Which delivery agent should we use? Several situations can cause us to + # use individual delivery. If not specified, use bulk delivery. See the + # to-outgoing handler for when the 'verp' key is set in the metadata. + personalized_agent = Deliver + bulk_agent = BulkDelivery + + pgp_list = PGPMailingList.for_list(mlist) + if pgp_list: + personalized_agent = PGPPersonalizedDelivery + bulk_agent = PGPBulkDelivery + + if msgdata.get('verp', False): + agent = personalized_agent() + elif mlist.personalize != Personalization.none: + agent = personalized_agent() + else: + agent = bulk_agent(int(config.mta.max_recipients)) + log.debug('Using agent: %s', agent) + # Keep track of the original recipients and the original sender for + # logging purposes. + original_recipients = msgdata['recipients'] + original_sender = msgdata.get('original-sender', msg.sender) + # Let the agent attempt to deliver to the recipients. Record all failures + # for re-delivery later. + t0 = time.time() + refused = agent.deliver(mlist, msg, msgdata) + t1 = time.time() + # Log this posting. + size = getattr(msg, 'original_size', msgdata.get('original_size')) + if size is None: + size = len(msg.as_string()) + substitutions = dict( + msgid=msg.get('message-id', 'n/a'), # noqa: E221, E251 + listname=mlist.fqdn_listname, # noqa: E221, E251 + sender=original_sender, # noqa: E221, E251 + recip=len(original_recipients), # noqa: E221, E251 + size=size, # noqa: E221, E251 + time=t1 - t0, # noqa: E221, E251 + refused=len(refused), # noqa: E221, E251 + smtpcode='n/a', # noqa: E221, E251 + smtpmsg='n/a', # noqa: E221, E251 + ) + template = config.logging.smtp.every + if template.lower() != 'no': + log.info('%s', expand(template, mlist, substitutions)) + if refused: + template = config.logging.smtp.refused + if template.lower() != 'no': + log.info('%s', expand(template, mlist, substitutions)) + else: + # Log the successful post, but if it was not destined to the mailing + # list (e.g. to the owner or admin), print the actual recipients + # instead of just the number. + if not msgdata.get('tolist', False): + recips = msg.get_all('to', []) + recips.extend(msg.get_all('cc', [])) + substitutions['recips'] = COMMA.join(recips) + template = config.logging.smtp.success + if template.lower() != 'no': + log.info('%s', expand(template, mlist, substitutions)) + # Process any failed deliveries. + temporary_failures = [] + permanent_failures = [] + for recipient, (code, smtp_message) in refused.items(): + # RFC 5321, $4.5.3.1.10 says: + # + # RFC 821 [1] incorrectly listed the error where an SMTP server + # exhausts its implementation limit on the number of RCPT commands + # ("too many recipients") as having reply code 552. The correct + # reply code for this condition is 452. Clients SHOULD treat a 552 + # code in this case as a temporary, rather than permanent, failure + # so the logic below works. + # + if code >= 500 and code != 552: + # A permanent failure + permanent_failures.append(recipient) + else: + # Deal with persistent transient failures by queuing them up for + # future delivery. TBD: this could generate lots of log entries! + temporary_failures.append(recipient) + template = config.logging.smtp.failure + if template.lower() != 'no': + substitutions.update( + recip=recipient, # noqa: E221, E251 + smtpcode=code, # noqa: E221, E251 + smtpmsg=smtp_message # noqa: E221, E251 + ) + log.info('%s', expand(template, mlist, substitutions)) + # Return the results + if temporary_failures or permanent_failures: + raise SomeRecipientsFailed(temporary_failures, permanent_failures) diff --git a/src/mailman_pgp/mta/personalized.py b/src/mailman_pgp/mta/personalized.py new file mode 100644 index 0000000..a89301f --- /dev/null +++ b/src/mailman_pgp/mta/personalized.py @@ -0,0 +1,93 @@ +# Copyright (C) 2017 Jan Jancar +# +# This file is a part of the Mailman PGP plugin. +# +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) +# any later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. +# +# You should have received a copy of the GNU General Public License along with +# this program. If not, see <http://www.gnu.org/licenses/>. + +"""PGP enabled IndividualDelivery.""" +from mailman.mta.base import IndividualDelivery +from mailman.mta.decorating import DecoratingMixin +from mailman.mta.personalized import PersonalizedMixin +from mailman.mta.verp import VERPMixin +from public import public + +from mailman_pgp.model.address import PGPAddress +from mailman_pgp.model.list import PGPMailingList +from mailman_pgp.pgp.mime import MIMEWrapper +from mailman_pgp.utils.email import overwrite_message + + +class PGPIndividualMixin: + """Individual encryption and signing Delivery mixin.""" + + def sign_encrypt(self, mlist, msg, msgdata): + """ + Sign and encrypt the outgoing message to the recipient. + + :param mlist: + :type mlist: mailman.model.mailinglist.MailingList + :param msg: + :type msg: mailman.email.message.Message + :param msgdata: + :type msgdata: dict + """ + pgp_list = PGPMailingList.for_list(mlist) + if not pgp_list: + return + if not pgp_list.encrypt_outgoing and not pgp_list.sign_outgoing: + # nothing to do + return + + recipient = msgdata['recipient'] + pgp_address = PGPAddress.for_email(recipient) + if pgp_address is None: + msgdata['no_deliver'] = True + return + if pgp_address.key is None or not pgp_address.key_confirmed: + msgdata['no_deliver'] = True + return + + key = pgp_address.key + wrapped = MIMEWrapper(msg) + + if pgp_list.sign_outgoing: + if pgp_list.encrypt_outgoing: + out = wrapped.sign_encrypt(pgp_list.key, key, pgp_list.pubkey) + else: + out = wrapped.sign(pgp_list.key) + else: + out = wrapped.encrypt(key, pgp_list.pubkey) + + overwrite_message(out, msg) + + +@public +class PGPPersonalizedDelivery(IndividualDelivery, VERPMixin, DecoratingMixin, + PersonalizedMixin, PGPIndividualMixin): + """Individual PGP enabled delivery.""" + + def __init__(self): + super().__init__() + self.callbacks.extend([ + self.avoid_duplicates, + self.decorate, + self.personalize_to, + self.sign_encrypt + ]) + + def _deliver_to_recipients(self, mlist, msg, msgdata, recipients): + if msgdata.get('no_deliver', False): + return dict((recipient, (444, BaseException)) for recipient in + recipients) + return super()._deliver_to_recipients(mlist, msg, msgdata, recipients) diff --git a/src/mailman_pgp/mta/tests/__init__.py b/src/mailman_pgp/mta/tests/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/src/mailman_pgp/mta/tests/__init__.py diff --git a/src/mailman_pgp/mta/tests/test_bulk.py b/src/mailman_pgp/mta/tests/test_bulk.py new file mode 100644 index 0000000..ccb4988 --- /dev/null +++ b/src/mailman_pgp/mta/tests/test_bulk.py @@ -0,0 +1,195 @@ +# Copyright (C) 2017 Jan Jancar +# +# This file is a part of the Mailman PGP plugin. +# +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) +# any later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. +# +# You should have received a copy of the GNU General Public License along with +# this program. If not, see <http://www.gnu.org/licenses/>. + +"""Test the PGP enabled BulkDelivery.""" +import unittest + +from mailman.app.lifecycle import create_list +from mailman.interfaces.mailinglist import Personalization +from mailman.testing.helpers import ( + specialized_message_from_string as mfs, subscribe) + +from mailman_pgp.database import transaction +from mailman_pgp.model.address import PGPAddress +from mailman_pgp.model.list import PGPMailingList +from mailman_pgp.mta.bulk import (PGPBulkDelivery) +from mailman_pgp.pgp.tests.base import load_key +from mailman_pgp.pgp.wrapper import PGPWrapper +from mailman_pgp.testing.layers import PGPConfigLayer +from mailman_pgp.utils.pgp import verifies + + +class BulkDeliveryTester(PGPBulkDelivery): + """Save deliveries made by the PGPBulkDelivery class.""" + + def __init__(self, max_recipients=None): + super().__init__(max_recipients=max_recipients) + self.deliveries = [] + + def _deliver_to_recipients(self, mlist, msg, msgdata, recipients): + self.deliveries.append((mlist, msg, msgdata, recipients)) + return [] + + +class TestPGPBulkDelivery(unittest.TestCase): + layer = PGPConfigLayer + + def setUp(self): + self.mlist = create_list('test@example.com', style_name='pgp-default') + self.mlist.personalize = Personalization.none + + self.list_key = load_key('ecc_p256.priv.asc') + self.pgp_list = PGPMailingList.for_list(self.mlist) + self.pgp_list.key = self.list_key + + # Make Anne a member of this mailing list. + self.anne = subscribe(self.mlist, 'Anne', email='anne@example.org') + self.anne_key = load_key('rsa_1024.priv.asc') + + self.bart = subscribe(self.mlist, 'Bart', email='bart@example.org') + self.bart_key = load_key('ecc_secp256k1.priv.asc') + + with transaction() as t: + self.pgp_anne = PGPAddress(self.anne.address) + self.pgp_anne.key = self.anne_key.pubkey + self.pgp_anne.key_confirmed = True + t.add(self.pgp_anne) + + with transaction() as t: + self.pgp_bart = PGPAddress(self.bart.address) + self.pgp_bart.key = self.bart_key.pubkey + self.pgp_bart.key_confirmed = True + t.add(self.pgp_bart) + + # Clear out any results from the previous test. + self.msg = mfs("""\ +From: anne@example.org +To: test@example.com +Subject: test + +""") + + def test_sign_encrypt(self): + with transaction(): + self.pgp_list.sign_outgoing = True + self.pgp_list.encrypt_outgoing = True + + msgdata = dict(recipients=['anne@example.org', 'bart@example.org']) + agent = BulkDeliveryTester(2) + refused = agent.deliver(self.mlist, self.msg, msgdata) + + self.assertEqual(len(refused), 0) + self.assertEqual(len(agent.deliveries), 1) + + out_msg = agent.deliveries[0][1] + out_wrapped = PGPWrapper(out_msg) + self.assertTrue(out_wrapped.is_encrypted()) + + decrypted = out_wrapped.decrypt(self.list_key) + wrapped = PGPWrapper(decrypted) + self.assertTrue(wrapped.is_signed()) + self.assertTrue(verifies(wrapped.verify(self.list_key.pubkey))) + + decrypted = out_wrapped.decrypt(self.anne_key) + wrapped = PGPWrapper(decrypted) + self.assertTrue(wrapped.is_signed()) + self.assertTrue(verifies(wrapped.verify(self.list_key.pubkey))) + + decrypted = out_wrapped.decrypt(self.bart_key) + wrapped = PGPWrapper(decrypted) + self.assertTrue(wrapped.is_signed()) + self.assertTrue(verifies(wrapped.verify(self.list_key.pubkey))) + + def test_encrypt(self): + with transaction(): + self.pgp_list.sign_outgoing = False + self.pgp_list.encrypt_outgoing = True + + msgdata = dict(recipients=['anne@example.org', 'bart@example.org']) + agent = BulkDeliveryTester(2) + refused = agent.deliver(self.mlist, self.msg, msgdata) + + self.assertEqual(len(refused), 0) + self.assertEqual(len(agent.deliveries), 1) + + out_msg = agent.deliveries[0][1] + wrapped = PGPWrapper(out_msg) + self.assertTrue(wrapped.is_encrypted()) + wrapped.decrypt(self.list_key) + wrapped.decrypt(self.anne_key) + wrapped.decrypt(self.bart_key) + + def test_sign(self): + with transaction(): + self.pgp_list.sign_outgoing = True + self.pgp_list.encrypt_outgoing = False + + msgdata = dict(recipients=['anne@example.org', 'bart@example.org']) + agent = BulkDeliveryTester(2) + refused = agent.deliver(self.mlist, self.msg, msgdata) + + self.assertEqual(len(refused), 0) + self.assertEqual(len(agent.deliveries), 1) + + out_msg = agent.deliveries[0][1] + wrapped = PGPWrapper(out_msg) + self.assertTrue(wrapped.is_signed()) + self.assertTrue(verifies(wrapped.verify(self.list_key.pubkey))) + + def test_none(self): + with transaction(): + self.pgp_list.sign_outgoing = False + self.pgp_list.encrypt_outgoing = False + + msgdata = dict(recipients=['anne@example.org', 'bart@example.org']) + agent = BulkDeliveryTester(2) + refused = agent.deliver(self.mlist, self.msg, msgdata) + + self.assertEqual(len(refused), 0) + self.assertEqual(len(agent.deliveries), 1) + + out_msg = agent.deliveries[0][1] + wrapped = PGPWrapper(out_msg) + self.assertFalse(wrapped.is_signed()) + self.assertFalse(wrapped.is_encrypted()) + + def test_no_pgp_list(self): + ordinary_list = create_list('ordinary@example.com') + msgdata = dict(recipients=['anne@example.org', 'bart@example.org']) + agent = BulkDeliveryTester(2) + refused = agent.deliver(ordinary_list, self.msg, msgdata) + + self.assertEqual(len(refused), 0) + self.assertEqual(len(agent.deliveries), 1) + + def test_no_pgp_address(self): + msgdata = dict(recipients=['anne@example.org', 'someone@example.org']) + agent = BulkDeliveryTester(2) + refused = agent.deliver(self.mlist, self.msg, msgdata) + + self.assertEqual(len(refused), 1) + self.assertEqual(len(agent.deliveries), 1) + + def test_no_key(self): + with transaction(): + self.pgp_bart.key = None + msgdata = dict(recipients=['anne@example.org', 'bart@example.org']) + agent = BulkDeliveryTester(2) + refused = agent.deliver(self.mlist, self.msg, msgdata) + + self.assertEqual(len(refused), 1) + self.assertEqual(len(agent.deliveries), 1) diff --git a/src/mailman_pgp/mta/tests/test_personalized.py b/src/mailman_pgp/mta/tests/test_personalized.py new file mode 100644 index 0000000..0357757 --- /dev/null +++ b/src/mailman_pgp/mta/tests/test_personalized.py @@ -0,0 +1,181 @@ +# Copyright (C) 2017 Jan Jancar +# +# This file is a part of the Mailman PGP plugin. +# +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) +# any later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. +# +# You should have received a copy of the GNU General Public License along with +# this program. If not, see <http://www.gnu.org/licenses/>. + +"""Test the PGP enabled IndividualDelivery.""" +import unittest + +from mailman.app.lifecycle import create_list +from mailman.interfaces.mailinglist import Personalization +from mailman.testing.helpers import ( + specialized_message_from_string as mfs, subscribe) + +from mailman_pgp.database import transaction +from mailman_pgp.model.address import PGPAddress +from mailman_pgp.model.list import PGPMailingList +from mailman_pgp.mta.personalized import PGPPersonalizedDelivery +from mailman_pgp.pgp.tests.base import load_key +from mailman_pgp.pgp.wrapper import PGPWrapper +from mailman_pgp.testing.layers import PGPConfigLayer +from mailman_pgp.utils.pgp import verifies + + +class PersonalizedDeliveryTester(PGPPersonalizedDelivery): + """Save the deliveries made by the PGPPersonalizedDelivery class.""" + + def __init__(self): + super().__init__() + self.deliveries = [] + + def _deliver_to_recipients(self, mlist, msg, msgdata, recipients): + self.deliveries.append((mlist, msg, msgdata, recipients)) + return [] + + +class TestPGPPersonalizedDelivery(unittest.TestCase): + layer = PGPConfigLayer + + def setUp(self): + self.mlist = create_list('test@example.com', style_name='pgp-default') + self.mlist.personalize = Personalization.individual + + self.list_key = load_key('ecc_p256.priv.asc') + self.pgp_list = PGPMailingList.for_list(self.mlist) + self.pgp_list.key = self.list_key + + # Make Anne a member of this mailing list. + self.anne = subscribe(self.mlist, 'Anne', email='anne@example.org') + self.anne_key = load_key('rsa_1024.priv.asc') + with transaction() as t: + self.pgp_anne = PGPAddress(self.anne.address) + self.pgp_anne.key = self.anne_key.pubkey + self.pgp_anne.key_confirmed = True + t.add(self.pgp_anne) + + # Clear out any results from the previous test. + self.msg = mfs("""\ +From: anne@example.org +To: test@example.com +Subject: test + +""") + + def test_sign_encrypt(self): + with transaction(): + self.pgp_list.sign_outgoing = True + self.pgp_list.encrypt_outgoing = True + + msgdata = dict(recipients=['anne@example.org']) + agent = PersonalizedDeliveryTester() + refused = agent.deliver(self.mlist, self.msg, msgdata) + + self.assertEqual(len(refused), 0) + self.assertEqual(len(agent.deliveries), 1) + + out_msg = agent.deliveries[0][1] + out_wrapped = PGPWrapper(out_msg) + self.assertTrue(out_wrapped.is_encrypted()) + + decrypted = out_wrapped.decrypt(self.list_key) + wrapped = PGPWrapper(decrypted) + self.assertTrue(wrapped.is_signed()) + + decrypted = out_wrapped.decrypt(self.anne_key) + wrapped = PGPWrapper(decrypted) + self.assertTrue(wrapped.is_signed()) + + def test_encrypt(self): + with transaction(): + self.pgp_list.sign_outgoing = False + self.pgp_list.encrypt_outgoing = True + + msgdata = dict(recipients=['anne@example.org']) + agent = PersonalizedDeliveryTester() + refused = agent.deliver(self.mlist, self.msg, msgdata) + + self.assertEqual(len(refused), 0) + self.assertEqual(len(agent.deliveries), 1) + + out_msg = agent.deliveries[0][1] + out_wrapped = PGPWrapper(out_msg) + self.assertTrue(out_wrapped.is_encrypted()) + + decrypted = out_wrapped.decrypt(self.list_key) + wrapped = PGPWrapper(decrypted) + self.assertFalse(wrapped.is_signed()) + + decrypted = out_wrapped.decrypt(self.anne_key) + wrapped = PGPWrapper(decrypted) + self.assertFalse(wrapped.is_signed()) + + def test_sign(self): + with transaction(): + self.pgp_list.sign_outgoing = True + self.pgp_list.encrypt_outgoing = False + + msgdata = dict(recipients=['anne@example.org']) + agent = PersonalizedDeliveryTester() + refused = agent.deliver(self.mlist, self.msg, msgdata) + + self.assertEqual(len(refused), 0) + self.assertEqual(len(agent.deliveries), 1) + + out_msg = agent.deliveries[0][1] + wrapped = PGPWrapper(out_msg) + self.assertTrue(wrapped.is_signed()) + self.assertTrue(verifies(wrapped.verify(self.list_key.pubkey))) + + def test_none(self): + with transaction(): + self.pgp_list.sign_outgoing = False + self.pgp_list.encrypt_outgoing = False + + msgdata = dict(recipients=['anne@example.org']) + agent = PersonalizedDeliveryTester() + refused = agent.deliver(self.mlist, self.msg, msgdata) + + self.assertEqual(len(refused), 0) + self.assertEqual(len(agent.deliveries), 1) + + out_msg = agent.deliveries[0][1] + wrapped = PGPWrapper(out_msg) + self.assertFalse(wrapped.is_signed()) + self.assertFalse(wrapped.is_encrypted()) + + def test_no_pgp_list(self): + ordinary_list = create_list('ordinary@example.com') + msgdata = dict(recipients=['anne@example.org']) + agent = PersonalizedDeliveryTester() + refused = agent.deliver(ordinary_list, self.msg, msgdata) + + self.assertEqual(len(refused), 0) + self.assertEqual(len(agent.deliveries), 1) + + def test_no_pgp_address(self): + msgdata = dict(recipients=['someone@example.org']) + agent = PGPPersonalizedDelivery() + refused = agent.deliver(self.mlist, self.msg, msgdata) + + self.assertEqual(len(refused), 1) + + def test_no_key(self): + with transaction(): + self.pgp_anne.key = None + msgdata = dict(recipients=['anne@example.org']) + agent = PGPPersonalizedDelivery() + refused = agent.deliver(self.mlist, self.msg, msgdata) + + self.assertEqual(len(refused), 1) diff --git a/src/mailman_pgp/pgp/__init__.py b/src/mailman_pgp/pgp/__init__.py index 31b61b3..b41c8a1 100644 --- a/src/mailman_pgp/pgp/__init__.py +++ b/src/mailman_pgp/pgp/__init__.py @@ -24,25 +24,29 @@ from os.path import join from mailman.config import config as mailman_config from mailman.utilities.string import expand from pgpy import PGPKeyring -from pgpy.constants import PubKeyAlgorithm +from pgpy.constants import PubKeyAlgorithm, EllipticCurveOID from public import public from mailman_pgp.config import config KEYDIR_CONFIG_PATHS = ['list_keydir', 'user_keydir', 'archive_keydir'] -KEYPAIR_CONFIG_VARIABLES = ['autogenerate', 'key_type', 'key_length', - 'subkey_type', 'subkey_length'] +KEYPAIR_CONFIG_VARIABLES = ['autogenerate', 'primary_key', 'sub_key'] -# The main key needs to support signing. -KEYPAIR_KEY_TYPE_VALID = ['RSA', 'DSA', 'ECDSA'] -# The subkey needs to support encryption. -KEYPAIR_SUBKEY_TYPE_VALID = ['RSA', 'ECDH'] KEYPAIR_TYPE_MAP = { 'RSA': PubKeyAlgorithm.RSAEncryptOrSign, 'DSA': PubKeyAlgorithm.DSA, 'ECDSA': PubKeyAlgorithm.ECDSA, 'ECDH': PubKeyAlgorithm.ECDH } +ECC_OID_MAP = { + 'nistp256': EllipticCurveOID.NIST_P256, + 'nistp384': EllipticCurveOID.NIST_P384, + 'nistp521': EllipticCurveOID.NIST_P521, + 'brainpoolP256r1': EllipticCurveOID.Brainpool_P256, + 'brainpoolP384r1': EllipticCurveOID.Brainpool_P384, + 'brainpoolP512r1': EllipticCurveOID.Brainpool_P512, + 'secp256k1': EllipticCurveOID.SECP256K1 +} @public @@ -56,7 +60,7 @@ class PGP: Load [keypairs] and [keydirs] config sections. Expand paths in them. """ # Get all the [keypairs] config variables. - self.keypair_config = dict( + self._keypair_config = dict( (k, config.get('keypairs', k)) for k in KEYPAIR_CONFIG_VARIABLES) @@ -66,25 +70,41 @@ class PGP: expand(config.get('keydirs', k), None, mailman_config.paths)) for k in KEYDIR_CONFIG_PATHS) + def _parse_key_directive(self, value): + key_type, key_length = value.split(':') + key_type = key_type.upper() + key_length = key_length.lower() + + if key_type not in KEYPAIR_TYPE_MAP: + raise ValueError('Invalid key type: {}.'.format(key_type)) + + out_type = KEYPAIR_TYPE_MAP[key_type] + if key_type in ('ECDSA', 'ECDH'): + if key_length not in ECC_OID_MAP: + raise ValueError('Invalid key length: {}.'.format(key_length)) + out_length = ECC_OID_MAP[key_length] + else: + out_length = int(key_length) + return (out_type, out_length) + def _validate_config(self): """ Validate [keypairs] and [keydirs] config sections. And create keydirs if necessary. """ # Validate keypair config. - key_type = self.keypair_config['key_type'].upper() - if key_type not in KEYPAIR_KEY_TYPE_VALID: - raise ValueError('Invalid key_type. {}'.format(key_type)) - self.keypair_config['key_type'] = KEYPAIR_TYPE_MAP[key_type] - self.keypair_config['key_length'] = int( - self.keypair_config['key_length']) + self.primary_key_args = self._parse_key_directive( + self._keypair_config['primary_key']) + if not self.primary_key_args[0].can_sign: + raise ValueError( + 'Invalid primary key type: {}.'.format( + self.primary_key_args[0])) - subkey_type = self.keypair_config['subkey_type'].upper() - if subkey_type not in KEYPAIR_SUBKEY_TYPE_VALID: - raise ValueError('Invalid subkey_type. {}'.format(subkey_type)) - self.keypair_config['subkey_type'] = KEYPAIR_TYPE_MAP[subkey_type] - self.keypair_config['subkey_length'] = int( - self.keypair_config['subkey_length']) + self.sub_key_args = self._parse_key_directive( + self._keypair_config['sub_key']) + if not self.sub_key_args[0].can_encrypt: + raise ValueError( + 'Invalid sub key type: {}.'.format(self.sub_key_args[0])) # Make sure the keydir paths are directories and exist. for keydir in self.keydir_config.values(): diff --git a/src/mailman_pgp/pgp/inline.py b/src/mailman_pgp/pgp/inline.py index 5c23777..d505585 100644 --- a/src/mailman_pgp/pgp/inline.py +++ b/src/mailman_pgp/pgp/inline.py @@ -229,7 +229,7 @@ class InlineWrapper: session_key = cipher.gen_key() for key in keys: emsg = key.encrypt(emsg, cipher=cipher, - session_key=session_key, + sessionkey=session_key, **kwargs) del session_key return emsg diff --git a/src/mailman_pgp/pgp/keygen.py b/src/mailman_pgp/pgp/keygen.py index b750e28..684b81a 100644 --- a/src/mailman_pgp/pgp/keygen.py +++ b/src/mailman_pgp/pgp/keygen.py @@ -29,36 +29,42 @@ from pgpy.constants import ( class ListKeyGenerator(mp.Process): """A multiprocessing list key generator.""" - def __init__(self, keypair_config, display_name, posting_address, + def __init__(self, primary_args, subkey_args, display_name, + posting_address, request_address, key_path): super().__init__( target=self.generate, - args=(keypair_config, display_name, posting_address, + args=(primary_args, subkey_args, display_name, posting_address, request_address, key_path), daemon=True) - def generate(self, keypair_config, display_name, posting_address, + def generate(self, primary_args, subkey_args, display_name, + posting_address, request_address, key_path): """ Generate the list keypair and save it. - :param keypair_config: + :param primary_args: + :param subkey_args: :param display_name: :param posting_address: :param request_address: :param key_path: """ - key = self._create(keypair_config, display_name, posting_address, + key = self._create(primary_args, subkey_args, display_name, + posting_address, request_address) with Lock(key_path + '.lock'): self._save(key, key_path) - def _create(self, config, display_name, posting_address, request_address): + def _create(self, primary_args, subkey_args, display_name, posting_address, + request_address): """ Generate the list `PGPKey` keypair, with posting and request UIDs. Use a Sign+Certify main key and Encrypt subkey. - :param config: + :param primary_args: + :param subkey_args: :param display_name: :param posting_address: :param request_address: @@ -79,9 +85,7 @@ class ListKeyGenerator(mp.Process): ) # Generate the Sign + Certify primary key. - key_type = config['key_type'] - key_length = config['key_length'] - key = PGPKey.new(key_type, key_length) + key = PGPKey.new(*primary_args) key_params = dict(usage={KeyFlags.Sign, KeyFlags.Certify}, **common_params) # Generate the posting + request uids. @@ -89,9 +93,7 @@ class ListKeyGenerator(mp.Process): request_uid = PGPUID.new(display_name, email=request_address) # Generate the Encrypt subkey. - subkey_type = config['subkey_type'] - subkey_length = config['subkey_length'] - subkey = PGPKey.new(subkey_type, subkey_length) + subkey = PGPKey.new(*subkey_args) subkey_params = dict( usage={KeyFlags.EncryptCommunications, KeyFlags.EncryptStorage}, diff --git a/src/mailman_pgp/pgp/mime.py b/src/mailman_pgp/pgp/mime.py index 3c28132..9978b54 100644 --- a/src/mailman_pgp/pgp/mime.py +++ b/src/mailman_pgp/pgp/mime.py @@ -267,7 +267,7 @@ class MIMEWrapper: session_key = cipher.gen_key() for key in keys: pmsg = key.encrypt(pmsg, cipher=cipher, - session_key=session_key, + sessionkey=session_key, **kwargs) del session_key return pmsg diff --git a/src/mailman_pgp/pgp/tests/data/keys/ecc_curve25519.priv.asc b/src/mailman_pgp/pgp/tests/data/keys/ecc_curve25519.priv.asc new file mode 100644 index 0000000..d66c89e --- /dev/null +++ b/src/mailman_pgp/pgp/tests/data/keys/ecc_curve25519.priv.asc @@ -0,0 +1,15 @@ +-----BEGIN PGP PRIVATE KEY BLOCK----- + +lFgEWW5OExYJKwYBBAHaRw8BAQdALuKQqg42UTUC9PP8Ahk7G3p4mYL0niRdheam +OKNm8wUAAQDYPq5Xlloj4NOhY1yKfpCo6oA2K2VxyY8zD61lDb2+AA9XtDxFQ0Mg +Q3VydmUyNTUxOSAmIEVDQyBDdXJ2ZTI1NTE5IDxFQ0MtQ3VydmUyNTUxOUBleGFt +cGxlLm9yZz6IkAQTFggAOBYhBAWwDrTl6xxIhCHhX/qZ76O408i6BQJZbk4TAhsD +BQsJCAcCBhUICQoLAgQWAgMBAh4BAheAAAoJEPqZ76O408i6nHwA/0suG6HBe2NY +URbNi9b6PQXuhq3sQkBmgAEwt8Yx4HnvAP9qvcN4n3Mv/1vkgSLJzpykffPxo6o4 +2FLQ4jX8yq/EAZxdBFluThMSCisGAQQBl1UBBQEBB0BxjGFvybmP7cfNqMKYCChx +ThAAO+iY8x6MJZ1sGCnxVgMBCAcAAP9OT1RocktFhCvYwTvISq7yd2f2kRxXooZ5 +gt5OJgF54BA2iHgEGBYIACAWIQQFsA605escSIQh4V/6me+juNPIugUCWW5OEwIb +DAAKCRD6me+juNPIumkgAP0YawFWdvFTUk9X0iHzZ0o82qnFU/yf1CpPNZ/00O6w +kAEA+6GQk7kTKq249456imoEY4MiHNz5JP1N/TLPBW/YOAQ= +=+yZA +-----END PGP PRIVATE KEY BLOCK----- diff --git a/src/mailman_pgp/pgp/tests/data/keys/ecc_curve25519.pub.asc b/src/mailman_pgp/pgp/tests/data/keys/ecc_curve25519.pub.asc new file mode 100644 index 0000000..43a5f17 --- /dev/null +++ b/src/mailman_pgp/pgp/tests/data/keys/ecc_curve25519.pub.asc @@ -0,0 +1,14 @@ +-----BEGIN PGP PUBLIC KEY BLOCK----- + +mDMEWW5OExYJKwYBBAHaRw8BAQdALuKQqg42UTUC9PP8Ahk7G3p4mYL0niRdheam +OKNm8wW0PEVDQyBDdXJ2ZTI1NTE5ICYgRUNDIEN1cnZlMjU1MTkgPEVDQy1DdXJ2 +ZTI1NTE5QGV4YW1wbGUub3JnPoiQBBMWCAA4FiEEBbAOtOXrHEiEIeFf+pnvo7jT +yLoFAlluThMCGwMFCwkIBwIGFQgJCgsCBBYCAwECHgECF4AACgkQ+pnvo7jTyLqc +fAD/Sy4bocF7Y1hRFs2L1vo9Be6GrexCQGaAATC3xjHgee8A/2q9w3ifcy//W+SB +IsnOnKR98/GjqjjYUtDiNfzKr8QBuDgEWW5OExIKKwYBBAGXVQEFAQEHQHGMYW/J +uY/tx82owpgIKHFOEAA76JjzHowlnWwYKfFWAwEIB4h4BBgWCAAgFiEEBbAOtOXr +HEiEIeFf+pnvo7jTyLoFAlluThMCGwwACgkQ+pnvo7jTyLppIAD9GGsBVnbxU1JP +V9Ih82dKPNqpxVP8n9QqTzWf9NDusJABAPuhkJO5EyqtuPeOeopqBGODIhzc+ST9 +Tf0yzwVv2DgE +=pyIj +-----END PGP PUBLIC KEY BLOCK----- diff --git a/src/mailman_pgp/pgp/tests/data/keys/ecc_secp256k1.priv.asc b/src/mailman_pgp/pgp/tests/data/keys/ecc_secp256k1.priv.asc new file mode 100644 index 0000000..901a8f4 --- /dev/null +++ b/src/mailman_pgp/pgp/tests/data/keys/ecc_secp256k1.priv.asc @@ -0,0 +1,16 @@ +-----BEGIN PGP PRIVATE KEY BLOCK----- + +lHQEWW5dURMFK4EEAAoCAwT6ABKEPpMMwAQLnHkv+eSycNslonra5Kv7lNQtEeNv +VCXQELzmVGfcKerRVH6kFWpX9c1YxpdrG72Y5mHcRx30AAD/SLMJNXahzpcQy8K1 +XjDqdfCmYpMIFf87xIlZngmLNrkQnLQ5RUNDIHNlY3AyNTZrMSAmIEVDQyBzZWNw +MjU2azEgPEVDQy1zZWNwMjU2azFAZXhhbXBsZS5vcmc+iJAEExMIADgWIQRi7dSu +UMnX5cDo6nvIw1p/AFOkvwUCWW5dUQIbAwULCQgHAgYVCAkKCwIEFgIDAQIeAQIX +gAAKCRDIw1p/AFOkv1kGAP422qrlLxwcZgcZr1k/vJW1s0oYDBlHF2y/I+Q3Kvrs +jQEAjcl/jTBYJTcDBiIRbrzKQMPgj3lEL2R2v/x0d7CdKeeceARZbl1REgUrgQQA +CgIDBOlnr/5pUVGZBipSXZXsYmNaYQzQbG/YvuyJoH5v8n59AS5nQF+XVYVtAT19 +pq3stFGqg6uHrxlJvRHxv1QehVoDAQgHAAD9Elh7gUDS/UYfF5rMUDcjuAigDFVt +MqWIZ+qowNearR8PgYh4BBgTCAAgFiEEYu3UrlDJ1+XA6Op7yMNafwBTpL8FAllu +XVECGwwACgkQyMNafwBTpL9dBQEA1GRFrTIrx3o6+PdKNMSZSKLgsrocI2NGKI5b +XZe62kEA/0eWm6fha6T/wjCQ3befbeG9QA2aITv3XLa/va4ki6p5 +=boGQ +-----END PGP PRIVATE KEY BLOCK----- diff --git a/src/mailman_pgp/pgp/tests/data/keys/ecc_secp256k1.pub.asc b/src/mailman_pgp/pgp/tests/data/keys/ecc_secp256k1.pub.asc new file mode 100644 index 0000000..edd857a --- /dev/null +++ b/src/mailman_pgp/pgp/tests/data/keys/ecc_secp256k1.pub.asc @@ -0,0 +1,15 @@ +-----BEGIN PGP PUBLIC KEY BLOCK----- + +mE8EWW5dURMFK4EEAAoCAwT6ABKEPpMMwAQLnHkv+eSycNslonra5Kv7lNQtEeNv +VCXQELzmVGfcKerRVH6kFWpX9c1YxpdrG72Y5mHcRx30tDlFQ0Mgc2VjcDI1Nmsx +ICYgRUNDIHNlY3AyNTZrMSA8RUNDLXNlY3AyNTZrMUBleGFtcGxlLm9yZz6IkAQT +EwgAOBYhBGLt1K5QydflwOjqe8jDWn8AU6S/BQJZbl1RAhsDBQsJCAcCBhUICQoL +AgQWAgMBAh4BAheAAAoJEMjDWn8AU6S/WQYA/jbaquUvHBxmBxmvWT+8lbWzShgM +GUcXbL8j5Dcq+uyNAQCNyX+NMFglNwMGIhFuvMpAw+CPeUQvZHa//HR3sJ0p57hT +BFluXVESBSuBBAAKAgME6Wev/mlRUZkGKlJdlexiY1phDNBsb9i+7Imgfm/yfn0B +LmdAX5dVhW0BPX2mrey0UaqDq4evGUm9EfG/VB6FWgMBCAeIeAQYEwgAIBYhBGLt +1K5QydflwOjqe8jDWn8AU6S/BQJZbl1RAhsMAAoJEMjDWn8AU6S/XQUBANRkRa0y +K8d6Ovj3SjTEmUii4LK6HCNjRiiOW12XutpBAP9Hlpun4Wuk/8IwkN23n23hvUAN +miE791y2v72uJIuqeQ== +=ryPP +-----END PGP PUBLIC KEY BLOCK----- diff --git a/src/mailman_pgp/pgp/tests/test_keygen.py b/src/mailman_pgp/pgp/tests/test_keygen.py index dab6801..bbd0c84 100644 --- a/src/mailman_pgp/pgp/tests/test_keygen.py +++ b/src/mailman_pgp/pgp/tests/test_keygen.py @@ -15,32 +15,42 @@ # You should have received a copy of the GNU General Public License along with # this program. If not, see <http://www.gnu.org/licenses/>. +"""Test the out-of-process key generator.""" from os.path import exists, isfile, join from tempfile import TemporaryDirectory from unittest import TestCase +from parameterized import parameterized from pgpy import PGPKey -from pgpy.constants import PubKeyAlgorithm +from pgpy.constants import PubKeyAlgorithm, EllipticCurveOID from mailman_pgp.pgp.keygen import ListKeyGenerator -class TesKeygen(TestCase): +class TestKeygen(TestCase): def setUp(self): - self.keypair_config = { - 'key_type': PubKeyAlgorithm.RSAEncryptOrSign, - 'key_length': 1024, - 'subkey_type': PubKeyAlgorithm.RSAEncryptOrSign, - 'subkey_length': 1024 - } self.display_name = 'Display Name' self.posting_address = 'posting@address.com' self.request_address = 'posting-request@address.com' - def test_generate(self): + @parameterized.expand([ + # RSA + RSA + (PubKeyAlgorithm.RSAEncryptOrSign, 1024, + PubKeyAlgorithm.RSAEncryptOrSign, 1024), + # ECDSA + ECDH + (PubKeyAlgorithm.ECDSA, EllipticCurveOID.SECP256K1, + PubKeyAlgorithm.ECDH, EllipticCurveOID.SECP256K1), + # DSA + ECDH + (PubKeyAlgorithm.DSA, 1024, + PubKeyAlgorithm.ECDH, EllipticCurveOID.SECP256K1) + ]) + def test_generate(self, primary_key_type, primary_key_size, sub_key_type, + sub_key_size): with TemporaryDirectory() as temp_dir: key_path = join(temp_dir, 'key.asc') - keygen = ListKeyGenerator(self.keypair_config, self.display_name, + keygen = ListKeyGenerator((primary_key_type, primary_key_size), + (sub_key_type, sub_key_size), + self.display_name, self.posting_address, self.request_address, key_path) keygen.start() @@ -50,18 +60,18 @@ class TesKeygen(TestCase): key, _ = PGPKey.from_file(key_path) self.assertEqual(key.key_algorithm, - self.keypair_config['key_type']) + primary_key_type) self.assertEqual(key.key_size, - self.keypair_config['key_length']) + primary_key_size) subs = key.subkeys self.assertEqual(len(subs), 1) keyid, sub = subs.popitem() self.assertEqual(sub.key_algorithm, - self.keypair_config['subkey_type']) + sub_key_type) self.assertEqual(sub.key_size, - self.keypair_config['subkey_length']) + sub_key_size) uids = key.userids self.assertEqual(len(uids), 2) diff --git a/src/mailman_pgp/rest/tests/test_lists.py b/src/mailman_pgp/rest/tests/test_lists.py index c5c9854..2ebac6b 100644 --- a/src/mailman_pgp/rest/tests/test_lists.py +++ b/src/mailman_pgp/rest/tests/test_lists.py @@ -21,8 +21,9 @@ from mailman.app.lifecycle import create_list from mailman.testing.helpers import call_api from pgpy import PGPKey -from mailman_pgp.database import mm_transaction +from mailman_pgp.database import mm_transaction, transaction from mailman_pgp.model.list import PGPMailingList +from mailman_pgp.pgp.tests.base import load_key from mailman_pgp.testing.layers import PGPRESTLayer @@ -63,8 +64,9 @@ class TestLists(TestCase): with mm_transaction(): mlist = create_list('another@example.com', style_name='pgp-default') + with transaction(): pgp_list = PGPMailingList.for_list(mlist) - pgp_list.generate_key(True) + pgp_list.key = load_key('ecc_p256.priv.asc') json, response = call_api( 'http://localhost:9001/3.1/plugins/pgp/lists/' diff --git a/src/mailman_pgp/testing/layers.py b/src/mailman_pgp/testing/layers.py index fb8a3ec..07eb386 100644 --- a/src/mailman_pgp/testing/layers.py +++ b/src/mailman_pgp/testing/layers.py @@ -19,6 +19,7 @@ import os from os.path import isfile from mailman.testing.layers import (ConfigLayer, RESTLayer, SMTPLayer) +from sqlalchemy.exc import SQLAlchemyError from mailman_pgp.config import config from mailman_pgp.database import transaction @@ -51,7 +52,10 @@ def reset_pgp_soft(): with contextlib.closing(config.db.engine.connect()) as con: trans = con.begin() for table in reversed(Base.metadata.sorted_tables): - con.execute(table.delete()) + try: + con.execute(table.delete()) + except SQLAlchemyError: + pass trans.commit() @@ -60,10 +64,6 @@ def reset_pgp_soft(): # and subclass both it and the respective Mailman Core test layer. class PGPConfigLayer(ConfigLayer): @classmethod - def setUp(cls): - config.set('keypairs', 'autogenerate', 'no') - - @classmethod def tearDown(cls): reset_pgp_soft() diff --git a/src/mailman_pgp/testing/mailman.cfg b/src/mailman_pgp/testing/mailman.cfg new file mode 100644 index 0000000..d9401ca --- /dev/null +++ b/src/mailman_pgp/testing/mailman.cfg @@ -0,0 +1,33 @@ +# Copyright (C) 2017 Jan Jancar +# +# This file is a part of the Mailman PGP plugin. +# +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) +# any later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. +# +# You should have received a copy of the GNU General Public License along with +# this program. If not, see <http://www.gnu.org/licenses/>. + +# Testing Mailman config + +[plugin.pgp] +class: mailman_pgp.plugin.PGPMailman +path: mailman_pgp +enable: yes +configuration: python:mailman_pgp.testing.mailman_pgp + +[mta] +outgoing: mailman_pgp.mta.deliver.deliver + +[runner.in] +class: mailman_pgp.runners.incoming.PGPIncomingRunner + +[runner.in_default] +class: mailman.runners.incoming.IncomingRunner diff --git a/src/mailman_pgp/testing/mailman_pgp.cfg b/src/mailman_pgp/testing/mailman_pgp.cfg new file mode 100644 index 0000000..871f429 --- /dev/null +++ b/src/mailman_pgp/testing/mailman_pgp.cfg @@ -0,0 +1,61 @@ +# Copyright (C) 2017 Jan Jancar +# +# This file is a part of the Mailman PGP plugin. +# +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) +# any later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. +# +# You should have received a copy of the GNU General Public License along with +# this program. If not, see <http://www.gnu.org/licenses/>. + +# Testing PGP config + +[db] +# db path the PGP plugin will use to store list/user configuration (not keys!). +url = sqlite:////$DATA_DIR/pgp.db + + +[keydirs] +# Key directory used to store user public keys. +user_keydir = $DATA_DIR/pgp/user_keydir/ + +# Key directory used to store list keypairs. +list_keydir = $DATA_DIR/pgp/list_keydir/ + +# Key directory used to store archive public keys. +archive_keydir = $DATA_DIR/pgp/archive_keydir/ + + +[keypairs] +# Whether to autogenerate the list key on list creation. +autogenerate = no + +# Type of primary list key and its size. +# Format: type:size +# type is one of: +# RSA, DSA, ECDSA. +# size is the key size or curve name for ECDSA, which can be one of: +# nistp256, nistp384, nistp521, brainpoolP256r1, brainpoolP384r1, +# brainpoolP512r1, secp256k1 +primary_key = ECDSA:secp256k1 + +# Type of list encryption subkey and its size. +# Format: type:size +# type is one of: +# RSA, ECDH +# size is the key size or curve name for ECDH, which can be one of: +# nistp256, nistp384, nistp521, brainpoolP256r1, brainpoolP384r1, +# brainpoolP512r1, secp256k1 +sub_key = ECDH:secp256k1 + + +[queues] +# The queue to which processed incoming messages are passed. +in = in_default
\ No newline at end of file diff --git a/src/mailman_pgp/utils/email.py b/src/mailman_pgp/utils/email.py index d1efc40..a936458 100644 --- a/src/mailman_pgp/utils/email.py +++ b/src/mailman_pgp/utils/email.py @@ -50,7 +50,8 @@ def overwrite_message(from_msg, to_msg): :param to_msg: The destination `Message`. :type to_msg: email.message.Message """ - del to_msg[:] + for key in to_msg.keys(): + del to_msg[key] for key, value in from_msg.items(): to_msg[key] = value to_msg.set_unixfrom(from_msg.get_unixfrom()) |
