aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJ08nY2017-07-31 22:02:36 +0200
committerJ08nY2017-07-31 22:02:36 +0200
commitb16fe8644ddc82584a8ff9a2f98e40c1571437f2 (patch)
tree170f1c1eeeff3c3773a369202f7dad53760e9eb9
parent4f815c200f62cf4e4bf660649066d9bd65393ae0 (diff)
downloadmailman-pgp-b16fe8644ddc82584a8ff9a2f98e40c1571437f2.tar.gz
mailman-pgp-b16fe8644ddc82584a8ff9a2f98e40c1571437f2.tar.zst
mailman-pgp-b16fe8644ddc82584a8ff9a2f98e40c1571437f2.zip
-rw-r--r--src/mailman_pgp/database/__init__.py4
-rw-r--r--src/mailman_pgp/model/address.py2
-rw-r--r--src/mailman_pgp/model/db_key.py14
-rw-r--r--src/mailman_pgp/model/fs_key.py4
-rw-r--r--src/mailman_pgp/model/sighash.py2
-rw-r--r--src/mailman_pgp/model/tests/test_db_key.py50
-rw-r--r--src/mailman_pgp/model/tests/test_fs_key.py99
7 files changed, 167 insertions, 8 deletions
diff --git a/src/mailman_pgp/database/__init__.py b/src/mailman_pgp/database/__init__.py
index 2e24cdc..11ea387 100644
--- a/src/mailman_pgp/database/__init__.py
+++ b/src/mailman_pgp/database/__init__.py
@@ -38,7 +38,7 @@ class Database:
url = config.get('db', 'url')
self._url = expand(url, None, mailman_config.paths)
self.engine = create_engine(self._url)
- self._scoped_session = scoped_session(sessionmaker(bind=self.engine))
+ self.scoped_session = scoped_session(sessionmaker(bind=self.engine))
Base.metadata.create_all(self.engine)
self.session.commit()
@@ -50,7 +50,7 @@ class Database:
:return: A scoped session.
:rtype: scoped_session
"""
- return self._scoped_session()
+ return self.scoped_session()
@public
diff --git a/src/mailman_pgp/model/address.py b/src/mailman_pgp/model/address.py
index a912e87..9ecfa95 100644
--- a/src/mailman_pgp/model/address.py
+++ b/src/mailman_pgp/model/address.py
@@ -19,6 +19,7 @@
from mailman.database.types import SAUnicode
from mailman.interfaces.usermanager import IUserManager
+from public import public
from sqlalchemy import Boolean, Column, Integer, String
from sqlalchemy.orm import reconstructor
from zope.component import getUtility
@@ -28,6 +29,7 @@ from mailman_pgp.model.base import Base
from mailman_pgp.model.fs_key import FSKey
+@public
class PGPAddress(Base):
"""A PGP enabled address."""
diff --git a/src/mailman_pgp/model/db_key.py b/src/mailman_pgp/model/db_key.py
index 45eafc7..0b3af45 100644
--- a/src/mailman_pgp/model/db_key.py
+++ b/src/mailman_pgp/model/db_key.py
@@ -16,6 +16,7 @@
# this program. If not, see <http://www.gnu.org/licenses/>.
"""Database stored PGP key."""
+from public import public
from sqlalchemy import Column, Integer, LargeBinary
from sqlalchemy.orm import reconstructor
@@ -23,15 +24,18 @@ from mailman_pgp.model.base import Base
from mailman_pgp.utils.pgp import key_from_blob
+@public
class DBKey(Base):
"""Database stored PGP key."""
__tablename__ = 'keys'
id = Column(Integer, primary_key=True)
- key_material = Column(LargeBinary)
+ _key_material = Column('key_material', LargeBinary)
- def __init__(self):
+ def __init__(self, key=None):
super().__init__()
+ self._init()
+ self.key = key
@reconstructor
def _init(self):
@@ -40,14 +44,14 @@ class DBKey(Base):
@property
def key(self):
if self._key is None:
- self._key = key_from_blob(self.key_material)
+ self._key = key_from_blob(self._key_material)
return self._key
@key.setter
def key(self, value):
if value is None:
self._key = None
- self.key_material = bytes()
+ self._key_material = bytes()
else:
self._key = value
- self.key_material = bytes(value)
+ self._key_material = bytes(value)
diff --git a/src/mailman_pgp/model/fs_key.py b/src/mailman_pgp/model/fs_key.py
index c7c86eb..c4a2ac4 100644
--- a/src/mailman_pgp/model/fs_key.py
+++ b/src/mailman_pgp/model/fs_key.py
@@ -19,10 +19,12 @@
from os import remove
from os.path import getmtime, join
+from public import public
+
from mailman_pgp.utils.file import locked_obj
from mailman_pgp.utils.pgp import key_from_file
-
+@public
class FSKey:
"""Filesystem stored PGP key."""
diff --git a/src/mailman_pgp/model/sighash.py b/src/mailman_pgp/model/sighash.py
index 9803ffc..d936f35 100644
--- a/src/mailman_pgp/model/sighash.py
+++ b/src/mailman_pgp/model/sighash.py
@@ -19,8 +19,10 @@
from sqlalchemy import Column, LargeBinary, String
from mailman_pgp.model.base import Base
+from public import public
+@public
class PGPSigHash(Base):
""""""
diff --git a/src/mailman_pgp/model/tests/test_db_key.py b/src/mailman_pgp/model/tests/test_db_key.py
new file mode 100644
index 0000000..46cc487
--- /dev/null
+++ b/src/mailman_pgp/model/tests/test_db_key.py
@@ -0,0 +1,50 @@
+# Copyright (C) 2017 Jan Jancar
+#
+# This file is a part of the Mailman PGP plugin.
+#
+# This program is free software; you can redistribute it and/or modify it under
+# the terms of the GNU General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or (at your option)
+# any later version.
+#
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+# more details.
+#
+# You should have received a copy of the GNU General Public License along with
+# this program. If not, see <http://www.gnu.org/licenses/>.
+
+""""""
+from unittest import TestCase
+
+from mailman_pgp.config import config
+from mailman_pgp.database import transaction
+from mailman_pgp.model.db_key import DBKey
+from mailman_pgp.testing.layers import PGPConfigLayer
+from mailman_pgp.testing.pgp import load_key
+
+
+class TestDBKey(TestCase):
+ layer = PGPConfigLayer
+
+ def test_key(self):
+ with transaction() as t:
+ db_key = DBKey()
+ t.add(db_key)
+
+ key_data = load_key('rsa_1024.priv.asc')
+ with transaction():
+ db_key.key = key_data
+ self.assertEqual(db_key.key.fingerprint, key_data.fingerprint)
+
+ def test_loaded(self):
+ key_data = load_key('rsa_1024.priv.asc')
+ with transaction() as t:
+ db_key = DBKey(key_data)
+ t.add(db_key)
+
+ config.db.scoped_session.remove()
+
+ loaded_key = DBKey.query().one()
+ self.assertEqual(loaded_key.key.fingerprint, key_data.fingerprint)
diff --git a/src/mailman_pgp/model/tests/test_fs_key.py b/src/mailman_pgp/model/tests/test_fs_key.py
new file mode 100644
index 0000000..8828e4b
--- /dev/null
+++ b/src/mailman_pgp/model/tests/test_fs_key.py
@@ -0,0 +1,99 @@
+# Copyright (C) 2017 Jan Jancar
+#
+# This file is a part of the Mailman PGP plugin.
+#
+# This program is free software; you can redistribute it and/or modify it under
+# the terms of the GNU General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or (at your option)
+# any later version.
+#
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+# more details.
+#
+# You should have received a copy of the GNU General Public License along with
+# this program. If not, see <http://www.gnu.org/licenses/>.
+
+""""""
+from os.path import join, exists
+from tempfile import TemporaryDirectory
+from unittest import TestCase
+
+from mailman_pgp.model.fs_key import FSKey
+from mailman_pgp.testing.layers import PGPLayer
+from mailman_pgp.testing.pgp import load_key
+
+
+class TestFSKey(TestCase):
+ layer = PGPLayer
+
+ def setUp(self):
+ self.tmpdir = TemporaryDirectory()
+
+ def tearDown(self):
+ self.tmpdir.cleanup()
+
+ def test_load(self):
+ key_name = 'something.asc'
+ key_path = join(self.tmpdir.name, key_name)
+ key_data = load_key('rsa_1024.priv.asc')
+ with open(key_path, 'w') as key_file:
+ key_file.write(str(key_data))
+
+ key = FSKey(self.tmpdir.name, key_name, False)
+ self.assertEqual(key.key_path, key_path)
+
+ key.load()
+ self.assertEqual(key.key.fingerprint, key_data.fingerprint)
+
+ def test_reload_none(self):
+ key_name = 'something.asc'
+ key = FSKey(self.tmpdir.name, key_name, False)
+ key_data = load_key('rsa_1024.priv.asc')
+ with open(key.key_path, 'w') as key_file:
+ key_file.write(str(key_data))
+
+ self.assertIsNone(key.key)
+ key.reload()
+ self.assertIsNotNone(key.key)
+ self.assertEqual(key.key.fingerprint, key_data.fingerprint)
+
+ def test_reload_not_none(self):
+ key_name = 'something.asc'
+ key_path = join(self.tmpdir.name, key_name)
+ key_data = load_key('rsa_1024.priv.asc')
+ new_key_data = load_key('ecc_p256.priv.asc')
+ with open(key_path, 'w') as key_file:
+ key_file.write(str(key_data))
+
+ key = FSKey(self.tmpdir.name, key_name, True)
+
+ with open(key_path, 'w') as key_file:
+ key_file.write(str(new_key_data))
+
+ key.reload()
+ self.assertIsNotNone(key.key)
+ self.assertEqual(key.key.fingerprint, new_key_data.fingerprint)
+
+ def test_save(self):
+ key_name = 'something.asc'
+ key = FSKey(self.tmpdir.name, key_name)
+ key_data = load_key('rsa_1024.priv.asc')
+
+ key.key = key_data
+ key.save()
+ self.assertTrue(exists(key.key_path))
+
+ def test_delete(self):
+ key_name = 'something.asc'
+ key_path = join(self.tmpdir.name, key_name)
+ key_data = load_key('rsa_1024.priv.asc')
+ with open(key_path, 'w') as key_file:
+ key_file.write(str(key_data))
+
+ key = FSKey(self.tmpdir.name, key_name, True)
+
+ key.delete()
+ self.assertFalse(exists(key.key_path))
+ self.assertIsNotNone(key.key)