aboutsummaryrefslogtreecommitdiff
path: root/src/mailman_pgp/model
diff options
context:
space:
mode:
Diffstat (limited to 'src/mailman_pgp/model')
-rw-r--r--src/mailman_pgp/model/address.py46
-rw-r--r--src/mailman_pgp/model/db_key.py57
-rw-r--r--src/mailman_pgp/model/fs_key.py113
-rw-r--r--src/mailman_pgp/model/list.py74
-rw-r--r--src/mailman_pgp/model/sighash.py2
-rw-r--r--src/mailman_pgp/model/tests/test_db_key.py50
-rw-r--r--src/mailman_pgp/model/tests/test_fs_key.py120
-rw-r--r--src/mailman_pgp/model/tests/test_list.py17
8 files changed, 409 insertions, 70 deletions
diff --git a/src/mailman_pgp/model/address.py b/src/mailman_pgp/model/address.py
index 0cfefe2..9ecfa95 100644
--- a/src/mailman_pgp/model/address.py
+++ b/src/mailman_pgp/model/address.py
@@ -16,20 +16,20 @@
# this program. If not, see <http://www.gnu.org/licenses/>.
"""Model for PGP enabled addresses."""
-import os
-from os.path import exists, isfile, join
from mailman.database.types import SAUnicode
from mailman.interfaces.usermanager import IUserManager
-from pgpy import PGPKey
+from public import public
from sqlalchemy import Boolean, Column, Integer, String
from sqlalchemy.orm import reconstructor
from zope.component import getUtility
from mailman_pgp.config import config
from mailman_pgp.model.base import Base
+from mailman_pgp.model.fs_key import FSKey
+@public
class PGPAddress(Base):
"""A PGP enabled address."""
@@ -41,15 +41,15 @@ class PGPAddress(Base):
key_confirmed = Column(Boolean, default=False)
def __init__(self, address):
- super().__init__()
+ super().__init__(email=address.email)
self._init()
- self.email = address.email
self._address = address
@reconstructor
def _init(self):
self._address = None
- self._key = None
+ self._key = FSKey(config.pgp.keydir_config['user_keydir'],
+ self.email + '.asc', True)
@property
def key(self):
@@ -58,33 +58,22 @@ class PGPAddress(Base):
:return:
:rtype: pgpy.PGPKey
"""
- if self.key_fingerprint is None:
- return None
- if self._key is None:
- if exists(self.key_path) and isfile(self.key_path):
- self._key, _ = PGPKey.from_file(self.key_path)
- return self._key
+ self._key.reload()
+ return self._key.key
@key.setter
- def key(self, new_key):
+ def key(self, value):
"""
- :param new_key:
- :type new_key: PGPKey
+ :param value:
+ :type value: pgpy.PGPKey
"""
- if self.key_fingerprint is not None:
- try:
- os.remove(self.key_path)
- except FileNotFoundError:
- pass
- if new_key is None:
+ if value is None:
self.key_fingerprint = None
- self._key = None
else:
- self.key_fingerprint = str(new_key.fingerprint)
- with open(self.key_path, 'w') as out:
- out.write(str(new_key))
- self._key = new_key
+ self.key_fingerprint = value.fingerprint
+ self._key.key = value
+ self._key.save()
@property
def key_path(self):
@@ -93,10 +82,7 @@ class PGPAddress(Base):
:return:
:rtype: str
"""
- if self.key_fingerprint is None:
- return None
- return join(config.pgp.keydir_config['user_keydir'],
- self.key_fingerprint + '.asc')
+ return self._key.key_path
@property
def address(self):
diff --git a/src/mailman_pgp/model/db_key.py b/src/mailman_pgp/model/db_key.py
new file mode 100644
index 0000000..0b3af45
--- /dev/null
+++ b/src/mailman_pgp/model/db_key.py
@@ -0,0 +1,57 @@
+# Copyright (C) 2017 Jan Jancar
+#
+# This file is a part of the Mailman PGP plugin.
+#
+# This program is free software; you can redistribute it and/or modify it under
+# the terms of the GNU General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or (at your option)
+# any later version.
+#
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+# more details.
+#
+# You should have received a copy of the GNU General Public License along with
+# this program. If not, see <http://www.gnu.org/licenses/>.
+
+"""Database stored PGP key."""
+from public import public
+from sqlalchemy import Column, Integer, LargeBinary
+from sqlalchemy.orm import reconstructor
+
+from mailman_pgp.model.base import Base
+from mailman_pgp.utils.pgp import key_from_blob
+
+
+@public
+class DBKey(Base):
+ """Database stored PGP key."""
+ __tablename__ = 'keys'
+
+ id = Column(Integer, primary_key=True)
+ _key_material = Column('key_material', LargeBinary)
+
+ def __init__(self, key=None):
+ super().__init__()
+ self._init()
+ self.key = key
+
+ @reconstructor
+ def _init(self):
+ self._key = None
+
+ @property
+ def key(self):
+ if self._key is None:
+ self._key = key_from_blob(self._key_material)
+ return self._key
+
+ @key.setter
+ def key(self, value):
+ if value is None:
+ self._key = None
+ self._key_material = bytes()
+ else:
+ self._key = value
+ self._key_material = bytes(value)
diff --git a/src/mailman_pgp/model/fs_key.py b/src/mailman_pgp/model/fs_key.py
new file mode 100644
index 0000000..118f971
--- /dev/null
+++ b/src/mailman_pgp/model/fs_key.py
@@ -0,0 +1,113 @@
+# Copyright (C) 2017 Jan Jancar
+#
+# This file is a part of the Mailman PGP plugin.
+#
+# This program is free software; you can redistribute it and/or modify it under
+# the terms of the GNU General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or (at your option)
+# any later version.
+#
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+# more details.
+#
+# You should have received a copy of the GNU General Public License along with
+# this program. If not, see <http://www.gnu.org/licenses/>.
+
+"""Filesystem stored PGP key."""
+from os import remove, urandom
+from os.path import getmtime, getsize, join
+
+from public import public
+
+from mailman_pgp.utils.file import locked_obj
+from mailman_pgp.utils.pgp import key_from_file
+
+
+@public
+class FSKey:
+ """Filesystem stored PGP key."""
+
+ def __init__(self, keydir, keyfile, load=False):
+ self._key = None
+ self._mtime = None
+ self.keydir = keydir
+ self.keyfile = keyfile
+ if load:
+ self.load()
+
+ @property
+ def key(self):
+ """
+
+ :rtype: pgpy.PGPKey
+ """
+ return self._key
+
+ @key.setter
+ def key(self, value):
+ """
+
+ :param value:
+ :type value: pgpy.PGPKey
+ """
+ self._key = value
+
+ @property
+ def key_path(self):
+ return join(self.keydir, self.keyfile)
+
+ @property
+ def lock_path(self):
+ return self.key_path + '.lock'
+
+ def _load(self):
+ try:
+ self.key = key_from_file(self.key_path)
+ self._mtime = getmtime(self.key_path)
+ except FileNotFoundError:
+ self.key = None
+ self._mtime = None
+
+ @locked_obj('lock_path')
+ def load(self):
+ self._load()
+
+ @locked_obj('lock_path')
+ def reload(self):
+ if self.key is None:
+ self._load()
+ else:
+ mtime = getmtime(self.key_path)
+ if self._mtime is None or mtime > self._mtime:
+ self._load()
+
+ @locked_obj('lock_path')
+ def save(self):
+ if self.key is None:
+ remove(self.key_path)
+ self._mtime = None
+ else:
+ with open(self.key_path, 'w') as key_file:
+ key_file.write(str(self.key))
+ self._mtime = getmtime(self.key_path)
+
+ @locked_obj('lock_path')
+ def delete(self):
+ try:
+ remove(self.key_path)
+ except FileNotFoundError:
+ pass
+
+ @locked_obj('lock_path')
+ def shred(self):
+ try:
+ size = getsize(self.key_path)
+ for _ in range(10):
+ with open(self.key_path, 'wb') as f:
+ data = urandom(size)
+ f.write(data)
+ remove(self.key_path)
+ except FileNotFoundError:
+ pass
diff --git a/src/mailman_pgp/model/list.py b/src/mailman_pgp/model/list.py
index 8448368..946af34 100644
--- a/src/mailman_pgp/model/list.py
+++ b/src/mailman_pgp/model/list.py
@@ -16,15 +16,10 @@
# this program. If not, see <http://www.gnu.org/licenses/>.
"""Model for PGP enabled mailing lists."""
-
-from os import remove
-from os.path import exists, isfile, join
-
-from flufl.lock import Lock
+from lazr.config import as_boolean
from mailman.database.types import Enum, SAUnicode
from mailman.interfaces.action import Action
from mailman.interfaces.listmanager import (IListManager, ListDeletingEvent)
-from pgpy import PGPKey
from public import public
from sqlalchemy import Boolean, Column, Integer
from sqlalchemy.orm import reconstructor
@@ -34,7 +29,7 @@ from zope.event import classhandler
from mailman_pgp.config import config
from mailman_pgp.database import transaction
from mailman_pgp.model.base import Base
-from mailman_pgp.pgp.keygen import ListKeyGenerator
+from mailman_pgp.model.fs_key import FSKey
@public
@@ -44,7 +39,7 @@ class PGPMailingList(Base):
__tablename__ = 'pgp_lists'
id = Column(Integer, primary_key=True)
- list_id = Column(SAUnicode, index=True)
+ list_id = Column(SAUnicode, index=True, unique=True)
# Signature related properties
unsigned_msg_action = Column(Enum(Action), default=Action.reject)
@@ -61,16 +56,20 @@ class PGPMailingList(Base):
encrypt_outgoing = Column(Boolean, default=True)
def __init__(self, mlist):
- super().__init__()
+ """
+
+ :param mlist:
+ :type mlist: mailman.model.mailinglist.MailingList
+ """
+ super().__init__(list_id=mlist.list_id)
self._init()
- self.list_id = mlist.list_id
self._mlist = mlist
@reconstructor
def _init(self):
self._mlist = None
- self._key = None
- self._key_generator = None
+ self._key = FSKey(config.pgp.keydir_config['list_keydir'],
+ self.list_id + '.asc', True)
@property
def mlist(self):
@@ -84,44 +83,34 @@ class PGPMailingList(Base):
return self._mlist
@property
+ def fs_key(self):
+ return self._key
+
+ @property
def key(self):
"""
+ The private part of the list's keypair.
:return:
:rtype: pgpy.PGPKey
"""
- if self._key is None:
- # Check the file
- if exists(self.key_path) and isfile(self.key_path):
- self._key, _ = PGPKey.from_file(self.key_path)
- return self._key
+ self._key.reload()
+ return self._key.key
@key.setter
def key(self, value):
- with Lock(self.key_path + '.lock'):
- self._key = value
- if value is None:
- remove(self.key_path)
- else:
- with open(self.key_path, 'w') as key_file:
- key_file.write(str(value))
+ """
- def generate_key(self, block=False):
- self._key = None
- self._key_generator = ListKeyGenerator(config.pgp.primary_key_args,
- config.pgp.sub_key_args,
- self.mlist.display_name,
- self.mlist.posting_address,
- self.mlist.request_address,
- self.key_path)
- self._key_generator.start()
- if block:
- self._key_generator.join()
- return self.key
+ :param value:
+ :type value:
+ """
+ self._key.key = value
+ self._key.save()
@property
def pubkey(self):
"""
+ The public part of the list's keypair.
:return:
:rtype: pgpy.PGPKey
@@ -133,18 +122,19 @@ class PGPMailingList(Base):
@property
def key_path(self):
"""
+ The path to this list's key in the `list_keydir`.
- :return:
+ :return: List key path.
:rtype: str
"""
- return join(config.pgp.keydir_config['list_keydir'],
- self.list_id + '.asc')
+ return self._key.key_path
@staticmethod
def for_list(mlist):
"""
:param mlist:
+ :type mlist: mailman.model.mailinglist.MailingList
:return:
:rtype: PGPMailingList|None
"""
@@ -155,8 +145,12 @@ class PGPMailingList(Base):
@classhandler.handler(ListDeletingEvent)
def on_delete(event):
+ shred = as_boolean(config.get('keypairs', 'shred'))
pgp_list = PGPMailingList.for_list(event.mailing_list)
if pgp_list:
with transaction() as session:
- # TODO shred the list key
+ if shred:
+ pgp_list.fs_key.shred()
+ else:
+ pgp_list.fs_key.delete()
session.delete(pgp_list)
diff --git a/src/mailman_pgp/model/sighash.py b/src/mailman_pgp/model/sighash.py
index 9803ffc..e8a7ca9 100644
--- a/src/mailman_pgp/model/sighash.py
+++ b/src/mailman_pgp/model/sighash.py
@@ -16,11 +16,13 @@
# this program. If not, see <http://www.gnu.org/licenses/>.
""""""
+from public import public
from sqlalchemy import Column, LargeBinary, String
from mailman_pgp.model.base import Base
+@public
class PGPSigHash(Base):
""""""
diff --git a/src/mailman_pgp/model/tests/test_db_key.py b/src/mailman_pgp/model/tests/test_db_key.py
new file mode 100644
index 0000000..46cc487
--- /dev/null
+++ b/src/mailman_pgp/model/tests/test_db_key.py
@@ -0,0 +1,50 @@
+# Copyright (C) 2017 Jan Jancar
+#
+# This file is a part of the Mailman PGP plugin.
+#
+# This program is free software; you can redistribute it and/or modify it under
+# the terms of the GNU General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or (at your option)
+# any later version.
+#
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+# more details.
+#
+# You should have received a copy of the GNU General Public License along with
+# this program. If not, see <http://www.gnu.org/licenses/>.
+
+""""""
+from unittest import TestCase
+
+from mailman_pgp.config import config
+from mailman_pgp.database import transaction
+from mailman_pgp.model.db_key import DBKey
+from mailman_pgp.testing.layers import PGPConfigLayer
+from mailman_pgp.testing.pgp import load_key
+
+
+class TestDBKey(TestCase):
+ layer = PGPConfigLayer
+
+ def test_key(self):
+ with transaction() as t:
+ db_key = DBKey()
+ t.add(db_key)
+
+ key_data = load_key('rsa_1024.priv.asc')
+ with transaction():
+ db_key.key = key_data
+ self.assertEqual(db_key.key.fingerprint, key_data.fingerprint)
+
+ def test_loaded(self):
+ key_data = load_key('rsa_1024.priv.asc')
+ with transaction() as t:
+ db_key = DBKey(key_data)
+ t.add(db_key)
+
+ config.db.scoped_session.remove()
+
+ loaded_key = DBKey.query().one()
+ self.assertEqual(loaded_key.key.fingerprint, key_data.fingerprint)
diff --git a/src/mailman_pgp/model/tests/test_fs_key.py b/src/mailman_pgp/model/tests/test_fs_key.py
new file mode 100644
index 0000000..3a4f859
--- /dev/null
+++ b/src/mailman_pgp/model/tests/test_fs_key.py
@@ -0,0 +1,120 @@
+# Copyright (C) 2017 Jan Jancar
+#
+# This file is a part of the Mailman PGP plugin.
+#
+# This program is free software; you can redistribute it and/or modify it under
+# the terms of the GNU General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or (at your option)
+# any later version.
+#
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+# more details.
+#
+# You should have received a copy of the GNU General Public License along with
+# this program. If not, see <http://www.gnu.org/licenses/>.
+
+""""""
+from os.path import exists, join
+from tempfile import TemporaryDirectory
+from unittest import TestCase
+
+from mailman_pgp.model.fs_key import FSKey
+from mailman_pgp.testing.layers import PGPLayer
+from mailman_pgp.testing.pgp import load_key
+
+
+class TestFSKey(TestCase):
+ layer = PGPLayer
+
+ def setUp(self):
+ self.tmpdir = TemporaryDirectory()
+
+ def tearDown(self):
+ self.tmpdir.cleanup()
+
+ def test_load(self):
+ key_name = 'something.asc'
+ key_path = join(self.tmpdir.name, key_name)
+ key_data = load_key('rsa_1024.priv.asc')
+ with open(key_path, 'w') as key_file:
+ key_file.write(str(key_data))
+
+ key = FSKey(self.tmpdir.name, key_name, False)
+ self.assertEqual(key.key_path, key_path)
+
+ key.load()
+ self.assertEqual(key.key.fingerprint, key_data.fingerprint)
+
+ def test_reload(self):
+ key_name = 'something.asc'
+ key_path = join(self.tmpdir.name, key_name)
+ key_data = load_key('rsa_1024.priv.asc')
+ new_key_data = load_key('ecc_p256.priv.asc')
+ with open(key_path, 'w') as key_file:
+ key_file.write(str(key_data))
+
+ key = FSKey(self.tmpdir.name, key_name, True)
+
+ with open(key_path, 'w') as key_file:
+ key_file.write(str(new_key_data))
+
+ key.reload()
+ self.assertIsNotNone(key.key)
+ self.assertEqual(key.key.fingerprint, new_key_data.fingerprint)
+
+ def test_reload_none(self):
+ key_name = 'something.asc'
+ key = FSKey(self.tmpdir.name, key_name, False)
+ key_data = load_key('rsa_1024.priv.asc')
+ with open(key.key_path, 'w') as key_file:
+ key_file.write(str(key_data))
+
+ self.assertIsNone(key.key)
+ key.reload()
+ self.assertIsNotNone(key.key)
+ self.assertEqual(key.key.fingerprint, key_data.fingerprint)
+
+ def test_save(self):
+ key_name = 'something.asc'
+ key = FSKey(self.tmpdir.name, key_name)
+ key_data = load_key('rsa_1024.priv.asc')
+
+ key.key = key_data
+ key.save()
+ self.assertTrue(exists(key.key_path))
+
+ def test_delete(self):
+ key_name = 'something.asc'
+ key_path = join(self.tmpdir.name, key_name)
+ key_data = load_key('rsa_1024.priv.asc')
+ with open(key_path, 'w') as key_file:
+ key_file.write(str(key_data))
+
+ key = FSKey(self.tmpdir.name, key_name, True)
+
+ key.delete()
+ self.assertFalse(exists(key.key_path))
+ self.assertIsNotNone(key.key)
+
+ def test_delete_none(self):
+ key = FSKey(self.tmpdir.name, 'something.asc')
+ key.delete()
+
+ def test_shred(self):
+ key_name = 'something.asc'
+ key_path = join(self.tmpdir.name, key_name)
+ key_data = load_key('rsa_1024.priv.asc')
+ with open(key_path, 'w') as key_file:
+ key_file.write(str(key_data))
+
+ key = FSKey(self.tmpdir.name, key_name, True)
+
+ key.shred()
+ self.assertFalse(exists(key.key_path))
+ self.assertIsNotNone(key.key)
+
+ def test_shred_none(self):
+ key = FSKey(self.tmpdir.name, 'something.asc')
+ key.shred()
diff --git a/src/mailman_pgp/model/tests/test_list.py b/src/mailman_pgp/model/tests/test_list.py
index 58f52f2..aa07a79 100644
--- a/src/mailman_pgp/model/tests/test_list.py
+++ b/src/mailman_pgp/model/tests/test_list.py
@@ -16,15 +16,18 @@
# this program. If not, see <http://www.gnu.org/licenses/>.
""""""
+from os.path import exists
from unittest import TestCase
from mailman.app.lifecycle import create_list
from mailman.interfaces.listmanager import IListManager
from zope.component import getUtility
+from mailman_pgp.config import config
from mailman_pgp.database import mm_transaction
from mailman_pgp.model.list import PGPMailingList
from mailman_pgp.testing.layers import PGPConfigLayer
+from mailman_pgp.testing.pgp import load_key
class TestPGPMailingList(TestCase):
@@ -34,8 +37,22 @@ class TestPGPMailingList(TestCase):
with mm_transaction():
self.mlist = create_list('test@example.com',
style_name='pgp-default')
+ pgp_list = PGPMailingList.for_list(self.mlist)
+ pgp_list.key = load_key('rsa_1024.priv.asc')
def test_delete(self):
getUtility(IListManager).delete(self.mlist)
pgp_list = PGPMailingList.for_list(self.mlist)
self.assertIsNone(pgp_list)
+
+ def test_shred_key(self):
+ key_path = PGPMailingList.for_list(self.mlist).key_path
+ getUtility(IListManager).delete(self.mlist)
+ self.assertFalse(exists(key_path))
+
+ def test_delete_key(self):
+ self.addCleanup(config.set, 'keypairs', 'shred', 'yes')
+ config.set('keypairs', 'shred', 'no')
+ key_path = PGPMailingList.for_list(self.mlist).key_path
+ getUtility(IListManager).delete(self.mlist)
+ self.assertFalse(exists(key_path))