diff options
| author | J08nY | 2017-07-31 22:02:36 +0200 |
|---|---|---|
| committer | J08nY | 2017-07-31 22:02:36 +0200 |
| commit | b16fe8644ddc82584a8ff9a2f98e40c1571437f2 (patch) | |
| tree | 170f1c1eeeff3c3773a369202f7dad53760e9eb9 | |
| parent | 4f815c200f62cf4e4bf660649066d9bd65393ae0 (diff) | |
| download | mailman-pgp-b16fe8644ddc82584a8ff9a2f98e40c1571437f2.tar.gz mailman-pgp-b16fe8644ddc82584a8ff9a2f98e40c1571437f2.tar.zst mailman-pgp-b16fe8644ddc82584a8ff9a2f98e40c1571437f2.zip | |
| -rw-r--r-- | src/mailman_pgp/database/__init__.py | 4 | ||||
| -rw-r--r-- | src/mailman_pgp/model/address.py | 2 | ||||
| -rw-r--r-- | src/mailman_pgp/model/db_key.py | 14 | ||||
| -rw-r--r-- | src/mailman_pgp/model/fs_key.py | 4 | ||||
| -rw-r--r-- | src/mailman_pgp/model/sighash.py | 2 | ||||
| -rw-r--r-- | src/mailman_pgp/model/tests/test_db_key.py | 50 | ||||
| -rw-r--r-- | src/mailman_pgp/model/tests/test_fs_key.py | 99 |
7 files changed, 167 insertions, 8 deletions
diff --git a/src/mailman_pgp/database/__init__.py b/src/mailman_pgp/database/__init__.py index 2e24cdc..11ea387 100644 --- a/src/mailman_pgp/database/__init__.py +++ b/src/mailman_pgp/database/__init__.py @@ -38,7 +38,7 @@ class Database: url = config.get('db', 'url') self._url = expand(url, None, mailman_config.paths) self.engine = create_engine(self._url) - self._scoped_session = scoped_session(sessionmaker(bind=self.engine)) + self.scoped_session = scoped_session(sessionmaker(bind=self.engine)) Base.metadata.create_all(self.engine) self.session.commit() @@ -50,7 +50,7 @@ class Database: :return: A scoped session. :rtype: scoped_session """ - return self._scoped_session() + return self.scoped_session() @public diff --git a/src/mailman_pgp/model/address.py b/src/mailman_pgp/model/address.py index a912e87..9ecfa95 100644 --- a/src/mailman_pgp/model/address.py +++ b/src/mailman_pgp/model/address.py @@ -19,6 +19,7 @@ from mailman.database.types import SAUnicode from mailman.interfaces.usermanager import IUserManager +from public import public from sqlalchemy import Boolean, Column, Integer, String from sqlalchemy.orm import reconstructor from zope.component import getUtility @@ -28,6 +29,7 @@ from mailman_pgp.model.base import Base from mailman_pgp.model.fs_key import FSKey +@public class PGPAddress(Base): """A PGP enabled address.""" diff --git a/src/mailman_pgp/model/db_key.py b/src/mailman_pgp/model/db_key.py index 45eafc7..0b3af45 100644 --- a/src/mailman_pgp/model/db_key.py +++ b/src/mailman_pgp/model/db_key.py @@ -16,6 +16,7 @@ # this program. If not, see <http://www.gnu.org/licenses/>. """Database stored PGP key.""" +from public import public from sqlalchemy import Column, Integer, LargeBinary from sqlalchemy.orm import reconstructor @@ -23,15 +24,18 @@ from mailman_pgp.model.base import Base from mailman_pgp.utils.pgp import key_from_blob +@public class DBKey(Base): """Database stored PGP key.""" __tablename__ = 'keys' id = Column(Integer, primary_key=True) - key_material = Column(LargeBinary) + _key_material = Column('key_material', LargeBinary) - def __init__(self): + def __init__(self, key=None): super().__init__() + self._init() + self.key = key @reconstructor def _init(self): @@ -40,14 +44,14 @@ class DBKey(Base): @property def key(self): if self._key is None: - self._key = key_from_blob(self.key_material) + self._key = key_from_blob(self._key_material) return self._key @key.setter def key(self, value): if value is None: self._key = None - self.key_material = bytes() + self._key_material = bytes() else: self._key = value - self.key_material = bytes(value) + self._key_material = bytes(value) diff --git a/src/mailman_pgp/model/fs_key.py b/src/mailman_pgp/model/fs_key.py index c7c86eb..c4a2ac4 100644 --- a/src/mailman_pgp/model/fs_key.py +++ b/src/mailman_pgp/model/fs_key.py @@ -19,10 +19,12 @@ from os import remove from os.path import getmtime, join +from public import public + from mailman_pgp.utils.file import locked_obj from mailman_pgp.utils.pgp import key_from_file - +@public class FSKey: """Filesystem stored PGP key.""" diff --git a/src/mailman_pgp/model/sighash.py b/src/mailman_pgp/model/sighash.py index 9803ffc..d936f35 100644 --- a/src/mailman_pgp/model/sighash.py +++ b/src/mailman_pgp/model/sighash.py @@ -19,8 +19,10 @@ from sqlalchemy import Column, LargeBinary, String from mailman_pgp.model.base import Base +from public import public +@public class PGPSigHash(Base): """""" diff --git a/src/mailman_pgp/model/tests/test_db_key.py b/src/mailman_pgp/model/tests/test_db_key.py new file mode 100644 index 0000000..46cc487 --- /dev/null +++ b/src/mailman_pgp/model/tests/test_db_key.py @@ -0,0 +1,50 @@ +# Copyright (C) 2017 Jan Jancar +# +# This file is a part of the Mailman PGP plugin. +# +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) +# any later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. +# +# You should have received a copy of the GNU General Public License along with +# this program. If not, see <http://www.gnu.org/licenses/>. + +"""""" +from unittest import TestCase + +from mailman_pgp.config import config +from mailman_pgp.database import transaction +from mailman_pgp.model.db_key import DBKey +from mailman_pgp.testing.layers import PGPConfigLayer +from mailman_pgp.testing.pgp import load_key + + +class TestDBKey(TestCase): + layer = PGPConfigLayer + + def test_key(self): + with transaction() as t: + db_key = DBKey() + t.add(db_key) + + key_data = load_key('rsa_1024.priv.asc') + with transaction(): + db_key.key = key_data + self.assertEqual(db_key.key.fingerprint, key_data.fingerprint) + + def test_loaded(self): + key_data = load_key('rsa_1024.priv.asc') + with transaction() as t: + db_key = DBKey(key_data) + t.add(db_key) + + config.db.scoped_session.remove() + + loaded_key = DBKey.query().one() + self.assertEqual(loaded_key.key.fingerprint, key_data.fingerprint) diff --git a/src/mailman_pgp/model/tests/test_fs_key.py b/src/mailman_pgp/model/tests/test_fs_key.py new file mode 100644 index 0000000..8828e4b --- /dev/null +++ b/src/mailman_pgp/model/tests/test_fs_key.py @@ -0,0 +1,99 @@ +# Copyright (C) 2017 Jan Jancar +# +# This file is a part of the Mailman PGP plugin. +# +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) +# any later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. +# +# You should have received a copy of the GNU General Public License along with +# this program. If not, see <http://www.gnu.org/licenses/>. + +"""""" +from os.path import join, exists +from tempfile import TemporaryDirectory +from unittest import TestCase + +from mailman_pgp.model.fs_key import FSKey +from mailman_pgp.testing.layers import PGPLayer +from mailman_pgp.testing.pgp import load_key + + +class TestFSKey(TestCase): + layer = PGPLayer + + def setUp(self): + self.tmpdir = TemporaryDirectory() + + def tearDown(self): + self.tmpdir.cleanup() + + def test_load(self): + key_name = 'something.asc' + key_path = join(self.tmpdir.name, key_name) + key_data = load_key('rsa_1024.priv.asc') + with open(key_path, 'w') as key_file: + key_file.write(str(key_data)) + + key = FSKey(self.tmpdir.name, key_name, False) + self.assertEqual(key.key_path, key_path) + + key.load() + self.assertEqual(key.key.fingerprint, key_data.fingerprint) + + def test_reload_none(self): + key_name = 'something.asc' + key = FSKey(self.tmpdir.name, key_name, False) + key_data = load_key('rsa_1024.priv.asc') + with open(key.key_path, 'w') as key_file: + key_file.write(str(key_data)) + + self.assertIsNone(key.key) + key.reload() + self.assertIsNotNone(key.key) + self.assertEqual(key.key.fingerprint, key_data.fingerprint) + + def test_reload_not_none(self): + key_name = 'something.asc' + key_path = join(self.tmpdir.name, key_name) + key_data = load_key('rsa_1024.priv.asc') + new_key_data = load_key('ecc_p256.priv.asc') + with open(key_path, 'w') as key_file: + key_file.write(str(key_data)) + + key = FSKey(self.tmpdir.name, key_name, True) + + with open(key_path, 'w') as key_file: + key_file.write(str(new_key_data)) + + key.reload() + self.assertIsNotNone(key.key) + self.assertEqual(key.key.fingerprint, new_key_data.fingerprint) + + def test_save(self): + key_name = 'something.asc' + key = FSKey(self.tmpdir.name, key_name) + key_data = load_key('rsa_1024.priv.asc') + + key.key = key_data + key.save() + self.assertTrue(exists(key.key_path)) + + def test_delete(self): + key_name = 'something.asc' + key_path = join(self.tmpdir.name, key_name) + key_data = load_key('rsa_1024.priv.asc') + with open(key_path, 'w') as key_file: + key_file.write(str(key_data)) + + key = FSKey(self.tmpdir.name, key_name, True) + + key.delete() + self.assertFalse(exists(key.key_path)) + self.assertIsNotNone(key.key) |
