aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/mailman_pgp/model/address.py12
-rw-r--r--src/mailman_pgp/pgp/keygen.py39
-rw-r--r--src/mailman_pgp/rules/tests/test_encryption.py50
-rw-r--r--src/mailman_pgp/rules/tests/test_signature.py21
-rw-r--r--src/mailman_pgp/runners/tests/__init__.py0
-rw-r--r--src/mailman_pgp/runners/tests/test_incoming.py126
6 files changed, 219 insertions, 29 deletions
diff --git a/src/mailman_pgp/model/address.py b/src/mailman_pgp/model/address.py
index a6aaf3c..1d07486 100644
--- a/src/mailman_pgp/model/address.py
+++ b/src/mailman_pgp/model/address.py
@@ -76,10 +76,14 @@ class PGPAddress(Base):
os.remove(self.key_path)
except FileNotFoundError:
pass
- self.key_fingerprint = str(new_key.fingerprint)
- with open(self.key_path, 'w') as out:
- out.write(str(new_key))
- self._key = new_key
+ if new_key is None:
+ self.key_fingerprint = None
+ self._key = None
+ else:
+ self.key_fingerprint = str(new_key.fingerprint)
+ with open(self.key_path, 'w') as out:
+ out.write(str(new_key))
+ self._key = new_key
@property
def key_path(self):
diff --git a/src/mailman_pgp/pgp/keygen.py b/src/mailman_pgp/pgp/keygen.py
index 33784dd..55bcacf 100644
--- a/src/mailman_pgp/pgp/keygen.py
+++ b/src/mailman_pgp/pgp/keygen.py
@@ -33,10 +33,10 @@ class ListKeyGenerator(mp.Process):
def __init__(self, keypair_config, display_name, posting_address,
request_address, key_path):
super().__init__(
- target=self.generate,
- args=(keypair_config, display_name, posting_address,
- request_address, key_path),
- daemon=True)
+ target=self.generate,
+ args=(keypair_config, display_name, posting_address,
+ request_address, key_path),
+ daemon=True)
def generate(self, keypair_config, display_name, posting_address,
request_address, key_path):
@@ -67,22 +67,26 @@ class ListKeyGenerator(mp.Process):
:param request_address:
:return: `PGPKey`
"""
+ common_params = dict(
+ hashes=[HashAlgorithm.SHA256,
+ HashAlgorithm.SHA384,
+ HashAlgorithm.SHA512,
+ HashAlgorithm.SHA224],
+ ciphers=[SymmetricKeyAlgorithm.AES256,
+ SymmetricKeyAlgorithm.AES192,
+ SymmetricKeyAlgorithm.AES128],
+ compression=[CompressionAlgorithm.ZLIB,
+ CompressionAlgorithm.BZ2,
+ CompressionAlgorithm.ZIP,
+ CompressionAlgorithm.Uncompressed]
+ )
+
# Generate the Sign + Certify primary key.
key_type = config['key_type']
key_length = config['key_length']
key = PGPKey.new(key_type, key_length)
key_params = dict(usage={KeyFlags.Sign, KeyFlags.Certify},
- hashes=[HashAlgorithm.SHA256,
- HashAlgorithm.SHA384,
- HashAlgorithm.SHA512,
- HashAlgorithm.SHA224],
- ciphers=[SymmetricKeyAlgorithm.AES256,
- SymmetricKeyAlgorithm.AES192,
- SymmetricKeyAlgorithm.AES128],
- compression=[CompressionAlgorithm.ZLIB,
- CompressionAlgorithm.BZ2,
- CompressionAlgorithm.ZIP,
- CompressionAlgorithm.Uncompressed])
+ **common_params)
# Generate the posting + request uids.
main_uid = PGPUID.new(display_name, email=posting_address)
request_uid = PGPUID.new(display_name,
@@ -92,8 +96,9 @@ class ListKeyGenerator(mp.Process):
subkey_length = config['subkey_length']
subkey = PGPKey.new(subkey_type, subkey_length)
subkey_params = dict(
- usage={KeyFlags.EncryptCommunications, KeyFlags.EncryptStorage},
- primary=False
+ usage={KeyFlags.EncryptCommunications,
+ KeyFlags.EncryptStorage},
+ **common_params
)
# Put it all together.
key.add_uid(main_uid, primary=True, **key_params)
diff --git a/src/mailman_pgp/rules/tests/test_encryption.py b/src/mailman_pgp/rules/tests/test_encryption.py
new file mode 100644
index 0000000..276a9f3
--- /dev/null
+++ b/src/mailman_pgp/rules/tests/test_encryption.py
@@ -0,0 +1,50 @@
+# Copyright (C) 2017 Jan Jancar
+#
+# This file is a part of the Mailman PGP plugin.
+#
+# This program is free software; you can redistribute it and/or modify it under
+# the terms of the GNU General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or (at your option)
+# any later version.
+#
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+# more details.
+#
+# You should have received a copy of the GNU General Public License along with
+# this program. If not, see <http://www.gnu.org/licenses/>.
+from unittest import TestCase
+
+from mailman.app.lifecycle import create_list
+from mailman.testing.helpers import specialized_message_from_string as mfs
+
+from mailman_pgp.config import mm_config
+from mailman_pgp.database import mm_transaction
+from mailman_pgp.rules.encryption import Encryption
+from mailman_pgp.testing.layers import PGPConfigLayer
+
+
+class TestEncryptionRule(TestCase):
+ layer = PGPConfigLayer
+
+ def setUp(self):
+ self.rule = Encryption()
+ with mm_transaction():
+ self.mlist = create_list('test@example.com',
+ style_name='pgp-default')
+
+ def test_has_rule(self):
+ self.assertIn(Encryption.name, mm_config.rules.keys())
+
+ def test_matches(self):
+ msgdata = {'pgp_moderate': True}
+ msg = mfs("""\
+From: anne@example.com
+To: test@example.com
+
+""")
+ self.assertTrue(self.rule.check(self.mlist, msg, msgdata))
+ msgdata['pgp_moderate'] = False
+ self.assertFalse(self.rule.check(self.mlist, msg, msgdata))
+ self.assertFalse(self.rule.check(self.mlist, msg, {}))
diff --git a/src/mailman_pgp/rules/tests/test_signature.py b/src/mailman_pgp/rules/tests/test_signature.py
index a7eec5a..2f5881a 100644
--- a/src/mailman_pgp/rules/tests/test_signature.py
+++ b/src/mailman_pgp/rules/tests/test_signature.py
@@ -17,7 +17,6 @@
from unittest import TestCase
from mailman.app.lifecycle import create_list
-from mailman.config import config
from mailman.interfaces.action import Action
from mailman.interfaces.member import MemberRole
from mailman.interfaces.usermanager import IUserManager
@@ -25,6 +24,7 @@ from mailman.testing.helpers import (set_preferred,
specialized_message_from_string as mfs)
from zope.component import getUtility
+from mailman_pgp.config import mm_config
from mailman_pgp.database import mm_transaction, transaction
from mailman_pgp.model.address import PGPAddress
from mailman_pgp.model.list import PGPMailingList
@@ -33,7 +33,7 @@ from mailman_pgp.rules.signature import Signature
from mailman_pgp.testing.layers import PGPConfigLayer
-class TestSignature(TestCase):
+class TestSignatureRule(TestCase):
layer = PGPConfigLayer
def setUp(self):
@@ -41,7 +41,7 @@ class TestSignature(TestCase):
user_manager = getUtility(IUserManager)
with mm_transaction():
- self.mlist = create_list('nobody@example.com',
+ self.mlist = create_list('test@example.com',
style_name='pgp-default')
self.sender = user_manager.create_user('RSA-1024b@example.org')
set_preferred(self.sender)
@@ -64,14 +64,14 @@ class TestSignature(TestCase):
'data/mime_signed_invalid.eml')
def test_has_rule(self):
- self.assertIn(Signature.name, config.rules.keys())
+ self.assertIn(Signature.name, mm_config.rules.keys())
def test_no_pgp_list(self):
with mm_transaction():
- ordinary_list = create_list('test@example.com')
+ ordinary_list = create_list('odrinary@example.com')
msg = mfs("""\
From: anne@example.com
-To: test@example.com
+To: ordinary@example.com
""")
@@ -83,14 +83,19 @@ To: test@example.com
self.pgp_list.unsigned_msg_action = Action.defer
msg = mfs("""\
From: anne@example.com
-To: nobody@example.com
+To: test@example.com
""")
with self.assertRaises(ValueError):
self.rule.check(self.mlist, msg, {})
def test_no_key(self):
- pass
+ with transaction():
+ self.pgp_sender.key = None
+
+ msgdata = {}
+ with self.assertRaises(ValueError):
+ self.rule.check(self.mlist, self.msg_mime_signed, msgdata)
def assertAction(self, msgdata, action, reasons):
self.assertEqual(msgdata['moderation_action'], action.name)
diff --git a/src/mailman_pgp/runners/tests/__init__.py b/src/mailman_pgp/runners/tests/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/src/mailman_pgp/runners/tests/__init__.py
diff --git a/src/mailman_pgp/runners/tests/test_incoming.py b/src/mailman_pgp/runners/tests/test_incoming.py
new file mode 100644
index 0000000..d51dbb8
--- /dev/null
+++ b/src/mailman_pgp/runners/tests/test_incoming.py
@@ -0,0 +1,126 @@
+# Copyright (C) 2017 Jan Jancar
+#
+# This file is a part of the Mailman PGP plugin.
+#
+# This program is free software; you can redistribute it and/or modify it under
+# the terms of the GNU General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or (at your option)
+# any later version.
+#
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+# more details.
+#
+# You should have received a copy of the GNU General Public License along with
+# this program. If not, see <http://www.gnu.org/licenses/>.
+from time import sleep
+from unittest import TestCase
+
+from mailman.app.lifecycle import create_list
+from mailman.interfaces.action import Action
+from mailman.interfaces.member import MemberRole
+from mailman.interfaces.usermanager import IUserManager
+from mailman.testing.helpers import (make_testable_runner,
+ specialized_message_from_string as mfs,
+ get_queue_messages, set_preferred)
+from pgpy import PGPMessage
+from zope.component import getUtility
+
+from mailman_pgp.config import mm_config
+from mailman_pgp.database import mm_transaction, transaction
+from mailman_pgp.model.address import PGPAddress
+from mailman_pgp.model.list import PGPMailingList
+from mailman_pgp.pgp.tests.base import load_key, load_message
+from mailman_pgp.runners.incoming import IncomingRunner
+from mailman_pgp.testing.layers import PGPConfigLayer
+
+
+class TestIncoming(TestCase):
+ layer = PGPConfigLayer
+
+ def setUp(self):
+ user_manager = getUtility(IUserManager)
+ with mm_transaction():
+ self.mlist = create_list('test@example.com',
+ style_name='pgp-default')
+ self.sender = user_manager.create_user('RSA-1024b@example.org')
+ set_preferred(self.sender)
+ self.mlist.subscribe(self.sender, MemberRole.member)
+
+ self.pgp_list = PGPMailingList.for_list(self.mlist)
+
+ sender_key = load_key('data/rsa_1024.pub.asc')
+ with transaction() as t:
+ self.pgp_sender = PGPAddress(self.sender.preferred_address)
+ self.pgp_sender.key = sender_key
+ t.add(self.pgp_sender)
+
+ self.msg_clear = load_message('data/clear.eml')
+ self.msg_inline_encrypted = load_message('data/inline_encrypted.eml')
+
+ self.runner = make_testable_runner(IncomingRunner, 'in')
+
+ def test_pass_default(self):
+ with mm_transaction():
+ ordinary_list = create_list('ordinary@example.com')
+
+ msg = mfs("""\
+From: anne@example.com
+To: ordinary@example.com
+
+""")
+
+ msgdata = dict(listid='ordinary.example.com')
+ mm_config.switchboards['in'].enqueue(msg, msgdata)
+ self.runner.run()
+ items = get_queue_messages('in_default', expected_count=1)
+ self.assertEqual(items[0].msg.sender, 'anne@example.com')
+
+ def test_nonencrypted_action(self):
+ with transaction():
+ self.pgp_list.nonencrypted_msg_action = Action.hold
+
+ msgdata = dict(listid='test.example.com')
+ mm_config.switchboards['in'].enqueue(self.msg_clear, msgdata)
+ self.runner.run()
+ items = get_queue_messages('in_default', expected_count=1)
+ self.assertEqual(items[0].msgdata['moderation_action'],
+ Action.hold.name)
+ self.assertEqual(items[0].msgdata['moderation_sender'],
+ self.msg_clear.sender)
+ self.assertEqual(items[0].msgdata['moderation_reason'],
+ 'Message was not encrypted.')
+ self.assertTrue(items[0].msgdata['pgp_moderate'])
+
+ with transaction():
+ self.pgp_list.nonencrypted_msg_action = Action.defer
+
+ msgdata = dict(listid='test.example.com')
+ mm_config.switchboards['in'].enqueue(self.msg_clear, msgdata)
+ self.runner.run()
+ get_queue_messages('in_default', expected_count=1)
+
+ def test_decrypt(self):
+ for i in range(15): # pragma: no cover
+ if self.pgp_list.pubkey is not None:
+ break
+ sleep(1)
+
+ payload = 'Some encrypted text.'
+ pmsg = PGPMessage.new(payload)
+ emsg = self.pgp_list.pubkey.encrypt(pmsg)
+ msg = mfs("""
+From: RSA-1024b@example.org
+To: test@example.com
+
+{}
+""".format(str(emsg)))
+
+ msgdata = dict(listid='test.example.com')
+ mm_config.switchboards['in'].enqueue(msg,
+ msgdata)
+ self.runner.run()
+ items = get_queue_messages('in_default', expected_count=1)
+ out_msg = items[0].msg
+ self.assertEqual(out_msg.get_payload(), payload)