aboutsummaryrefslogtreecommitdiff
path: root/src/mailman_pgp/commands/eml_key.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/mailman_pgp/commands/eml_key.py')
-rw-r--r--src/mailman_pgp/commands/eml_key.py128
1 files changed, 113 insertions, 15 deletions
diff --git a/src/mailman_pgp/commands/eml_key.py b/src/mailman_pgp/commands/eml_key.py
index 0775bed..5799b8f 100644
--- a/src/mailman_pgp/commands/eml_key.py
+++ b/src/mailman_pgp/commands/eml_key.py
@@ -17,6 +17,7 @@
"""The key email command."""
import logging
+import copy
from email.mime.text import MIMEText
from mailman.email.message import UserNotification
@@ -25,19 +26,24 @@ from mailman.interfaces.pending import IPendings
from mailman.interfaces.subscriptions import ISubscriptionManager
from mailman.interfaces.usermanager import IUserManager
from pgpy.constants import KeyFlags
+from pgpy.errors import PGPError
from public import public
from zope.component import getUtility
from zope.interface import implementer
+from mailman_pgp.config import mm_config
from mailman_pgp.database import transaction
from mailman_pgp.model.address import PGPAddress
from mailman_pgp.model.list import PGPMailingList
from mailman_pgp.pgp.mime import MIMEWrapper
from mailman_pgp.pgp.wrapper import PGPWrapper
from mailman_pgp.utils.email import get_email
+from mailman_pgp.utils.pgp import key_usable
from mailman_pgp.workflows.key_change import (CHANGE_CONFIRM_REQUEST,
+ KeyChangeModWorkflow,
KeyChangeWorkflow)
-from mailman_pgp.workflows.pubkey import CONFIRM_REQUEST
+from mailman_pgp.workflows.key_confirm import CONFIRM_REQUEST
+from mailman_pgp.workflows.key_revoke import KeyRevokeWorkflow
log = logging.getLogger('mailman.plugin.pgp.commands')
@@ -78,10 +84,7 @@ def _cmd_set(pgp_list, mlist, msg, msgdata, arguments, results):
file=results)
return ContinueProcessing.no
- usage_flags = key.usage_flags()
- for subkey in key.subkeys.values():
- usage_flags |= subkey.usage_flags()
- if KeyFlags.EncryptCommunications not in usage_flags:
+ if not key_usable(key, {KeyFlags.EncryptCommunications, KeyFlags.Sign}):
print('Need a key which can be used to encrypt communications.',
file=results)
return ContinueProcessing.no
@@ -170,7 +173,8 @@ def _cmd_confirm(pgp_list, mlist, msg, msgdata, arguments, results):
print('Wrong token.', file=results)
return ContinueProcessing.no
- if pendable.get('type') == KeyChangeWorkflow.pendable_class().PEND_TYPE:
+ if pendable.get('type') in (KeyChangeWorkflow.name,
+ KeyChangeModWorkflow.name):
expecting = CHANGE_CONFIRM_REQUEST.format(pendable.get('fingerprint'),
token)
else:
@@ -178,6 +182,8 @@ def _cmd_confirm(pgp_list, mlist, msg, msgdata, arguments, results):
for sig_subject in wrapped.get_signed():
if expecting in sig_subject:
+ with transaction():
+ pgp_address.key_confirmed = True
ISubscriptionManager(mlist).confirm(token)
break
else:
@@ -211,6 +217,14 @@ def _cmd_change(pgp_list, mlist, msg, msgdata, arguments, results):
print('A pgp enabled address not found.', file=results)
return ContinueProcessing.no
+ if pgp_address.key is None:
+ print("You currently don't have a key set.", file=results)
+ return ContinueProcessing.no
+
+ if not pgp_address.key_confirmed:
+ print('Your key is currently not confirmed.', file=results)
+ return ContinueProcessing.no
+
wrapped = PGPWrapper(msg)
if wrapped.is_encrypted():
decrypted = wrapped.try_decrypt(pgp_list.key)
@@ -231,24 +245,108 @@ def _cmd_change(pgp_list, mlist, msg, msgdata, arguments, results):
file=results)
return ContinueProcessing.no
- usage_flags = key.usage_flags()
- for subkey in key.subkeys.values():
- usage_flags |= subkey.usage_flags()
- if KeyFlags.EncryptCommunications not in usage_flags:
+ if not key_usable(key, {KeyFlags.EncryptCommunications, KeyFlags.Sign}):
print('Need a key which can be used to encrypt communications.',
file=results)
return ContinueProcessing.no
- workflow = KeyChangeWorkflow(mlist, pgp_address, key)
+ workflow_class = mm_config.workflows[pgp_list.key_change_workflow]
+
+ workflow = workflow_class(mlist, pgp_address, key)
list(workflow)
print('Key change request received.', file=results)
return ContinueProcessing.no
def _cmd_revoke(pgp_list, mlist, msg, msgdata, arguments, results):
- # Current key revocation certificate in attachment, restarts the
- # subscription process, or rather only it's key setup part.
- pass
+ """
+ `key revoke` command.
+
+ Used when a user has to revoke a part of a key used for a `PGPAddress`.
+
+ * This command message CAN be encrypted to the list key, in which case it
+ will be decrypted.
+ * This command message MUST have at least one revocation certificate
+ attached.
+ """
+ if len(arguments) != 1:
+ print('Extraneous argument/s: ' + ','.join(arguments[1:]),
+ file=results)
+ return ContinueProcessing.no
+
+ email = get_email(msg)
+ if not email:
+ print('No email to revoke key of.', file=results)
+ return ContinueProcessing.no
+
+ pgp_address = PGPAddress.for_email(email)
+ if pgp_address is None:
+ print('A pgp enabled address not found.', file=results)
+ return ContinueProcessing.no
+
+ key = pgp_address.key
+ if key is None:
+ print("You currently don't have a key set.", file=results)
+ return ContinueProcessing.no
+
+ if not pgp_address.key_confirmed:
+ print('Your key is currently not confirmed.', file=results)
+ return ContinueProcessing.no
+
+ wrapped = PGPWrapper(msg)
+ if wrapped.is_encrypted():
+ decrypted = wrapped.try_decrypt(pgp_list.key)
+ wrapped = PGPWrapper(decrypted)
+
+ if not wrapped.has_revocs():
+ print('No key revocations attached? Send a key revocation.',
+ file=results)
+ return ContinueProcessing.no
+
+ key_copy = copy.copy(key)
+
+ revocs = list(wrapped.revocs())
+ matches = 0
+ for revoc in revocs:
+ old_matches = matches
+ try:
+ verified = key_copy.verify(key_copy, revoc)
+ if verified:
+ key_copy |= revoc
+ matches += 1
+ continue
+ except PGPError:
+ pass
+
+ for subkey in key_copy.subkeys.values():
+ try:
+ verified = key_copy.verify(subkey, revoc)
+ if verified:
+ subkey |= revoc
+ matches += 1
+ break
+ except PGPError:
+ pass
+ # No match?
+ if matches == old_matches:
+ print('Revocation found for not-found key.', file=results)
+
+ if not key_usable(key_copy,
+ {KeyFlags.EncryptCommunications, KeyFlags.Sign}):
+ # Start reset process.
+ with transaction():
+ pgp_address.key = None
+ pgp_address.key_confirmed = False
+ workflow = KeyRevokeWorkflow(mlist, pgp_address)
+ list(workflow)
+ print('Key needs to be reset.', file=results)
+ else:
+ # Just update key.
+ if matches > 0:
+ with transaction():
+ pgp_address.key = key_copy
+ print('Key succesfully updated.', file=results)
+ return ContinueProcessing.yes
def _cmd_sign(pgp_list, mlist, msg, msgdata, arguments, results):
@@ -273,7 +371,7 @@ def _cmd_receive(pgp_list, mlist, msg, msgdata, arguments, results):
msg['MIME-Version'] = '1.0'
msg.attach(MIMEText('Here is the public key you requested.'))
wrapped = MIMEWrapper(msg)
- msg = wrapped.attach_key(pgp_list.pubkey)
+ msg = wrapped.attach_keys(pgp_list.pubkey)
msg.send(mlist)
return ContinueProcessing.yes