aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorJ08nY2017-07-11 20:57:29 +0200
committerJ08nY2017-07-11 20:57:29 +0200
commit77b63eaae46697b24fe4cc604a36b869619b4638 (patch)
treeb91c61103b588f822313abb0716bae76f168ca44 /src
parent97abc15b2e92fcf9109997043e4297f16d0bf5c7 (diff)
downloadmailman-pgp-77b63eaae46697b24fe4cc604a36b869619b4638.tar.gz
mailman-pgp-77b63eaae46697b24fe4cc604a36b869619b4638.tar.zst
mailman-pgp-77b63eaae46697b24fe4cc604a36b869619b4638.zip
Diffstat (limited to 'src')
-rw-r--r--src/mailman_pgp/commands/eml_key.py216
-rw-r--r--src/mailman_pgp/commands/tests/__init__.py0
-rw-r--r--src/mailman_pgp/commands/tests/test_key.py123
-rw-r--r--src/mailman_pgp/model/address.py10
-rw-r--r--src/mailman_pgp/pgp/mime.py22
5 files changed, 285 insertions, 86 deletions
diff --git a/src/mailman_pgp/commands/eml_key.py b/src/mailman_pgp/commands/eml_key.py
index b280603..e9b5e19 100644
--- a/src/mailman_pgp/commands/eml_key.py
+++ b/src/mailman_pgp/commands/eml_key.py
@@ -32,106 +32,154 @@ from mailman_pgp.pgp.wrapper import PGPWrapper
from mailman_pgp.workflows.base import CONFIRM_REQUEST
-@public
-@implementer(IEmailCommand)
-class KeyCommand:
- """The `key` command."""
+def _get_email(msg):
+ display_name, email = parseaddr(msg['from'])
+ # Address could be None or the empty string.
+ if not email:
+ email = msg.sender
+ return email
- name = 'key'
- argument_description = '<set|confirm|change|revoke|sign>'
- short_description = ''
- description = ''
- def process(self, mlist, msg, msgdata, arguments, results):
- """See `IEmailCommand`."""
- if len(arguments) == 0:
- print('No sub-command specified,'
- ' must be one of <change|revoke|sign>.', file=results)
- return ContinueProcessing.no
+def _cmd_set(pgp_list, mlist, msg, msgdata, arguments, results):
+ if len(arguments) != 2:
+ print('Missing token.', file=results)
+ return ContinueProcessing.no
- pgp_list = PGPMailingList.for_list(mlist)
- if pgp_list is None:
- print('This mailing list doesnt have pgp enabled.', file=results)
- return ContinueProcessing.yes # XXX: What does this even mean?
+ wrapped = PGPWrapper(msg)
+ if not wrapped.has_keys():
+ print('No keys attached? Send a key.', file=results)
+ return ContinueProcessing.no
+
+ keys = list(wrapped.keys())
+ if len(keys) != 1:
+ print('More than one key! Send only one key.', file=results)
+ return ContinueProcessing.no
+
+ email = _get_email(msg)
+ if not email:
+ print('No email to subscribe with.', file=results)
+ return ContinueProcessing.no
+
+ address = getUtility(IUserManager).get_address(email)
+ if not address:
+ print('No adddress to subscribe with.', file=results)
+ return ContinueProcessing.no
- if arguments[0] == 'set':
- if len(arguments) != 2:
- print('Missing token', file=results)
- return ContinueProcessing.no
+ with transaction() as t:
+ pgp_address = PGPAddress(address)
+ pgp_address.key = keys.pop()
+ t.add(pgp_address)
- wrapped = PGPWrapper(msg)
- if not wrapped.has_keys():
- print('No keys here?')
- return ContinueProcessing.no
+ token = arguments[1]
+ try:
+ ISubscriptionManager(mlist).confirm(token)
+ print('Key succesfully set.', file=results)
+ print('Key fingerprint: {}'.format(pgp_address.key.fingerprint), file=results)
+ except LookupError:
+ print('Wrong token.', file=results)
- keys = list(wrapped.keys())
- if len(keys) != 1:
- print('More than one key! Which one is yours?')
- return ContinueProcessing.no
+ return ContinueProcessing.no
- with transaction() as t:
- pgp_address = PGPAddress(self.address)
- pgp_address.key = keys.pop()
- t.add(pgp_address)
- token = arguments[1]
- ISubscriptionManager(mlist).confirm(token)
- # XXX finish this
- elif arguments[0] == 'confirm':
- if len(arguments) != 2:
- print('Missing token', file=results)
- return ContinueProcessing.no
+def _cmd_confirm(pgp_list, mlist, msg, msgdata, arguments, results):
+ if len(arguments) != 2:
+ print('Missing token', file=results)
+ return ContinueProcessing.no
- display_name, email = parseaddr(msg['from'])
- # Address could be None or the empty string.
- if not email:
- email = msg.sender
- if not email:
- print('No email to subscribe with.', file=results)
- return ContinueProcessing.no
- um = getUtility(IUserManager)
+ email = _get_email(msg)
+ if not email:
+ print('No email to subscribe with.', file=results)
+ return ContinueProcessing.no
- pgp_address = PGPAddress.for_address(um.get_address(email))
- if pgp_address is None:
- # TODO
- pass
+ pgp_address = PGPAddress.for_email(email)
+ if pgp_address is None:
+ print('A pgp enabled address not found.', file=results)
+ return ContinueProcessing.no
- wrapped = PGPWrapper(msg)
- if wrapped.is_encrypted():
- decrypted = wrapped.decrypt(pgp_list.key)
- wrapped = PGPWrapper(decrypted)
+ wrapped = PGPWrapper(msg)
+ if wrapped.is_encrypted():
+ decrypted = wrapped.decrypt(pgp_list.key)
+ wrapped = PGPWrapper(decrypted)
- if not wrapped.is_signed():
- print('Message not signed, ignoring.', file=results)
- return ContinueProcessing.no
+ if not wrapped.is_signed():
+ print('Message not signed, ignoring.', file=results)
+ return ContinueProcessing.no
- if not wrapped.verifies(pgp_address.key):
- print('Message failed to verify.', file=results)
- return ContinueProcessing.no
+ if not wrapped.verifies(pgp_address.key):
+ print('Message failed to verify.', file=results)
+ return ContinueProcessing.no
- token = arguments[1]
+ token = arguments[1]
- expecting = CONFIRM_REQUEST.format(pgp_address.key.pubkey,
- token)
- if wrapped.get_signed() == expecting:
+ expecting = CONFIRM_REQUEST.format(pgp_address.key_fingerprint,
+ token)
+ for sig_subject in wrapped.get_signed():
+ if expecting in sig_subject:
+ with transaction():
+ pgp_address.key_confirmed = True
ISubscriptionManager(mlist).confirm(token)
- else:
- # TODO
- pass
+ break
+ else:
+ print("Message doesn't contain the expected statement.", file=results)
+ return ContinueProcessing.no
+
+
+def _cmd_change(pgp_list, mlist, msg, msgdata, arguments, results):
+ # New public key in attachment, requires to be signed with current
+ # key
+ pass
- elif arguments[0] == 'change':
- # New public key in attachment, requires to be signed with current
- # key
- pass
- elif arguments[0] == 'revoke':
- # Current key revocation certificate in attachment, restarts the
- # subscription process, or rather only it's key setup part.
- pass
- elif arguments[0] == 'sign':
- # List public key attached, signed by the users current key.
- pass
- else:
+
+def _cmd_revoke(pgp_list, mlist, msg, msgdata, arguments, results):
+ # Current key revocation certificate in attachment, restarts the
+ # subscription process, or rather only it's key setup part.
+ pass
+
+
+def _cmd_sign(pgp_list, mlist, msg, msgdata, arguments, results):
+ # List public key attached, signed by the users current key.
+ pass
+
+
+SUBCOMMANDS = {
+ 'set': _cmd_set,
+ 'confirm': _cmd_confirm,
+ 'change': _cmd_change,
+ 'revoke': _cmd_revoke,
+ 'sign': _cmd_sign
+}
+
+ARGUMENTS = '<' + '|'.join(SUBCOMMANDS.keys()) + '>'
+
+
+@public
+@implementer(IEmailCommand)
+class KeyCommand:
+ """The `key` command."""
+
+ name = 'key'
+ argument_description = ARGUMENTS
+ short_description = ''
+ description = """\
+
+ """
+
+ def process(self, mlist, msg, msgdata, arguments, results):
+ """See `IEmailCommand`."""
+ if len(arguments) == 0:
+ print('No sub-command specified,'
+ ' must be one of {}.'.format(ARGUMENTS), file=results)
+ return ContinueProcessing.no
+
+ if arguments[0] not in SUBCOMMANDS:
print('Wrong sub-command specified,'
- ' must be one of <change|revoke|sign>.', file=results)
+ ' must be one of {}.'.format(ARGUMENTS), file=results)
+ return ContinueProcessing.no
- return ContinueProcessing.no
+ pgp_list = PGPMailingList.for_list(mlist)
+ if pgp_list is None:
+ print("This mailing list doesn't have pgp enabled.", file=results)
+ return ContinueProcessing.no
+
+ command = SUBCOMMANDS[arguments[0]]
+ return command(pgp_list, mlist, msg, msgdata, arguments, results)
diff --git a/src/mailman_pgp/commands/tests/__init__.py b/src/mailman_pgp/commands/tests/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/src/mailman_pgp/commands/tests/__init__.py
diff --git a/src/mailman_pgp/commands/tests/test_key.py b/src/mailman_pgp/commands/tests/test_key.py
new file mode 100644
index 0000000..8b44011
--- /dev/null
+++ b/src/mailman_pgp/commands/tests/test_key.py
@@ -0,0 +1,123 @@
+# Copyright (C) 2017 Jan Jancar
+#
+# This file is a part of the Mailman PGP plugin.
+#
+# This program is free software; you can redistribute it and/or modify it under
+# the terms of the GNU General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or (at your option)
+# any later version.
+#
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+# more details.
+#
+# You should have received a copy of the GNU General Public License along with
+# this program. If not, see <http://www.gnu.org/licenses/>.
+
+""""""
+
+import unittest
+
+from mailman.app.lifecycle import create_list
+from mailman.email.message import Message
+from mailman.interfaces.subscriptions import ISubscriptionManager
+from mailman.interfaces.usermanager import IUserManager
+from mailman.runners.command import CommandRunner
+from mailman.testing.helpers import make_testable_runner, get_queue_messages
+from mailman.utilities.datetime import now
+from zope.component import getUtility
+
+from mailman_pgp.config import mm_config
+from mailman_pgp.database import transaction
+from mailman_pgp.model.address import PGPAddress
+from mailman_pgp.model.list import PGPMailingList
+from mailman_pgp.pgp.mime import MIMEWrapper
+from mailman_pgp.pgp.tests.base import load_key
+from mailman_pgp.pgp.wrapper import PGPWrapper
+from mailman_pgp.testing.layers import PGPConfigLayer
+from mailman_pgp.workflows.base import CONFIRM_REQUEST
+from mailman_pgp.workflows.subscription import OpenSubscriptionPolicy
+
+
+class TestPreSubscription(unittest.TestCase):
+ layer = PGPConfigLayer
+
+ def setUp(self):
+ self.mlist = create_list('test@example.com', style_name='pgp-default')
+ self.pgp_list = PGPMailingList.for_list(self.mlist)
+ self.pgp_list.generate_key(True)
+
+ def test_set(self):
+ self.mlist.subscription_policy = OpenSubscriptionPolicy
+ bart = getUtility(IUserManager).create_address('bart@example.com',
+ 'Bart Person')
+ bart.verified_on = now()
+ token, token_owner, member = ISubscriptionManager(self.mlist).register(
+ bart)
+
+ get_queue_messages('virgin')
+
+ bart_key = load_key('rsa_1024.priv.asc')
+
+ set_message = Message()
+ set_message['From'] = 'bart@example.com'
+ set_message['To'] = 'test@example.com'
+ set_message['Subject'] = 'Re: key set {}'.format(token)
+ set_message.set_type('multipart/mixed')
+ wrapped_set_message = MIMEWrapper(set_message)
+ set_message = wrapped_set_message.attach_key(bart_key.pubkey)
+
+ mm_config.switchboards['command'].enqueue(set_message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+
+ pgp_address = PGPAddress.for_address(bart)
+ self.assertIsNotNone(pgp_address)
+ self.assertEqual(pgp_address.key.fingerprint, bart_key.fingerprint)
+ self.assertEqual(pgp_address.key_fingerprint, bart_key.fingerprint)
+ self.assertFalse(pgp_address.key_confirmed)
+
+ items = get_queue_messages('virgin', expected_count=2)
+ if items[0].msg['Subject'] == 'The results of your email commands':
+ confirm_request = items[1].msg
+ else:
+ confirm_request = items[0].msg
+
+ confirm_wrapped = PGPWrapper(confirm_request)
+ self.assertTrue(confirm_wrapped.is_encrypted())
+
+ def test_confirm(self):
+ self.mlist.subscription_policy = OpenSubscriptionPolicy
+ bart = getUtility(IUserManager).create_address('bart@example.com',
+ 'Bart Person')
+ bart.verified_on = now()
+
+ bart_key = load_key('rsa_1024.priv.asc')
+
+ token, token_owner, member = ISubscriptionManager(self.mlist).register(
+ bart, pubkey=bart_key.pubkey)
+
+ get_queue_messages('virgin')
+
+ with transaction() as t:
+ pgp_address = PGPAddress(bart)
+ pgp_address.key =bart_key.pubkey
+ t.add(pgp_address)
+
+ confirm_message = Message()
+ confirm_message['From'] = 'bart@example.com'
+ confirm_message['To'] = 'test@example.com'
+ confirm_message['Subject'] = 'Re: key confirm {}'.format(token)
+ confirm_message.set_payload(
+ CONFIRM_REQUEST.format(bart_key.fingerprint, token))
+ wrapped_confirm_message = MIMEWrapper(confirm_message)
+ confirm_message = wrapped_confirm_message.sign(bart_key)
+
+ mm_config.switchboards['command'].enqueue(confirm_message,
+ listid='test.example.com')
+ make_testable_runner(CommandRunner, 'command').run()
+
+ pgp_address = PGPAddress.for_address(bart)
+ self.assertTrue(pgp_address.key_confirmed)
+ self.assertTrue(self.mlist.is_subscribed(bart))
diff --git a/src/mailman_pgp/model/address.py b/src/mailman_pgp/model/address.py
index 1d07486..6fc3050 100644
--- a/src/mailman_pgp/model/address.py
+++ b/src/mailman_pgp/model/address.py
@@ -22,7 +22,7 @@ from os.path import exists, isfile, join
from mailman.database.types import SAUnicode
from mailman.interfaces.usermanager import IUserManager
from pgpy import PGPKey
-from sqlalchemy import Column, Integer
+from sqlalchemy import Column, Integer, Boolean
from sqlalchemy.orm import reconstructor
from zope.component import getUtility
@@ -38,6 +38,7 @@ class PGPAddress(Base):
id = Column(Integer, primary_key=True)
email = Column(SAUnicode, index=True)
key_fingerprint = Column(SAUnicode)
+ key_confirmed = Column(Boolean)
def __init__(self, address):
super().__init__()
@@ -49,6 +50,7 @@ class PGPAddress(Base):
def _init(self):
self._address = None
self._key = None
+ self.key_confirmed = False
@property
def key(self):
@@ -119,3 +121,9 @@ class PGPAddress(Base):
if address is None:
return None
return PGPAddress.query().filter_by(email=address.email).first()
+
+ @staticmethod
+ def for_email(email):
+ if email is None:
+ return None
+ return PGPAddress.query().filter_by(email=email).first()
diff --git a/src/mailman_pgp/pgp/mime.py b/src/mailman_pgp/pgp/mime.py
index 1ecbe76..975b8bd 100644
--- a/src/mailman_pgp/pgp/mime.py
+++ b/src/mailman_pgp/pgp/mime.py
@@ -87,7 +87,7 @@ class MIMEWrapper:
return self.is_signed()
def get_signed(self):
- yield self.msg.get_payload(0).as_string(0)
+ yield self.msg.get_payload(0).as_string()
def is_encrypted(self):
"""
@@ -145,6 +145,26 @@ class MIMEWrapper:
key, _ = PGPKey.from_blob(part.get_payload())
yield key
+ def attach_key(self, key):
+ """
+
+ :param key:
+ :type key: pgpy.PGPKey
+ :return:
+ """
+ filename = '0x' + key.fingerprint.keyid + '.asc'
+ key_part = MIMEApplication(_data=str(key),
+ _subtype=MIMEWrapper._keys_subtype,
+ _encoder=encode_7or8bit,
+ name=filename)
+ key_part.add_header('Content-Description',
+ 'OpenPGP key')
+ key_part.add_header('Content-Disposition', 'attachment',
+ filename=filename)
+ out = copy.deepcopy(self.msg)
+ out.attach(key_part)
+ return out
+
def verify(self, key):
"""
Verify the signature of this message with key.