aboutsummaryrefslogtreecommitdiff
path: root/src/mailman_pgp/commands
diff options
context:
space:
mode:
Diffstat (limited to 'src/mailman_pgp/commands')
-rw-r--r--src/mailman_pgp/commands/eml_key.py97
-rw-r--r--src/mailman_pgp/commands/tests/test_key.py293
2 files changed, 388 insertions, 2 deletions
diff --git a/src/mailman_pgp/commands/eml_key.py b/src/mailman_pgp/commands/eml_key.py
index 46303c3..a298ea7 100644
--- a/src/mailman_pgp/commands/eml_key.py
+++ b/src/mailman_pgp/commands/eml_key.py
@@ -22,6 +22,7 @@ from email.mime.text import MIMEText
from mailman.email.message import UserNotification
from mailman.interfaces.command import ContinueProcessing, IEmailCommand
+from mailman.interfaces.member import MemberRole
from mailman.interfaces.pending import IPendings
from mailman.interfaces.subscriptions import ISubscriptionManager
from mailman.interfaces.usermanager import IUserManager
@@ -351,7 +352,101 @@ def _cmd_revoke(pgp_list, mlist, msg, msgdata, arguments, results):
def _cmd_sign(pgp_list, mlist, msg, msgdata, arguments, results):
# List public key attached, signed by the users current key.
- pass
+ if len(arguments) != 1:
+ print('Extraneous argument/s: ' + ','.join(arguments[1:]),
+ file=results)
+ return ContinueProcessing.no
+
+ email = get_email(msg)
+ if not email:
+ print('No email.', file=results)
+ return ContinueProcessing.no
+
+ pgp_address = PGPAddress.for_email(email)
+ if pgp_address is None:
+ print('A pgp enabled address not found.', file=results)
+ return ContinueProcessing.no
+
+ key = pgp_address.key
+ if key is None:
+ print("You currently don't have a key set.", file=results)
+ return ContinueProcessing.no
+
+ if not pgp_address.key_confirmed:
+ print('Your key is currently not confirmed.', file=results)
+ return ContinueProcessing.no
+
+ wrapped = PGPWrapper(msg)
+ if wrapped.is_encrypted():
+ decrypted = wrapped.try_decrypt(pgp_list.key)
+ wrapped = PGPWrapper(decrypted)
+
+ if not wrapped.has_keys():
+ print('No keys attached? Send a key.', file=results)
+ return ContinueProcessing.no
+
+ keys = list(wrapped.keys())
+ if len(keys) != 1:
+ print('More than one key! Send only one key.', file=results)
+ return ContinueProcessing.no
+ key = keys.pop()
+
+ allowed_signers = pgp_list.key_signing_allowed
+ roster_map = {
+ MemberRole.member: mlist.members,
+ MemberRole.owner: mlist.owners,
+ MemberRole.moderator: mlist.moderators,
+ MemberRole.nonmember: mlist.nonmembers
+ }
+ allowed = False
+ for allowed_role in allowed_signers:
+ allowed_roster = roster_map[allowed_role]
+ if allowed_roster.get_member(email) is not None:
+ allowed = True
+ break
+
+ if not allowed:
+ print('You are not allowed to sign the list key.', file=results)
+ return ContinueProcessing.no
+
+ if pgp_list.pubkey.key_material != key.key_material:
+ print('You sent a wrong key.', file=results)
+ return ContinueProcessing.no
+
+ uid_map = {}
+ for uid in pgp_list.key.userids:
+ for uid_other in key.userids:
+ if uid == uid_other:
+ uid_map[uid] = uid_other
+
+ if len(uid_map) == 0:
+ print('No signed UIDs found.', file=results)
+ return ContinueProcessing.no
+
+ uid_sigs = {}
+ for uid, uid_other in uid_map.items():
+ for sig in uid_other.signatures:
+ if sig in uid.signatures:
+ continue
+ if sig.signer != pgp_address.key.fingerprint.keyid:
+ continue
+ # sig is a new signature, not currenctly on uid, ans seems to
+ # be made by the pgp_address.key
+ verification = pgp_address.key.verify(uid, sig)
+ if bool(verification):
+ uid_sigs.setdefault(uid, []).append(sig)
+
+ if len(uid_sigs) == 0:
+ print('No new certifications found.', file=results)
+ return ContinueProcessing.no
+
+ for uid, sigs in uid_sigs.items():
+ for sig in sigs:
+ uid |= sig
+ pgp_list.fs_key.save()
+
+ print('List key updated with new signatures.', file=results)
+ return ContinueProcessing.yes
def _cmd_receive(pgp_list, mlist, msg, msgdata, arguments, results):
diff --git a/src/mailman_pgp/commands/tests/test_key.py b/src/mailman_pgp/commands/tests/test_key.py
index 266def6..5a6bb12 100644
--- a/src/mailman_pgp/commands/tests/test_key.py
+++ b/src/mailman_pgp/commands/tests/test_key.py
@@ -16,11 +16,12 @@
# this program. If not, see <http://www.gnu.org/licenses/>.
""""""
-
+import copy
import unittest
from mailman.app.lifecycle import create_list
from mailman.email.message import Message
+from mailman.interfaces.member import MemberRole
from mailman.interfaces.subscriptions import ISubscriptionManager
from mailman.interfaces.usermanager import IUserManager
from mailman.runners.command import CommandRunner
@@ -942,6 +943,296 @@ class TestAfterSubscription(unittest.TestCase):
self.assertEqual(len(revocs), 1)
self.assertEqual(revoc.hash2, revocs[0].hash2)
+ def test_sign(self):
+ bart = getUtility(IUserManager).create_address('bart@example.com',
+ 'Bart Person')
+ with transaction() as t:
+ pgp_address = PGPAddress(bart)
+ pgp_address.key = self.bart_key.pubkey
+ pgp_address.key_confirmed = True
+ t.add(pgp_address)
+
+ self.mlist.subscribe(bart)
+ get_queue_messages('virgin')
+
+ self.pgp_list.key_signing_allowed = {MemberRole.member}
+
+ message = _create_mixed('bart@example.com', 'test@example.com',
+ 'key sign')
+ wrapped_message = MIMEWrapper(message)
+ new_key = copy.copy(self.pgp_list.pubkey)
+ uid = next(iter(new_key.userids))
+ sig = self.bart_key.certify(uid)
+ uid |= sig
+ message = wrapped_message.attach_keys(new_key)
+
+ items = _run_message(message, 1)
+ results_msg = items[0].msg
+
+ self.assertIn('List key updated with new signatures.',
+ results_msg.get_payload())
+
+ def test_sign_encrypted(self):
+ bart = getUtility(IUserManager).create_address('bart@example.com',
+ 'Bart Person')
+ with transaction() as t:
+ pgp_address = PGPAddress(bart)
+ pgp_address.key = self.bart_key.pubkey
+ pgp_address.key_confirmed = True
+ t.add(pgp_address)
+
+ self.mlist.subscribe(bart)
+ get_queue_messages('virgin')
+
+ self.pgp_list.key_signing_allowed = {MemberRole.member}
+
+ message = _create_mixed('bart@example.com', 'test@example.com',
+ 'key sign')
+ wrapped_message = MIMEWrapper(message)
+ new_key = copy.copy(self.pgp_list.pubkey)
+ uid = next(iter(new_key.userids))
+ sig = self.bart_key.certify(uid)
+ uid |= sig
+ message = wrapped_message.attach_keys(new_key)
+ wrapped_message = MIMEWrapper(message)
+ message = wrapped_message.encrypt(self.pgp_list.pubkey)
+
+ items = _run_message(message, 1)
+ results_msg = items[0].msg
+
+ self.assertIn('List key updated with new signatures.',
+ results_msg.get_payload())
+
+ def test_sign_extra_arg(self):
+ message = _create_plain('bart@example.com', 'test@example.com',
+ 'key sign extra arguments', '')
+ items = _run_message(message, 1)
+ results_msg = items[0].msg
+
+ self.assertIn('Extraneous argument/s: extra,arguments',
+ results_msg.get_payload())
+
+ def test_sign_no_email(self):
+ message = _create_mixed('', 'test@example.com', 'key sign')
+
+ items = _run_message(message, 1)
+ results_msg = items[0].msg
+
+ self.assertIn('No email.', results_msg.get_payload())
+
+ def test_sign_no_pgp_address(self):
+ message = _create_mixed('bart@example.com', 'test@example.com',
+ 'key sign')
+
+ items = _run_message(message, 1)
+ results_msg = items[0].msg
+
+ self.assertIn('A pgp enabled address not found.',
+ results_msg.get_payload())
+
+ def test_sign_no_key_set(self):
+ bart = getUtility(IUserManager).create_address('bart@example.com',
+ 'Bart Person')
+ with transaction() as t:
+ pgp_address = PGPAddress(bart)
+ t.add(pgp_address)
+
+ message = _create_mixed('bart@example.com', 'test@example.com',
+ 'key sign')
+
+ items = _run_message(message, 1)
+ results_msg = items[0].msg
+
+ self.assertIn("You currently don't have a key set.",
+ results_msg.get_payload())
+
+ def test_sign_key_not_confirmed(self):
+ bart = getUtility(IUserManager).create_address('bart@example.com',
+ 'Bart Person')
+ with transaction() as t:
+ pgp_address = PGPAddress(bart)
+ pgp_address.key = self.bart_key.pubkey
+ t.add(pgp_address)
+
+ message = _create_mixed('bart@example.com', 'test@example.com',
+ 'key sign')
+
+ items = _run_message(message, 1)
+ results_msg = items[0].msg
+
+ self.assertIn('Your key is currently not confirmed.',
+ results_msg.get_payload())
+
+ def test_sign_no_key(self):
+ bart = getUtility(IUserManager).create_address('bart@example.com',
+ 'Bart Person')
+ with transaction() as t:
+ pgp_address = PGPAddress(bart)
+ pgp_address.key = self.bart_key.pubkey
+ pgp_address.key_confirmed = True
+ t.add(pgp_address)
+
+ message = _create_plain('bart@example.com', 'test@example.com',
+ 'key sign', '')
+
+ items = _run_message(message, 1)
+ results_msg = items[0].msg
+
+ self.assertIn('No keys attached? Send a key.',
+ results_msg.get_payload())
+
+ def test_sign_multiple_keys(self):
+ bart = getUtility(IUserManager).create_address('bart@example.com',
+ 'Bart Person')
+ with transaction() as t:
+ pgp_address = PGPAddress(bart)
+ pgp_address.key = self.bart_key.pubkey
+ pgp_address.key_confirmed = True
+ t.add(pgp_address)
+
+ message = _create_mixed('bart@example.com', 'test@example.com',
+ 'key sign')
+
+ wrapped_message = MIMEWrapper(message)
+ message = wrapped_message.attach_keys(self.bart_key.pubkey)
+ wrapped_message = MIMEWrapper(message)
+ message = wrapped_message.attach_keys(self.bart_new_key.pubkey)
+
+ items = _run_message(message, 1)
+ results_msg = items[0].msg
+
+ self.assertIn('More than one key! Send only one key.',
+ results_msg.get_payload())
+
+ def test_sign_not_allowed(self):
+ bart = getUtility(IUserManager).create_address('bart@example.com',
+ 'Bart Person')
+ with transaction() as t:
+ pgp_address = PGPAddress(bart)
+ pgp_address.key = self.bart_key.pubkey
+ pgp_address.key_confirmed = True
+ t.add(pgp_address)
+
+ self.pgp_list.key_signing_allowed = {MemberRole.owner}
+
+ message = _create_mixed('bart@example.com', 'test@example.com',
+ 'key sign')
+ wrapped_message = MIMEWrapper(message)
+ message = wrapped_message.attach_keys(self.pgp_list.pubkey)
+
+ items = _run_message(message, 1)
+ results_msg = items[0].msg
+
+ self.assertIn('You are not allowed to sign the list key.',
+ results_msg.get_payload())
+
+ def test_sign_wrong_keymaterial(self):
+ bart = getUtility(IUserManager).create_address('bart@example.com',
+ 'Bart Person')
+ with transaction() as t:
+ pgp_address = PGPAddress(bart)
+ pgp_address.key = self.bart_key.pubkey
+ pgp_address.key_confirmed = True
+ t.add(pgp_address)
+
+ self.mlist.subscribe(bart)
+ get_queue_messages('virgin')
+
+ self.pgp_list.key_signing_allowed = {MemberRole.member}
+
+ message = _create_mixed('bart@example.com', 'test@example.com',
+ 'key sign')
+ wrapped_message = MIMEWrapper(message)
+ message = wrapped_message.attach_keys(self.bart_key.pubkey)
+
+ items = _run_message(message, 1)
+ results_msg = items[0].msg
+
+ self.assertIn('You sent a wrong key.',
+ results_msg.get_payload())
+
+ def test_sign_no_uids(self):
+ bart = getUtility(IUserManager).create_address('bart@example.com',
+ 'Bart Person')
+ with transaction() as t:
+ pgp_address = PGPAddress(bart)
+ pgp_address.key = self.bart_key.pubkey
+ pgp_address.key_confirmed = True
+ t.add(pgp_address)
+
+ self.mlist.subscribe(bart)
+ get_queue_messages('virgin')
+
+ self.pgp_list.key_signing_allowed = {MemberRole.member}
+
+ message = _create_mixed('bart@example.com', 'test@example.com',
+ 'key sign')
+ wrapped_message = MIMEWrapper(message)
+ new_key = copy.copy(self.pgp_list.pubkey)
+ for uid in new_key.userids:
+ new_key.del_uid(uid.email)
+ message = wrapped_message.attach_keys(new_key)
+
+ items = _run_message(message, 1)
+ results_msg = items[0].msg
+
+ self.assertIn('No signed UIDs found.',
+ results_msg.get_payload())
+
+ def test_sign_no_new_sig(self):
+ bart = getUtility(IUserManager).create_address('bart@example.com',
+ 'Bart Person')
+ with transaction() as t:
+ pgp_address = PGPAddress(bart)
+ pgp_address.key = self.bart_key.pubkey
+ pgp_address.key_confirmed = True
+ t.add(pgp_address)
+
+ self.mlist.subscribe(bart)
+ get_queue_messages('virgin')
+
+ self.pgp_list.key_signing_allowed = {MemberRole.member}
+
+ message = _create_mixed('bart@example.com', 'test@example.com',
+ 'key sign')
+ wrapped_message = MIMEWrapper(message)
+ message = wrapped_message.attach_keys(self.pgp_list.pubkey)
+
+ items = _run_message(message, 1)
+ results_msg = items[0].msg
+
+ self.assertIn('No new certifications found.',
+ results_msg.get_payload())
+
+ def test_sign_no_sig_by_key(self):
+ bart = getUtility(IUserManager).create_address('bart@example.com',
+ 'Bart Person')
+ with transaction() as t:
+ pgp_address = PGPAddress(bart)
+ pgp_address.key = self.bart_key.pubkey
+ pgp_address.key_confirmed = True
+ t.add(pgp_address)
+
+ self.mlist.subscribe(bart)
+ get_queue_messages('virgin')
+
+ self.pgp_list.key_signing_allowed = {MemberRole.member}
+
+ message = _create_mixed('bart@example.com', 'test@example.com',
+ 'key sign')
+ wrapped_message = MIMEWrapper(message)
+ new_key = copy.copy(self.pgp_list.pubkey)
+ uid = next(iter(new_key.userids))
+ sig = self.bart_new_key.certify(uid)
+ uid |= sig
+ message = wrapped_message.attach_keys(new_key)
+
+ items = _run_message(message, 1)
+ results_msg = items[0].msg
+
+ self.assertIn('No new certifications found.',
+ results_msg.get_payload())
+
class TestGeneral(unittest.TestCase):
layer = PGPConfigLayer