aboutsummaryrefslogtreecommitdiff
path: root/src/mailman_pgp/commands/eml_key.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/mailman_pgp/commands/eml_key.py')
-rw-r--r--src/mailman_pgp/commands/eml_key.py216
1 files changed, 132 insertions, 84 deletions
diff --git a/src/mailman_pgp/commands/eml_key.py b/src/mailman_pgp/commands/eml_key.py
index b280603..e9b5e19 100644
--- a/src/mailman_pgp/commands/eml_key.py
+++ b/src/mailman_pgp/commands/eml_key.py
@@ -32,106 +32,154 @@ from mailman_pgp.pgp.wrapper import PGPWrapper
from mailman_pgp.workflows.base import CONFIRM_REQUEST
-@public
-@implementer(IEmailCommand)
-class KeyCommand:
- """The `key` command."""
+def _get_email(msg):
+ display_name, email = parseaddr(msg['from'])
+ # Address could be None or the empty string.
+ if not email:
+ email = msg.sender
+ return email
- name = 'key'
- argument_description = '<set|confirm|change|revoke|sign>'
- short_description = ''
- description = ''
- def process(self, mlist, msg, msgdata, arguments, results):
- """See `IEmailCommand`."""
- if len(arguments) == 0:
- print('No sub-command specified,'
- ' must be one of <change|revoke|sign>.', file=results)
- return ContinueProcessing.no
+def _cmd_set(pgp_list, mlist, msg, msgdata, arguments, results):
+ if len(arguments) != 2:
+ print('Missing token.', file=results)
+ return ContinueProcessing.no
- pgp_list = PGPMailingList.for_list(mlist)
- if pgp_list is None:
- print('This mailing list doesnt have pgp enabled.', file=results)
- return ContinueProcessing.yes # XXX: What does this even mean?
+ wrapped = PGPWrapper(msg)
+ if not wrapped.has_keys():
+ print('No keys attached? Send a key.', file=results)
+ return ContinueProcessing.no
+
+ keys = list(wrapped.keys())
+ if len(keys) != 1:
+ print('More than one key! Send only one key.', file=results)
+ return ContinueProcessing.no
+
+ email = _get_email(msg)
+ if not email:
+ print('No email to subscribe with.', file=results)
+ return ContinueProcessing.no
+
+ address = getUtility(IUserManager).get_address(email)
+ if not address:
+ print('No adddress to subscribe with.', file=results)
+ return ContinueProcessing.no
- if arguments[0] == 'set':
- if len(arguments) != 2:
- print('Missing token', file=results)
- return ContinueProcessing.no
+ with transaction() as t:
+ pgp_address = PGPAddress(address)
+ pgp_address.key = keys.pop()
+ t.add(pgp_address)
- wrapped = PGPWrapper(msg)
- if not wrapped.has_keys():
- print('No keys here?')
- return ContinueProcessing.no
+ token = arguments[1]
+ try:
+ ISubscriptionManager(mlist).confirm(token)
+ print('Key succesfully set.', file=results)
+ print('Key fingerprint: {}'.format(pgp_address.key.fingerprint), file=results)
+ except LookupError:
+ print('Wrong token.', file=results)
- keys = list(wrapped.keys())
- if len(keys) != 1:
- print('More than one key! Which one is yours?')
- return ContinueProcessing.no
+ return ContinueProcessing.no
- with transaction() as t:
- pgp_address = PGPAddress(self.address)
- pgp_address.key = keys.pop()
- t.add(pgp_address)
- token = arguments[1]
- ISubscriptionManager(mlist).confirm(token)
- # XXX finish this
- elif arguments[0] == 'confirm':
- if len(arguments) != 2:
- print('Missing token', file=results)
- return ContinueProcessing.no
+def _cmd_confirm(pgp_list, mlist, msg, msgdata, arguments, results):
+ if len(arguments) != 2:
+ print('Missing token', file=results)
+ return ContinueProcessing.no
- display_name, email = parseaddr(msg['from'])
- # Address could be None or the empty string.
- if not email:
- email = msg.sender
- if not email:
- print('No email to subscribe with.', file=results)
- return ContinueProcessing.no
- um = getUtility(IUserManager)
+ email = _get_email(msg)
+ if not email:
+ print('No email to subscribe with.', file=results)
+ return ContinueProcessing.no
- pgp_address = PGPAddress.for_address(um.get_address(email))
- if pgp_address is None:
- # TODO
- pass
+ pgp_address = PGPAddress.for_email(email)
+ if pgp_address is None:
+ print('A pgp enabled address not found.', file=results)
+ return ContinueProcessing.no
- wrapped = PGPWrapper(msg)
- if wrapped.is_encrypted():
- decrypted = wrapped.decrypt(pgp_list.key)
- wrapped = PGPWrapper(decrypted)
+ wrapped = PGPWrapper(msg)
+ if wrapped.is_encrypted():
+ decrypted = wrapped.decrypt(pgp_list.key)
+ wrapped = PGPWrapper(decrypted)
- if not wrapped.is_signed():
- print('Message not signed, ignoring.', file=results)
- return ContinueProcessing.no
+ if not wrapped.is_signed():
+ print('Message not signed, ignoring.', file=results)
+ return ContinueProcessing.no
- if not wrapped.verifies(pgp_address.key):
- print('Message failed to verify.', file=results)
- return ContinueProcessing.no
+ if not wrapped.verifies(pgp_address.key):
+ print('Message failed to verify.', file=results)
+ return ContinueProcessing.no
- token = arguments[1]
+ token = arguments[1]
- expecting = CONFIRM_REQUEST.format(pgp_address.key.pubkey,
- token)
- if wrapped.get_signed() == expecting:
+ expecting = CONFIRM_REQUEST.format(pgp_address.key_fingerprint,
+ token)
+ for sig_subject in wrapped.get_signed():
+ if expecting in sig_subject:
+ with transaction():
+ pgp_address.key_confirmed = True
ISubscriptionManager(mlist).confirm(token)
- else:
- # TODO
- pass
+ break
+ else:
+ print("Message doesn't contain the expected statement.", file=results)
+ return ContinueProcessing.no
+
+
+def _cmd_change(pgp_list, mlist, msg, msgdata, arguments, results):
+ # New public key in attachment, requires to be signed with current
+ # key
+ pass
- elif arguments[0] == 'change':
- # New public key in attachment, requires to be signed with current
- # key
- pass
- elif arguments[0] == 'revoke':
- # Current key revocation certificate in attachment, restarts the
- # subscription process, or rather only it's key setup part.
- pass
- elif arguments[0] == 'sign':
- # List public key attached, signed by the users current key.
- pass
- else:
+
+def _cmd_revoke(pgp_list, mlist, msg, msgdata, arguments, results):
+ # Current key revocation certificate in attachment, restarts the
+ # subscription process, or rather only it's key setup part.
+ pass
+
+
+def _cmd_sign(pgp_list, mlist, msg, msgdata, arguments, results):
+ # List public key attached, signed by the users current key.
+ pass
+
+
+SUBCOMMANDS = {
+ 'set': _cmd_set,
+ 'confirm': _cmd_confirm,
+ 'change': _cmd_change,
+ 'revoke': _cmd_revoke,
+ 'sign': _cmd_sign
+}
+
+ARGUMENTS = '<' + '|'.join(SUBCOMMANDS.keys()) + '>'
+
+
+@public
+@implementer(IEmailCommand)
+class KeyCommand:
+ """The `key` command."""
+
+ name = 'key'
+ argument_description = ARGUMENTS
+ short_description = ''
+ description = """\
+
+ """
+
+ def process(self, mlist, msg, msgdata, arguments, results):
+ """See `IEmailCommand`."""
+ if len(arguments) == 0:
+ print('No sub-command specified,'
+ ' must be one of {}.'.format(ARGUMENTS), file=results)
+ return ContinueProcessing.no
+
+ if arguments[0] not in SUBCOMMANDS:
print('Wrong sub-command specified,'
- ' must be one of <change|revoke|sign>.', file=results)
+ ' must be one of {}.'.format(ARGUMENTS), file=results)
+ return ContinueProcessing.no
- return ContinueProcessing.no
+ pgp_list = PGPMailingList.for_list(mlist)
+ if pgp_list is None:
+ print("This mailing list doesn't have pgp enabled.", file=results)
+ return ContinueProcessing.no
+
+ command = SUBCOMMANDS[arguments[0]]
+ return command(pgp_list, mlist, msg, msgdata, arguments, results)