diff options
| author | J08nY | 2017-07-15 17:21:35 +0200 |
|---|---|---|
| committer | J08nY | 2017-07-15 17:21:35 +0200 |
| commit | bc396ca1623c885cd6df4ab49bcccf23880a29c8 (patch) | |
| tree | 29ed0f5e0d78cdcec3494d1ea4b637fde8a97099 /src/mailman_pgp/workflows/pubkey.py | |
| parent | 64572d90cac74f8967dfe9ef25206812d9986d63 (diff) | |
| download | mailman-pgp-bc396ca1623c885cd6df4ab49bcccf23880a29c8.tar.gz mailman-pgp-bc396ca1623c885cd6df4ab49bcccf23880a29c8.tar.zst mailman-pgp-bc396ca1623c885cd6df4ab49bcccf23880a29c8.zip | |
Diffstat (limited to 'src/mailman_pgp/workflows/pubkey.py')
| -rw-r--r-- | src/mailman_pgp/workflows/pubkey.py | 118 |
1 files changed, 118 insertions, 0 deletions
diff --git a/src/mailman_pgp/workflows/pubkey.py b/src/mailman_pgp/workflows/pubkey.py new file mode 100644 index 0000000..e9ce348 --- /dev/null +++ b/src/mailman_pgp/workflows/pubkey.py @@ -0,0 +1,118 @@ +from mailman.email.message import UserNotification +from mailman.interfaces.subscriptions import TokenOwner +from pgpy import PGPKey + +from mailman_pgp.database import transaction +from mailman_pgp.model.address import PGPAddress +from mailman_pgp.model.list import PGPMailingList +from mailman_pgp.pgp.utils import copy_headers +from mailman_pgp.pgp.wrapper import PGPWrapper + +KEY_REQUEST = """\ +---------- +TODO: this is a pgp enabled list. +We need your pubkey. +Reply to this message with it as a PGP/MIME(preferred) or inline. +----------""" + +CONFIRM_REQUEST = """\ +---------- +TODO: this is a pgp enabled list. +Reply to this message with this whole text +signed with your supplied key, either inline or PGP/MIME. + +Fingerprint: {} +Token: {} +---------- +""" + + +class SetPubkeyMixin: + def __init__(self, pubkey=None): + self.pubkey = pubkey + + @property + def pubkey_key(self): + if self.pubkey is None: + return None + return str(self.pubkey) + + @pubkey_key.setter + def pubkey_key(self, value): + if value is not None: + self.pubkey, _ = PGPKey.from_blob(value) + else: + self.pubkey = None + + def _step_pubkey_checks(self): + pgp_address = PGPAddress.for_address(self.address) + assert pgp_address is not None + + if self.pubkey is None: + if pgp_address.key is None: + self.push('send_key_request') + else: + with transaction(): + pgp_address.key = self.pubkey + + def _step_send_key_request(self): + self._set_token(TokenOwner.subscriber) + self.push('receive_key') + self.save() + request_address = self.mlist.request_address + email_address = self.address.email + msg = UserNotification(email_address, request_address, + 'key set {}'.format(self.token), + KEY_REQUEST) + msg.send(self.mlist, add_precedence=False) + # Now we wait for the confirmation. + raise StopIteration + + def _step_receive_key(self): + self._restore_subscriber() + self._set_token(TokenOwner.no_one) + + +class ConfirmPubkeyMixin: + def __init__(self, pre_confirmed=False): + self.pubkey_confirmed = pre_confirmed + + def _step_pubkey_confirmation(self): + pgp_address = PGPAddress.for_address(self.address) + assert pgp_address is not None + + if self.pubkey_confirmed: + with transaction(): + pgp_address.key_confirmed = True + else: + if not pgp_address.key_confirmed: + self.push('send_key_confirm_request') + + def _step_send_key_confirm_request(self): + self._set_token(TokenOwner.subscriber) + self.push('receive_key_confirmation') + self.save() + + pgp_address = PGPAddress.for_address(self.address) + request_address = self.mlist.request_address + email_address = self.address.email + msg = UserNotification(email_address, request_address, + 'key confirm {}'.format(self.token), + CONFIRM_REQUEST.format( + pgp_address.key_fingerprint, + self.token)) + pgp_list = PGPMailingList.for_list(self.mlist) + wrapped = PGPWrapper(msg) + encrypted = wrapped.sign_encrypt(pgp_list.key, pgp_address.key) + + msg.set_payload(encrypted.get_payload()) + copy_headers(encrypted, msg, True) + msg.send(self.mlist) + raise StopIteration + + def _step_receive_key_confirmation(self): + self._restore_subscriber() + self._set_token(TokenOwner.no_one) + with transaction(): + pgp_address = PGPAddress.for_address(self.address) + pgp_address.key_confirmed = True |
