diff options
| author | J08nY | 2017-06-29 00:53:23 +0200 |
|---|---|---|
| committer | J08nY | 2017-06-29 00:53:23 +0200 |
| commit | 471b222e1c6c4c850c84ec8e268ec4e003f02360 (patch) | |
| tree | ffda7354aebf45af7764cc0208db6939d8ca5474 | |
| parent | 912e64d36e92ab19a4c05abeb943afa34da2186b (diff) | |
| download | mailman-pgp-471b222e1c6c4c850c84ec8e268ec4e003f02360.tar.gz mailman-pgp-471b222e1c6c4c850c84ec8e268ec4e003f02360.tar.zst mailman-pgp-471b222e1c6c4c850c84ec8e268ec4e003f02360.zip | |
| -rw-r--r-- | src/mailman_pgp/model/address.py | 12 | ||||
| -rw-r--r-- | src/mailman_pgp/pgp/keygen.py | 39 | ||||
| -rw-r--r-- | src/mailman_pgp/rules/tests/test_encryption.py | 50 | ||||
| -rw-r--r-- | src/mailman_pgp/rules/tests/test_signature.py | 21 | ||||
| -rw-r--r-- | src/mailman_pgp/runners/tests/__init__.py | 0 | ||||
| -rw-r--r-- | src/mailman_pgp/runners/tests/test_incoming.py | 126 |
6 files changed, 219 insertions, 29 deletions
diff --git a/src/mailman_pgp/model/address.py b/src/mailman_pgp/model/address.py index a6aaf3c..1d07486 100644 --- a/src/mailman_pgp/model/address.py +++ b/src/mailman_pgp/model/address.py @@ -76,10 +76,14 @@ class PGPAddress(Base): os.remove(self.key_path) except FileNotFoundError: pass - self.key_fingerprint = str(new_key.fingerprint) - with open(self.key_path, 'w') as out: - out.write(str(new_key)) - self._key = new_key + if new_key is None: + self.key_fingerprint = None + self._key = None + else: + self.key_fingerprint = str(new_key.fingerprint) + with open(self.key_path, 'w') as out: + out.write(str(new_key)) + self._key = new_key @property def key_path(self): diff --git a/src/mailman_pgp/pgp/keygen.py b/src/mailman_pgp/pgp/keygen.py index 33784dd..55bcacf 100644 --- a/src/mailman_pgp/pgp/keygen.py +++ b/src/mailman_pgp/pgp/keygen.py @@ -33,10 +33,10 @@ class ListKeyGenerator(mp.Process): def __init__(self, keypair_config, display_name, posting_address, request_address, key_path): super().__init__( - target=self.generate, - args=(keypair_config, display_name, posting_address, - request_address, key_path), - daemon=True) + target=self.generate, + args=(keypair_config, display_name, posting_address, + request_address, key_path), + daemon=True) def generate(self, keypair_config, display_name, posting_address, request_address, key_path): @@ -67,22 +67,26 @@ class ListKeyGenerator(mp.Process): :param request_address: :return: `PGPKey` """ + common_params = dict( + hashes=[HashAlgorithm.SHA256, + HashAlgorithm.SHA384, + HashAlgorithm.SHA512, + HashAlgorithm.SHA224], + ciphers=[SymmetricKeyAlgorithm.AES256, + SymmetricKeyAlgorithm.AES192, + SymmetricKeyAlgorithm.AES128], + compression=[CompressionAlgorithm.ZLIB, + CompressionAlgorithm.BZ2, + CompressionAlgorithm.ZIP, + CompressionAlgorithm.Uncompressed] + ) + # Generate the Sign + Certify primary key. key_type = config['key_type'] key_length = config['key_length'] key = PGPKey.new(key_type, key_length) key_params = dict(usage={KeyFlags.Sign, KeyFlags.Certify}, - hashes=[HashAlgorithm.SHA256, - HashAlgorithm.SHA384, - HashAlgorithm.SHA512, - HashAlgorithm.SHA224], - ciphers=[SymmetricKeyAlgorithm.AES256, - SymmetricKeyAlgorithm.AES192, - SymmetricKeyAlgorithm.AES128], - compression=[CompressionAlgorithm.ZLIB, - CompressionAlgorithm.BZ2, - CompressionAlgorithm.ZIP, - CompressionAlgorithm.Uncompressed]) + **common_params) # Generate the posting + request uids. main_uid = PGPUID.new(display_name, email=posting_address) request_uid = PGPUID.new(display_name, @@ -92,8 +96,9 @@ class ListKeyGenerator(mp.Process): subkey_length = config['subkey_length'] subkey = PGPKey.new(subkey_type, subkey_length) subkey_params = dict( - usage={KeyFlags.EncryptCommunications, KeyFlags.EncryptStorage}, - primary=False + usage={KeyFlags.EncryptCommunications, + KeyFlags.EncryptStorage}, + **common_params ) # Put it all together. key.add_uid(main_uid, primary=True, **key_params) diff --git a/src/mailman_pgp/rules/tests/test_encryption.py b/src/mailman_pgp/rules/tests/test_encryption.py new file mode 100644 index 0000000..276a9f3 --- /dev/null +++ b/src/mailman_pgp/rules/tests/test_encryption.py @@ -0,0 +1,50 @@ +# Copyright (C) 2017 Jan Jancar +# +# This file is a part of the Mailman PGP plugin. +# +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) +# any later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. +# +# You should have received a copy of the GNU General Public License along with +# this program. If not, see <http://www.gnu.org/licenses/>. +from unittest import TestCase + +from mailman.app.lifecycle import create_list +from mailman.testing.helpers import specialized_message_from_string as mfs + +from mailman_pgp.config import mm_config +from mailman_pgp.database import mm_transaction +from mailman_pgp.rules.encryption import Encryption +from mailman_pgp.testing.layers import PGPConfigLayer + + +class TestEncryptionRule(TestCase): + layer = PGPConfigLayer + + def setUp(self): + self.rule = Encryption() + with mm_transaction(): + self.mlist = create_list('test@example.com', + style_name='pgp-default') + + def test_has_rule(self): + self.assertIn(Encryption.name, mm_config.rules.keys()) + + def test_matches(self): + msgdata = {'pgp_moderate': True} + msg = mfs("""\ +From: anne@example.com +To: test@example.com + +""") + self.assertTrue(self.rule.check(self.mlist, msg, msgdata)) + msgdata['pgp_moderate'] = False + self.assertFalse(self.rule.check(self.mlist, msg, msgdata)) + self.assertFalse(self.rule.check(self.mlist, msg, {})) diff --git a/src/mailman_pgp/rules/tests/test_signature.py b/src/mailman_pgp/rules/tests/test_signature.py index a7eec5a..2f5881a 100644 --- a/src/mailman_pgp/rules/tests/test_signature.py +++ b/src/mailman_pgp/rules/tests/test_signature.py @@ -17,7 +17,6 @@ from unittest import TestCase from mailman.app.lifecycle import create_list -from mailman.config import config from mailman.interfaces.action import Action from mailman.interfaces.member import MemberRole from mailman.interfaces.usermanager import IUserManager @@ -25,6 +24,7 @@ from mailman.testing.helpers import (set_preferred, specialized_message_from_string as mfs) from zope.component import getUtility +from mailman_pgp.config import mm_config from mailman_pgp.database import mm_transaction, transaction from mailman_pgp.model.address import PGPAddress from mailman_pgp.model.list import PGPMailingList @@ -33,7 +33,7 @@ from mailman_pgp.rules.signature import Signature from mailman_pgp.testing.layers import PGPConfigLayer -class TestSignature(TestCase): +class TestSignatureRule(TestCase): layer = PGPConfigLayer def setUp(self): @@ -41,7 +41,7 @@ class TestSignature(TestCase): user_manager = getUtility(IUserManager) with mm_transaction(): - self.mlist = create_list('nobody@example.com', + self.mlist = create_list('test@example.com', style_name='pgp-default') self.sender = user_manager.create_user('RSA-1024b@example.org') set_preferred(self.sender) @@ -64,14 +64,14 @@ class TestSignature(TestCase): 'data/mime_signed_invalid.eml') def test_has_rule(self): - self.assertIn(Signature.name, config.rules.keys()) + self.assertIn(Signature.name, mm_config.rules.keys()) def test_no_pgp_list(self): with mm_transaction(): - ordinary_list = create_list('test@example.com') + ordinary_list = create_list('odrinary@example.com') msg = mfs("""\ From: anne@example.com -To: test@example.com +To: ordinary@example.com """) @@ -83,14 +83,19 @@ To: test@example.com self.pgp_list.unsigned_msg_action = Action.defer msg = mfs("""\ From: anne@example.com -To: nobody@example.com +To: test@example.com """) with self.assertRaises(ValueError): self.rule.check(self.mlist, msg, {}) def test_no_key(self): - pass + with transaction(): + self.pgp_sender.key = None + + msgdata = {} + with self.assertRaises(ValueError): + self.rule.check(self.mlist, self.msg_mime_signed, msgdata) def assertAction(self, msgdata, action, reasons): self.assertEqual(msgdata['moderation_action'], action.name) diff --git a/src/mailman_pgp/runners/tests/__init__.py b/src/mailman_pgp/runners/tests/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/src/mailman_pgp/runners/tests/__init__.py diff --git a/src/mailman_pgp/runners/tests/test_incoming.py b/src/mailman_pgp/runners/tests/test_incoming.py new file mode 100644 index 0000000..d51dbb8 --- /dev/null +++ b/src/mailman_pgp/runners/tests/test_incoming.py @@ -0,0 +1,126 @@ +# Copyright (C) 2017 Jan Jancar +# +# This file is a part of the Mailman PGP plugin. +# +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) +# any later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. +# +# You should have received a copy of the GNU General Public License along with +# this program. If not, see <http://www.gnu.org/licenses/>. +from time import sleep +from unittest import TestCase + +from mailman.app.lifecycle import create_list +from mailman.interfaces.action import Action +from mailman.interfaces.member import MemberRole +from mailman.interfaces.usermanager import IUserManager +from mailman.testing.helpers import (make_testable_runner, + specialized_message_from_string as mfs, + get_queue_messages, set_preferred) +from pgpy import PGPMessage +from zope.component import getUtility + +from mailman_pgp.config import mm_config +from mailman_pgp.database import mm_transaction, transaction +from mailman_pgp.model.address import PGPAddress +from mailman_pgp.model.list import PGPMailingList +from mailman_pgp.pgp.tests.base import load_key, load_message +from mailman_pgp.runners.incoming import IncomingRunner +from mailman_pgp.testing.layers import PGPConfigLayer + + +class TestIncoming(TestCase): + layer = PGPConfigLayer + + def setUp(self): + user_manager = getUtility(IUserManager) + with mm_transaction(): + self.mlist = create_list('test@example.com', + style_name='pgp-default') + self.sender = user_manager.create_user('RSA-1024b@example.org') + set_preferred(self.sender) + self.mlist.subscribe(self.sender, MemberRole.member) + + self.pgp_list = PGPMailingList.for_list(self.mlist) + + sender_key = load_key('data/rsa_1024.pub.asc') + with transaction() as t: + self.pgp_sender = PGPAddress(self.sender.preferred_address) + self.pgp_sender.key = sender_key + t.add(self.pgp_sender) + + self.msg_clear = load_message('data/clear.eml') + self.msg_inline_encrypted = load_message('data/inline_encrypted.eml') + + self.runner = make_testable_runner(IncomingRunner, 'in') + + def test_pass_default(self): + with mm_transaction(): + ordinary_list = create_list('ordinary@example.com') + + msg = mfs("""\ +From: anne@example.com +To: ordinary@example.com + +""") + + msgdata = dict(listid='ordinary.example.com') + mm_config.switchboards['in'].enqueue(msg, msgdata) + self.runner.run() + items = get_queue_messages('in_default', expected_count=1) + self.assertEqual(items[0].msg.sender, 'anne@example.com') + + def test_nonencrypted_action(self): + with transaction(): + self.pgp_list.nonencrypted_msg_action = Action.hold + + msgdata = dict(listid='test.example.com') + mm_config.switchboards['in'].enqueue(self.msg_clear, msgdata) + self.runner.run() + items = get_queue_messages('in_default', expected_count=1) + self.assertEqual(items[0].msgdata['moderation_action'], + Action.hold.name) + self.assertEqual(items[0].msgdata['moderation_sender'], + self.msg_clear.sender) + self.assertEqual(items[0].msgdata['moderation_reason'], + 'Message was not encrypted.') + self.assertTrue(items[0].msgdata['pgp_moderate']) + + with transaction(): + self.pgp_list.nonencrypted_msg_action = Action.defer + + msgdata = dict(listid='test.example.com') + mm_config.switchboards['in'].enqueue(self.msg_clear, msgdata) + self.runner.run() + get_queue_messages('in_default', expected_count=1) + + def test_decrypt(self): + for i in range(15): # pragma: no cover + if self.pgp_list.pubkey is not None: + break + sleep(1) + + payload = 'Some encrypted text.' + pmsg = PGPMessage.new(payload) + emsg = self.pgp_list.pubkey.encrypt(pmsg) + msg = mfs(""" +From: RSA-1024b@example.org +To: test@example.com + +{} +""".format(str(emsg))) + + msgdata = dict(listid='test.example.com') + mm_config.switchboards['in'].enqueue(msg, + msgdata) + self.runner.run() + items = get_queue_messages('in_default', expected_count=1) + out_msg = items[0].msg + self.assertEqual(out_msg.get_payload(), payload) |
