diff options
| author | J08nY | 2017-07-11 20:57:29 +0200 |
|---|---|---|
| committer | J08nY | 2017-07-11 20:57:29 +0200 |
| commit | 77b63eaae46697b24fe4cc604a36b869619b4638 (patch) | |
| tree | b91c61103b588f822313abb0716bae76f168ca44 /src/mailman_pgp/commands/eml_key.py | |
| parent | 97abc15b2e92fcf9109997043e4297f16d0bf5c7 (diff) | |
| download | mailman-pgp-77b63eaae46697b24fe4cc604a36b869619b4638.tar.gz mailman-pgp-77b63eaae46697b24fe4cc604a36b869619b4638.tar.zst mailman-pgp-77b63eaae46697b24fe4cc604a36b869619b4638.zip | |
Diffstat (limited to 'src/mailman_pgp/commands/eml_key.py')
| -rw-r--r-- | src/mailman_pgp/commands/eml_key.py | 216 |
1 files changed, 132 insertions, 84 deletions
diff --git a/src/mailman_pgp/commands/eml_key.py b/src/mailman_pgp/commands/eml_key.py index b280603..e9b5e19 100644 --- a/src/mailman_pgp/commands/eml_key.py +++ b/src/mailman_pgp/commands/eml_key.py @@ -32,106 +32,154 @@ from mailman_pgp.pgp.wrapper import PGPWrapper from mailman_pgp.workflows.base import CONFIRM_REQUEST -@public -@implementer(IEmailCommand) -class KeyCommand: - """The `key` command.""" +def _get_email(msg): + display_name, email = parseaddr(msg['from']) + # Address could be None or the empty string. + if not email: + email = msg.sender + return email - name = 'key' - argument_description = '<set|confirm|change|revoke|sign>' - short_description = '' - description = '' - def process(self, mlist, msg, msgdata, arguments, results): - """See `IEmailCommand`.""" - if len(arguments) == 0: - print('No sub-command specified,' - ' must be one of <change|revoke|sign>.', file=results) - return ContinueProcessing.no +def _cmd_set(pgp_list, mlist, msg, msgdata, arguments, results): + if len(arguments) != 2: + print('Missing token.', file=results) + return ContinueProcessing.no - pgp_list = PGPMailingList.for_list(mlist) - if pgp_list is None: - print('This mailing list doesnt have pgp enabled.', file=results) - return ContinueProcessing.yes # XXX: What does this even mean? + wrapped = PGPWrapper(msg) + if not wrapped.has_keys(): + print('No keys attached? Send a key.', file=results) + return ContinueProcessing.no + + keys = list(wrapped.keys()) + if len(keys) != 1: + print('More than one key! Send only one key.', file=results) + return ContinueProcessing.no + + email = _get_email(msg) + if not email: + print('No email to subscribe with.', file=results) + return ContinueProcessing.no + + address = getUtility(IUserManager).get_address(email) + if not address: + print('No adddress to subscribe with.', file=results) + return ContinueProcessing.no - if arguments[0] == 'set': - if len(arguments) != 2: - print('Missing token', file=results) - return ContinueProcessing.no + with transaction() as t: + pgp_address = PGPAddress(address) + pgp_address.key = keys.pop() + t.add(pgp_address) - wrapped = PGPWrapper(msg) - if not wrapped.has_keys(): - print('No keys here?') - return ContinueProcessing.no + token = arguments[1] + try: + ISubscriptionManager(mlist).confirm(token) + print('Key succesfully set.', file=results) + print('Key fingerprint: {}'.format(pgp_address.key.fingerprint), file=results) + except LookupError: + print('Wrong token.', file=results) - keys = list(wrapped.keys()) - if len(keys) != 1: - print('More than one key! Which one is yours?') - return ContinueProcessing.no + return ContinueProcessing.no - with transaction() as t: - pgp_address = PGPAddress(self.address) - pgp_address.key = keys.pop() - t.add(pgp_address) - token = arguments[1] - ISubscriptionManager(mlist).confirm(token) - # XXX finish this - elif arguments[0] == 'confirm': - if len(arguments) != 2: - print('Missing token', file=results) - return ContinueProcessing.no +def _cmd_confirm(pgp_list, mlist, msg, msgdata, arguments, results): + if len(arguments) != 2: + print('Missing token', file=results) + return ContinueProcessing.no - display_name, email = parseaddr(msg['from']) - # Address could be None or the empty string. - if not email: - email = msg.sender - if not email: - print('No email to subscribe with.', file=results) - return ContinueProcessing.no - um = getUtility(IUserManager) + email = _get_email(msg) + if not email: + print('No email to subscribe with.', file=results) + return ContinueProcessing.no - pgp_address = PGPAddress.for_address(um.get_address(email)) - if pgp_address is None: - # TODO - pass + pgp_address = PGPAddress.for_email(email) + if pgp_address is None: + print('A pgp enabled address not found.', file=results) + return ContinueProcessing.no - wrapped = PGPWrapper(msg) - if wrapped.is_encrypted(): - decrypted = wrapped.decrypt(pgp_list.key) - wrapped = PGPWrapper(decrypted) + wrapped = PGPWrapper(msg) + if wrapped.is_encrypted(): + decrypted = wrapped.decrypt(pgp_list.key) + wrapped = PGPWrapper(decrypted) - if not wrapped.is_signed(): - print('Message not signed, ignoring.', file=results) - return ContinueProcessing.no + if not wrapped.is_signed(): + print('Message not signed, ignoring.', file=results) + return ContinueProcessing.no - if not wrapped.verifies(pgp_address.key): - print('Message failed to verify.', file=results) - return ContinueProcessing.no + if not wrapped.verifies(pgp_address.key): + print('Message failed to verify.', file=results) + return ContinueProcessing.no - token = arguments[1] + token = arguments[1] - expecting = CONFIRM_REQUEST.format(pgp_address.key.pubkey, - token) - if wrapped.get_signed() == expecting: + expecting = CONFIRM_REQUEST.format(pgp_address.key_fingerprint, + token) + for sig_subject in wrapped.get_signed(): + if expecting in sig_subject: + with transaction(): + pgp_address.key_confirmed = True ISubscriptionManager(mlist).confirm(token) - else: - # TODO - pass + break + else: + print("Message doesn't contain the expected statement.", file=results) + return ContinueProcessing.no + + +def _cmd_change(pgp_list, mlist, msg, msgdata, arguments, results): + # New public key in attachment, requires to be signed with current + # key + pass - elif arguments[0] == 'change': - # New public key in attachment, requires to be signed with current - # key - pass - elif arguments[0] == 'revoke': - # Current key revocation certificate in attachment, restarts the - # subscription process, or rather only it's key setup part. - pass - elif arguments[0] == 'sign': - # List public key attached, signed by the users current key. - pass - else: + +def _cmd_revoke(pgp_list, mlist, msg, msgdata, arguments, results): + # Current key revocation certificate in attachment, restarts the + # subscription process, or rather only it's key setup part. + pass + + +def _cmd_sign(pgp_list, mlist, msg, msgdata, arguments, results): + # List public key attached, signed by the users current key. + pass + + +SUBCOMMANDS = { + 'set': _cmd_set, + 'confirm': _cmd_confirm, + 'change': _cmd_change, + 'revoke': _cmd_revoke, + 'sign': _cmd_sign +} + +ARGUMENTS = '<' + '|'.join(SUBCOMMANDS.keys()) + '>' + + +@public +@implementer(IEmailCommand) +class KeyCommand: + """The `key` command.""" + + name = 'key' + argument_description = ARGUMENTS + short_description = '' + description = """\ + + """ + + def process(self, mlist, msg, msgdata, arguments, results): + """See `IEmailCommand`.""" + if len(arguments) == 0: + print('No sub-command specified,' + ' must be one of {}.'.format(ARGUMENTS), file=results) + return ContinueProcessing.no + + if arguments[0] not in SUBCOMMANDS: print('Wrong sub-command specified,' - ' must be one of <change|revoke|sign>.', file=results) + ' must be one of {}.'.format(ARGUMENTS), file=results) + return ContinueProcessing.no - return ContinueProcessing.no + pgp_list = PGPMailingList.for_list(mlist) + if pgp_list is None: + print("This mailing list doesn't have pgp enabled.", file=results) + return ContinueProcessing.no + + command = SUBCOMMANDS[arguments[0]] + return command(pgp_list, mlist, msg, msgdata, arguments, results) |
