From 4f815c200f62cf4e4bf660649066d9bd65393ae0 Mon Sep 17 00:00:00 2001 From: J08nY Date: Mon, 31 Jul 2017 20:43:37 +0200 Subject: Refactor filesystem key store operations into FSKey. Introduce DBKey. --- src/mailman_pgp/model/address.py | 44 +++++---------- src/mailman_pgp/model/db_key.py | 53 ++++++++++++++++++ src/mailman_pgp/model/fs_key.py | 95 ++++++++++++++++++++++++++++++++ src/mailman_pgp/model/list.py | 68 ++++++++++------------- src/mailman_pgp/pgp/keygen.py | 36 +++++++----- src/mailman_pgp/pgp/tests/test_keygen.py | 93 +++++++++++++++++-------------- src/mailman_pgp/pgp/tests/test_pgp.py | 7 ++- src/mailman_pgp/styles/base.py | 4 +- src/mailman_pgp/utils/file.py | 62 +++++++++++++++++++++ 9 files changed, 337 insertions(+), 125 deletions(-) create mode 100644 src/mailman_pgp/model/db_key.py create mode 100644 src/mailman_pgp/model/fs_key.py create mode 100644 src/mailman_pgp/utils/file.py diff --git a/src/mailman_pgp/model/address.py b/src/mailman_pgp/model/address.py index 0cfefe2..a912e87 100644 --- a/src/mailman_pgp/model/address.py +++ b/src/mailman_pgp/model/address.py @@ -16,18 +16,16 @@ # this program. If not, see . """Model for PGP enabled addresses.""" -import os -from os.path import exists, isfile, join from mailman.database.types import SAUnicode from mailman.interfaces.usermanager import IUserManager -from pgpy import PGPKey from sqlalchemy import Boolean, Column, Integer, String from sqlalchemy.orm import reconstructor from zope.component import getUtility from mailman_pgp.config import config from mailman_pgp.model.base import Base +from mailman_pgp.model.fs_key import FSKey class PGPAddress(Base): @@ -41,15 +39,15 @@ class PGPAddress(Base): key_confirmed = Column(Boolean, default=False) def __init__(self, address): - super().__init__() + super().__init__(email=address.email) self._init() - self.email = address.email self._address = address @reconstructor def _init(self): self._address = None - self._key = None + self._key = FSKey(config.pgp.keydir_config['user_keydir'], + self.email + '.asc', True) @property def key(self): @@ -58,33 +56,22 @@ class PGPAddress(Base): :return: :rtype: pgpy.PGPKey """ - if self.key_fingerprint is None: - return None - if self._key is None: - if exists(self.key_path) and isfile(self.key_path): - self._key, _ = PGPKey.from_file(self.key_path) - return self._key + self._key.reload() + return self._key.key @key.setter - def key(self, new_key): + def key(self, value): """ - :param new_key: - :type new_key: PGPKey + :param value: + :type value: pgpy.PGPKey """ - if self.key_fingerprint is not None: - try: - os.remove(self.key_path) - except FileNotFoundError: - pass - if new_key is None: + if value is None: self.key_fingerprint = None - self._key = None else: - self.key_fingerprint = str(new_key.fingerprint) - with open(self.key_path, 'w') as out: - out.write(str(new_key)) - self._key = new_key + self.key_fingerprint = value.fingerprint + self._key.key = value + self._key.save() @property def key_path(self): @@ -93,10 +80,7 @@ class PGPAddress(Base): :return: :rtype: str """ - if self.key_fingerprint is None: - return None - return join(config.pgp.keydir_config['user_keydir'], - self.key_fingerprint + '.asc') + return self._key.key_path @property def address(self): diff --git a/src/mailman_pgp/model/db_key.py b/src/mailman_pgp/model/db_key.py new file mode 100644 index 0000000..45eafc7 --- /dev/null +++ b/src/mailman_pgp/model/db_key.py @@ -0,0 +1,53 @@ +# Copyright (C) 2017 Jan Jancar +# +# This file is a part of the Mailman PGP plugin. +# +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) +# any later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. +# +# You should have received a copy of the GNU General Public License along with +# this program. If not, see . + +"""Database stored PGP key.""" +from sqlalchemy import Column, Integer, LargeBinary +from sqlalchemy.orm import reconstructor + +from mailman_pgp.model.base import Base +from mailman_pgp.utils.pgp import key_from_blob + + +class DBKey(Base): + """Database stored PGP key.""" + __tablename__ = 'keys' + + id = Column(Integer, primary_key=True) + key_material = Column(LargeBinary) + + def __init__(self): + super().__init__() + + @reconstructor + def _init(self): + self._key = None + + @property + def key(self): + if self._key is None: + self._key = key_from_blob(self.key_material) + return self._key + + @key.setter + def key(self, value): + if value is None: + self._key = None + self.key_material = bytes() + else: + self._key = value + self.key_material = bytes(value) diff --git a/src/mailman_pgp/model/fs_key.py b/src/mailman_pgp/model/fs_key.py new file mode 100644 index 0000000..c7c86eb --- /dev/null +++ b/src/mailman_pgp/model/fs_key.py @@ -0,0 +1,95 @@ +# Copyright (C) 2017 Jan Jancar +# +# This file is a part of the Mailman PGP plugin. +# +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) +# any later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. +# +# You should have received a copy of the GNU General Public License along with +# this program. If not, see . + +"""Filesystem stored PGP key.""" +from os import remove +from os.path import getmtime, join + +from mailman_pgp.utils.file import locked_obj +from mailman_pgp.utils.pgp import key_from_file + + +class FSKey: + """Filesystem stored PGP key.""" + + def __init__(self, keydir, keyfile, load=False): + self._key = None + self._mtime = None + self.keydir = keydir + self.keyfile = keyfile + if load: + self.load() + + @property + def key(self): + """ + + :rtype: pgpy.PGPKey + """ + return self._key + + @key.setter + def key(self, value): + """ + + :param value: + :type value: pgpy.PGPKey + """ + self._key = value + + @property + def key_path(self): + return join(self.keydir, self.keyfile) + + @property + def lock_path(self): + return self.key_path + '.lock' + + def _load(self): + try: + self.key = key_from_file(self.key_path) + self._mtime = getmtime(self.key_path) + except FileNotFoundError: + self.key = None + self._mtime = None + + @locked_obj('lock_path') + def load(self): + self._load() + + @locked_obj('lock_path') + def reload(self): + if self.key is None: + self._load() + else: + mtime = getmtime(self.key_path) + if self._mtime is None or mtime > self._mtime: + self._load() + + @locked_obj('lock_path') + def save(self): + if self.key is None: + remove(self.key_path) + self._mtime = None + else: + with open(self.key_path, 'w') as key_file: + key_file.write(str(self.key)) + self._mtime = getmtime(self.key_path) + + @locked_obj('lock_path') + def delete(self): + remove(self.key_path) diff --git a/src/mailman_pgp/model/list.py b/src/mailman_pgp/model/list.py index 8448368..838bcab 100644 --- a/src/mailman_pgp/model/list.py +++ b/src/mailman_pgp/model/list.py @@ -17,14 +17,9 @@ """Model for PGP enabled mailing lists.""" -from os import remove -from os.path import exists, isfile, join - -from flufl.lock import Lock from mailman.database.types import Enum, SAUnicode from mailman.interfaces.action import Action from mailman.interfaces.listmanager import (IListManager, ListDeletingEvent) -from pgpy import PGPKey from public import public from sqlalchemy import Boolean, Column, Integer from sqlalchemy.orm import reconstructor @@ -34,7 +29,7 @@ from zope.event import classhandler from mailman_pgp.config import config from mailman_pgp.database import transaction from mailman_pgp.model.base import Base -from mailman_pgp.pgp.keygen import ListKeyGenerator +from mailman_pgp.model.fs_key import FSKey @public @@ -44,7 +39,7 @@ class PGPMailingList(Base): __tablename__ = 'pgp_lists' id = Column(Integer, primary_key=True) - list_id = Column(SAUnicode, index=True) + list_id = Column(SAUnicode, index=True, unique=True) # Signature related properties unsigned_msg_action = Column(Enum(Action), default=Action.reject) @@ -61,16 +56,20 @@ class PGPMailingList(Base): encrypt_outgoing = Column(Boolean, default=True) def __init__(self, mlist): - super().__init__() + """ + + :param mlist: + :type mlist: mailman.model.mailinglist.MailingList + """ + super().__init__(list_id=mlist.list_id) self._init() - self.list_id = mlist.list_id self._mlist = mlist @reconstructor def _init(self): self._mlist = None - self._key = None - self._key_generator = None + self._key = FSKey(config.pgp.keydir_config['list_keydir'], + self.list_id + '.asc', True) @property def mlist(self): @@ -83,45 +82,35 @@ class PGPMailingList(Base): self._mlist = getUtility(IListManager).get_by_list_id(self.list_id) return self._mlist + @property + def fs_key(self): + return self._key + @property def key(self): """ + The private part of the list's keypair. :return: :rtype: pgpy.PGPKey """ - if self._key is None: - # Check the file - if exists(self.key_path) and isfile(self.key_path): - self._key, _ = PGPKey.from_file(self.key_path) - return self._key + self._key.reload() + return self._key.key @key.setter def key(self, value): - with Lock(self.key_path + '.lock'): - self._key = value - if value is None: - remove(self.key_path) - else: - with open(self.key_path, 'w') as key_file: - key_file.write(str(value)) - - def generate_key(self, block=False): - self._key = None - self._key_generator = ListKeyGenerator(config.pgp.primary_key_args, - config.pgp.sub_key_args, - self.mlist.display_name, - self.mlist.posting_address, - self.mlist.request_address, - self.key_path) - self._key_generator.start() - if block: - self._key_generator.join() - return self.key + """ + + :param value: + :type value: + """ + self._key.key = value + self._key.save() @property def pubkey(self): """ + The public part of the list's keypair. :return: :rtype: pgpy.PGPKey @@ -133,18 +122,19 @@ class PGPMailingList(Base): @property def key_path(self): """ + The path to this list's key in the `list_keydir`. - :return: + :return: List key path. :rtype: str """ - return join(config.pgp.keydir_config['list_keydir'], - self.list_id + '.asc') + return self._key.key_path @staticmethod def for_list(mlist): """ :param mlist: + :type mlist: mailman.model.mailinglist.MailingList :return: :rtype: PGPMailingList|None """ diff --git a/src/mailman_pgp/pgp/keygen.py b/src/mailman_pgp/pgp/keygen.py index 684b81a..57a94d7 100644 --- a/src/mailman_pgp/pgp/keygen.py +++ b/src/mailman_pgp/pgp/keygen.py @@ -25,22 +25,32 @@ from pgpy import PGPKey, PGPUID from pgpy.constants import ( CompressionAlgorithm, HashAlgorithm, KeyFlags, SymmetricKeyAlgorithm) +from mailman_pgp.config import config +from mailman_pgp.utils.pgp import key_from_file + class ListKeyGenerator(mp.Process): """A multiprocessing list key generator.""" - def __init__(self, primary_args, subkey_args, display_name, - posting_address, - request_address, key_path): + def __init__(self, pgp_list): super().__init__( - target=self.generate, - args=(primary_args, subkey_args, display_name, posting_address, - request_address, key_path), + target=self._run, + args=(config.pgp.primary_key_args, config.pgp.sub_key_args, + pgp_list.mlist.display_name, + pgp_list.mlist.posting_address, + pgp_list.mlist.request_address, + pgp_list.key_path), daemon=True) + self._pgp_list = pgp_list + + def generate(self, block=False): + self.start() + if block: + self.join() + return key_from_file(self._pgp_list.key_path) - def generate(self, primary_args, subkey_args, display_name, - posting_address, - request_address, key_path): + def _run(self, primary_args, subkey_args, display_name, posting_address, + request_address, key_path): """ Generate the list keypair and save it. @@ -51,11 +61,11 @@ class ListKeyGenerator(mp.Process): :param request_address: :param key_path: """ - key = self._create(primary_args, subkey_args, display_name, - posting_address, - request_address) + self.key = self._create(primary_args, subkey_args, display_name, + posting_address, + request_address) with Lock(key_path + '.lock'): - self._save(key, key_path) + self._save(self.key, key_path) def _create(self, primary_args, subkey_args, display_name, posting_address, request_address): diff --git a/src/mailman_pgp/pgp/tests/test_keygen.py b/src/mailman_pgp/pgp/tests/test_keygen.py index a3a5499..7de1a19 100644 --- a/src/mailman_pgp/pgp/tests/test_keygen.py +++ b/src/mailman_pgp/pgp/tests/test_keygen.py @@ -16,25 +16,29 @@ # this program. If not, see . """Test the out-of-process key generator.""" -from os.path import exists, isfile, join -from tempfile import TemporaryDirectory +from os.path import exists, isfile from unittest import TestCase +from mailman.app.lifecycle import create_list from parameterized import parameterized -from pgpy import PGPKey from pgpy.constants import EllipticCurveOID, PubKeyAlgorithm +from mailman_pgp.config import config +from mailman_pgp.database import mm_transaction +from mailman_pgp.model.list import PGPMailingList from mailman_pgp.pgp.keygen import ListKeyGenerator -from mailman_pgp.testing.layers import PGPLayer +from mailman_pgp.testing.layers import PGPConfigLayer +from mailman_pgp.utils.pgp import key_from_file class TestKeygen(TestCase): - layer = PGPLayer + layer = PGPConfigLayer def setUp(self): - self.display_name = 'Display Name' - self.posting_address = 'posting@address.com' - self.request_address = 'posting-request@address.com' + with mm_transaction(): + self.mlist = create_list('test@example.com', + style_name='pgp-default') + self.pgp_list = PGPMailingList.for_list(self.mlist) @parameterized.expand([ # RSA + RSA @@ -49,36 +53,43 @@ class TestKeygen(TestCase): ]) def test_generate(self, primary_key_type, primary_key_size, sub_key_type, sub_key_size): - with TemporaryDirectory() as temp_dir: - key_path = join(temp_dir, 'key.asc') - keygen = ListKeyGenerator((primary_key_type, primary_key_size), - (sub_key_type, sub_key_size), - self.display_name, - self.posting_address, - self.request_address, key_path) - keygen.start() - keygen.join() - self.assertTrue(exists(key_path)) - self.assertTrue(isfile(key_path)) - - key, _ = PGPKey.from_file(key_path) - self.assertEqual(key.key_algorithm, - primary_key_type) - self.assertEqual(key.key_size, - primary_key_size) - - subs = key.subkeys - self.assertEqual(len(subs), 1) - - keyid, sub = subs.popitem() - self.assertEqual(sub.key_algorithm, - sub_key_type) - self.assertEqual(sub.key_size, - sub_key_size) - - uids = key.userids - self.assertEqual(len(uids), 2) - for uid in uids: - self.assertEqual(uid.name, self.display_name) - self.assertIn(uid.email, - (self.posting_address, self.request_address)) + def reset_primary(primary_key_args): + config.pgp.primary_key_args = primary_key_args + + self.addCleanup(reset_primary, config.pgp.primary_key_args) + + def reset_sub(sub_key_args): + config.pgp.sub_key_args = sub_key_args + + self.addCleanup(reset_sub, config.pgp.sub_key_args) + + config.pgp.primary_key_args = (primary_key_type, primary_key_size) + config.pgp.sub_key_args = (sub_key_type, sub_key_size) + + key_path = self.pgp_list.key_path + keygen = ListKeyGenerator(self.pgp_list) + ret_key = keygen.generate(True) + list_key = self.pgp_list.key + self.assertTrue(exists(key_path)) + self.assertTrue(isfile(key_path)) + + key = key_from_file(key_path) + self.assertEqual(key.key_algorithm, primary_key_type) + self.assertEqual(key.key_size, primary_key_size) + self.assertEqual(ret_key.fingerprint, key.fingerprint) + self.assertEqual(list_key.fingerprint, key.fingerprint) + + subs = key.subkeys + self.assertEqual(len(subs), 1) + + keyid, sub = subs.popitem() + self.assertEqual(sub.key_algorithm, sub_key_type) + self.assertEqual(sub.key_size, sub_key_size) + + uids = key.userids + self.assertEqual(len(uids), 2) + for uid in uids: + self.assertEqual(uid.name, self.pgp_list.mlist.display_name) + self.assertIn(uid.email, + (self.pgp_list.mlist.posting_address, + self.pgp_list.mlist.request_address)) diff --git a/src/mailman_pgp/pgp/tests/test_pgp.py b/src/mailman_pgp/pgp/tests/test_pgp.py index ab2a69a..6ef531e 100644 --- a/src/mailman_pgp/pgp/tests/test_pgp.py +++ b/src/mailman_pgp/pgp/tests/test_pgp.py @@ -25,6 +25,7 @@ from mailman_pgp.config import config from mailman_pgp.database import mm_transaction, transaction from mailman_pgp.model.address import PGPAddress from mailman_pgp.model.list import PGPMailingList +from mailman_pgp.pgp.keygen import ListKeyGenerator from mailman_pgp.testing.layers import PGPConfigLayer from mailman_pgp.testing.pgp import load_key @@ -37,7 +38,7 @@ class TestPGP(TestCase): self.mlist = create_list('test@example.com', style_name='pgp-default') self.pgp_list = PGPMailingList.for_list(self.mlist) - self.list_key = self.pgp_list.generate_key(True) + self.list_key = ListKeyGenerator(self.pgp_list).generate(True) # Make Anne a member of this mailing list. self.anne = subscribe(self.mlist, 'Anne', email='anne@example.org') @@ -61,3 +62,7 @@ class TestPGP(TestCase): with keyring.key(self.anne_key.fingerprint) as key: self.assertTrue(key.is_public) self.assertEqual(key.fingerprint, self.anne_key.fingerprint) + + def test_archive_keydir(self): + keyring = config.pgp.archive_keyring + self.assertEqual(len(keyring), 0) diff --git a/src/mailman_pgp/styles/base.py b/src/mailman_pgp/styles/base.py index a7c3366..bad67ad 100644 --- a/src/mailman_pgp/styles/base.py +++ b/src/mailman_pgp/styles/base.py @@ -22,6 +22,7 @@ from public import public from mailman_pgp.config import config, mm_config from mailman_pgp.database import transaction from mailman_pgp.model.list import PGPMailingList +from mailman_pgp.pgp.keygen import ListKeyGenerator @public @@ -46,5 +47,6 @@ class PGPStyle: with transaction() as session: pgp_list = PGPMailingList(mailing_list) if generate: - pgp_list.generate_key() + keygen = ListKeyGenerator(pgp_list) + keygen.generate(True) session.add(pgp_list) diff --git a/src/mailman_pgp/utils/file.py b/src/mailman_pgp/utils/file.py new file mode 100644 index 0000000..d633c4a --- /dev/null +++ b/src/mailman_pgp/utils/file.py @@ -0,0 +1,62 @@ +# Copyright (C) 2017 Jan Jancar +# +# This file is a part of the Mailman PGP plugin. +# +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) +# any later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. +# +# You should have received a copy of the GNU General Public License along with +# this program. If not, see . + +"""""" +from functools import wraps + +from flufl.lock import Lock + + +def locked(lockfile, *lock_args, **lock_kwargs): + """ + + :param lockfile: + :param lock_args: + :param lock_kwargs: + :return: + """ + + def locked_decorator(func): + @wraps(func) + def wrapper(*args, **kwargs): + with Lock(lockfile, *lock_args, **lock_kwargs): + return func(*args, **kwargs) + + return wrapper + + return locked_decorator + + +def locked_obj(lockattr, *lock_args, **lock_kwargs): + """ + + :param lockattr: + :param lock_args: + :param lock_kwargs: + :return: + """ + + def locked_decorator(func): + @wraps(func) + def wrapper(*args, **kwargs): + lockfile = getattr(args[0], lockattr) + locked_func = locked(lockfile, *lock_args, **lock_kwargs)(func) + return locked_func(*args, **kwargs) + + return wrapper + + return locked_decorator -- cgit v1.2.3-70-g09d2 From b16fe8644ddc82584a8ff9a2f98e40c1571437f2 Mon Sep 17 00:00:00 2001 From: J08nY Date: Mon, 31 Jul 2017 22:02:36 +0200 Subject: More tests for models. --- src/mailman_pgp/database/__init__.py | 4 +- src/mailman_pgp/model/address.py | 2 + src/mailman_pgp/model/db_key.py | 14 +++-- src/mailman_pgp/model/fs_key.py | 4 +- src/mailman_pgp/model/sighash.py | 2 + src/mailman_pgp/model/tests/test_db_key.py | 50 +++++++++++++++ src/mailman_pgp/model/tests/test_fs_key.py | 99 ++++++++++++++++++++++++++++++ 7 files changed, 167 insertions(+), 8 deletions(-) create mode 100644 src/mailman_pgp/model/tests/test_db_key.py create mode 100644 src/mailman_pgp/model/tests/test_fs_key.py diff --git a/src/mailman_pgp/database/__init__.py b/src/mailman_pgp/database/__init__.py index 2e24cdc..11ea387 100644 --- a/src/mailman_pgp/database/__init__.py +++ b/src/mailman_pgp/database/__init__.py @@ -38,7 +38,7 @@ class Database: url = config.get('db', 'url') self._url = expand(url, None, mailman_config.paths) self.engine = create_engine(self._url) - self._scoped_session = scoped_session(sessionmaker(bind=self.engine)) + self.scoped_session = scoped_session(sessionmaker(bind=self.engine)) Base.metadata.create_all(self.engine) self.session.commit() @@ -50,7 +50,7 @@ class Database: :return: A scoped session. :rtype: scoped_session """ - return self._scoped_session() + return self.scoped_session() @public diff --git a/src/mailman_pgp/model/address.py b/src/mailman_pgp/model/address.py index a912e87..9ecfa95 100644 --- a/src/mailman_pgp/model/address.py +++ b/src/mailman_pgp/model/address.py @@ -19,6 +19,7 @@ from mailman.database.types import SAUnicode from mailman.interfaces.usermanager import IUserManager +from public import public from sqlalchemy import Boolean, Column, Integer, String from sqlalchemy.orm import reconstructor from zope.component import getUtility @@ -28,6 +29,7 @@ from mailman_pgp.model.base import Base from mailman_pgp.model.fs_key import FSKey +@public class PGPAddress(Base): """A PGP enabled address.""" diff --git a/src/mailman_pgp/model/db_key.py b/src/mailman_pgp/model/db_key.py index 45eafc7..0b3af45 100644 --- a/src/mailman_pgp/model/db_key.py +++ b/src/mailman_pgp/model/db_key.py @@ -16,6 +16,7 @@ # this program. If not, see . """Database stored PGP key.""" +from public import public from sqlalchemy import Column, Integer, LargeBinary from sqlalchemy.orm import reconstructor @@ -23,15 +24,18 @@ from mailman_pgp.model.base import Base from mailman_pgp.utils.pgp import key_from_blob +@public class DBKey(Base): """Database stored PGP key.""" __tablename__ = 'keys' id = Column(Integer, primary_key=True) - key_material = Column(LargeBinary) + _key_material = Column('key_material', LargeBinary) - def __init__(self): + def __init__(self, key=None): super().__init__() + self._init() + self.key = key @reconstructor def _init(self): @@ -40,14 +44,14 @@ class DBKey(Base): @property def key(self): if self._key is None: - self._key = key_from_blob(self.key_material) + self._key = key_from_blob(self._key_material) return self._key @key.setter def key(self, value): if value is None: self._key = None - self.key_material = bytes() + self._key_material = bytes() else: self._key = value - self.key_material = bytes(value) + self._key_material = bytes(value) diff --git a/src/mailman_pgp/model/fs_key.py b/src/mailman_pgp/model/fs_key.py index c7c86eb..c4a2ac4 100644 --- a/src/mailman_pgp/model/fs_key.py +++ b/src/mailman_pgp/model/fs_key.py @@ -19,10 +19,12 @@ from os import remove from os.path import getmtime, join +from public import public + from mailman_pgp.utils.file import locked_obj from mailman_pgp.utils.pgp import key_from_file - +@public class FSKey: """Filesystem stored PGP key.""" diff --git a/src/mailman_pgp/model/sighash.py b/src/mailman_pgp/model/sighash.py index 9803ffc..d936f35 100644 --- a/src/mailman_pgp/model/sighash.py +++ b/src/mailman_pgp/model/sighash.py @@ -19,8 +19,10 @@ from sqlalchemy import Column, LargeBinary, String from mailman_pgp.model.base import Base +from public import public +@public class PGPSigHash(Base): """""" diff --git a/src/mailman_pgp/model/tests/test_db_key.py b/src/mailman_pgp/model/tests/test_db_key.py new file mode 100644 index 0000000..46cc487 --- /dev/null +++ b/src/mailman_pgp/model/tests/test_db_key.py @@ -0,0 +1,50 @@ +# Copyright (C) 2017 Jan Jancar +# +# This file is a part of the Mailman PGP plugin. +# +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) +# any later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. +# +# You should have received a copy of the GNU General Public License along with +# this program. If not, see . + +"""""" +from unittest import TestCase + +from mailman_pgp.config import config +from mailman_pgp.database import transaction +from mailman_pgp.model.db_key import DBKey +from mailman_pgp.testing.layers import PGPConfigLayer +from mailman_pgp.testing.pgp import load_key + + +class TestDBKey(TestCase): + layer = PGPConfigLayer + + def test_key(self): + with transaction() as t: + db_key = DBKey() + t.add(db_key) + + key_data = load_key('rsa_1024.priv.asc') + with transaction(): + db_key.key = key_data + self.assertEqual(db_key.key.fingerprint, key_data.fingerprint) + + def test_loaded(self): + key_data = load_key('rsa_1024.priv.asc') + with transaction() as t: + db_key = DBKey(key_data) + t.add(db_key) + + config.db.scoped_session.remove() + + loaded_key = DBKey.query().one() + self.assertEqual(loaded_key.key.fingerprint, key_data.fingerprint) diff --git a/src/mailman_pgp/model/tests/test_fs_key.py b/src/mailman_pgp/model/tests/test_fs_key.py new file mode 100644 index 0000000..8828e4b --- /dev/null +++ b/src/mailman_pgp/model/tests/test_fs_key.py @@ -0,0 +1,99 @@ +# Copyright (C) 2017 Jan Jancar +# +# This file is a part of the Mailman PGP plugin. +# +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free +# Software Foundation, either version 3 of the License, or (at your option) +# any later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. +# +# You should have received a copy of the GNU General Public License along with +# this program. If not, see . + +"""""" +from os.path import join, exists +from tempfile import TemporaryDirectory +from unittest import TestCase + +from mailman_pgp.model.fs_key import FSKey +from mailman_pgp.testing.layers import PGPLayer +from mailman_pgp.testing.pgp import load_key + + +class TestFSKey(TestCase): + layer = PGPLayer + + def setUp(self): + self.tmpdir = TemporaryDirectory() + + def tearDown(self): + self.tmpdir.cleanup() + + def test_load(self): + key_name = 'something.asc' + key_path = join(self.tmpdir.name, key_name) + key_data = load_key('rsa_1024.priv.asc') + with open(key_path, 'w') as key_file: + key_file.write(str(key_data)) + + key = FSKey(self.tmpdir.name, key_name, False) + self.assertEqual(key.key_path, key_path) + + key.load() + self.assertEqual(key.key.fingerprint, key_data.fingerprint) + + def test_reload_none(self): + key_name = 'something.asc' + key = FSKey(self.tmpdir.name, key_name, False) + key_data = load_key('rsa_1024.priv.asc') + with open(key.key_path, 'w') as key_file: + key_file.write(str(key_data)) + + self.assertIsNone(key.key) + key.reload() + self.assertIsNotNone(key.key) + self.assertEqual(key.key.fingerprint, key_data.fingerprint) + + def test_reload_not_none(self): + key_name = 'something.asc' + key_path = join(self.tmpdir.name, key_name) + key_data = load_key('rsa_1024.priv.asc') + new_key_data = load_key('ecc_p256.priv.asc') + with open(key_path, 'w') as key_file: + key_file.write(str(key_data)) + + key = FSKey(self.tmpdir.name, key_name, True) + + with open(key_path, 'w') as key_file: + key_file.write(str(new_key_data)) + + key.reload() + self.assertIsNotNone(key.key) + self.assertEqual(key.key.fingerprint, new_key_data.fingerprint) + + def test_save(self): + key_name = 'something.asc' + key = FSKey(self.tmpdir.name, key_name) + key_data = load_key('rsa_1024.priv.asc') + + key.key = key_data + key.save() + self.assertTrue(exists(key.key_path)) + + def test_delete(self): + key_name = 'something.asc' + key_path = join(self.tmpdir.name, key_name) + key_data = load_key('rsa_1024.priv.asc') + with open(key_path, 'w') as key_file: + key_file.write(str(key_data)) + + key = FSKey(self.tmpdir.name, key_name, True) + + key.delete() + self.assertFalse(exists(key.key_path)) + self.assertIsNotNone(key.key) -- cgit v1.2.3-70-g09d2 From 39291794b39b52804b2b45d74d0d5aad0f0eefcf Mon Sep 17 00:00:00 2001 From: J08nY Date: Mon, 31 Jul 2017 22:40:01 +0200 Subject: Add basic shredding to FSKey. Needs more low-level solution. --- src/mailman_pgp/model/fs_key.py | 22 ++++++++++++-- src/mailman_pgp/model/list.py | 8 +++-- src/mailman_pgp/model/sighash.py | 2 +- src/mailman_pgp/model/tests/test_fs_key.py | 49 +++++++++++++++++++++--------- src/mailman_pgp/model/tests/test_list.py | 17 +++++++++++ 5 files changed, 78 insertions(+), 20 deletions(-) diff --git a/src/mailman_pgp/model/fs_key.py b/src/mailman_pgp/model/fs_key.py index c4a2ac4..118f971 100644 --- a/src/mailman_pgp/model/fs_key.py +++ b/src/mailman_pgp/model/fs_key.py @@ -16,14 +16,15 @@ # this program. If not, see . """Filesystem stored PGP key.""" -from os import remove -from os.path import getmtime, join +from os import remove, urandom +from os.path import getmtime, getsize, join from public import public from mailman_pgp.utils.file import locked_obj from mailman_pgp.utils.pgp import key_from_file + @public class FSKey: """Filesystem stored PGP key.""" @@ -94,4 +95,19 @@ class FSKey: @locked_obj('lock_path') def delete(self): - remove(self.key_path) + try: + remove(self.key_path) + except FileNotFoundError: + pass + + @locked_obj('lock_path') + def shred(self): + try: + size = getsize(self.key_path) + for _ in range(10): + with open(self.key_path, 'wb') as f: + data = urandom(size) + f.write(data) + remove(self.key_path) + except FileNotFoundError: + pass diff --git a/src/mailman_pgp/model/list.py b/src/mailman_pgp/model/list.py index 838bcab..946af34 100644 --- a/src/mailman_pgp/model/list.py +++ b/src/mailman_pgp/model/list.py @@ -16,7 +16,7 @@ # this program. If not, see . """Model for PGP enabled mailing lists.""" - +from lazr.config import as_boolean from mailman.database.types import Enum, SAUnicode from mailman.interfaces.action import Action from mailman.interfaces.listmanager import (IListManager, ListDeletingEvent) @@ -145,8 +145,12 @@ class PGPMailingList(Base): @classhandler.handler(ListDeletingEvent) def on_delete(event): + shred = as_boolean(config.get('keypairs', 'shred')) pgp_list = PGPMailingList.for_list(event.mailing_list) if pgp_list: with transaction() as session: - # TODO shred the list key + if shred: + pgp_list.fs_key.shred() + else: + pgp_list.fs_key.delete() session.delete(pgp_list) diff --git a/src/mailman_pgp/model/sighash.py b/src/mailman_pgp/model/sighash.py index d936f35..e8a7ca9 100644 --- a/src/mailman_pgp/model/sighash.py +++ b/src/mailman_pgp/model/sighash.py @@ -16,10 +16,10 @@ # this program. If not, see . """""" +from public import public from sqlalchemy import Column, LargeBinary, String from mailman_pgp.model.base import Base -from public import public @public diff --git a/src/mailman_pgp/model/tests/test_fs_key.py b/src/mailman_pgp/model/tests/test_fs_key.py index 8828e4b..3a4f859 100644 --- a/src/mailman_pgp/model/tests/test_fs_key.py +++ b/src/mailman_pgp/model/tests/test_fs_key.py @@ -16,7 +16,7 @@ # this program. If not, see . """""" -from os.path import join, exists +from os.path import exists, join from tempfile import TemporaryDirectory from unittest import TestCase @@ -47,19 +47,7 @@ class TestFSKey(TestCase): key.load() self.assertEqual(key.key.fingerprint, key_data.fingerprint) - def test_reload_none(self): - key_name = 'something.asc' - key = FSKey(self.tmpdir.name, key_name, False) - key_data = load_key('rsa_1024.priv.asc') - with open(key.key_path, 'w') as key_file: - key_file.write(str(key_data)) - - self.assertIsNone(key.key) - key.reload() - self.assertIsNotNone(key.key) - self.assertEqual(key.key.fingerprint, key_data.fingerprint) - - def test_reload_not_none(self): + def test_reload(self): key_name = 'something.asc' key_path = join(self.tmpdir.name, key_name) key_data = load_key('rsa_1024.priv.asc') @@ -76,6 +64,18 @@ class TestFSKey(TestCase): self.assertIsNotNone(key.key) self.assertEqual(key.key.fingerprint, new_key_data.fingerprint) + def test_reload_none(self): + key_name = 'something.asc' + key = FSKey(self.tmpdir.name, key_name, False) + key_data = load_key('rsa_1024.priv.asc') + with open(key.key_path, 'w') as key_file: + key_file.write(str(key_data)) + + self.assertIsNone(key.key) + key.reload() + self.assertIsNotNone(key.key) + self.assertEqual(key.key.fingerprint, key_data.fingerprint) + def test_save(self): key_name = 'something.asc' key = FSKey(self.tmpdir.name, key_name) @@ -97,3 +97,24 @@ class TestFSKey(TestCase): key.delete() self.assertFalse(exists(key.key_path)) self.assertIsNotNone(key.key) + + def test_delete_none(self): + key = FSKey(self.tmpdir.name, 'something.asc') + key.delete() + + def test_shred(self): + key_name = 'something.asc' + key_path = join(self.tmpdir.name, key_name) + key_data = load_key('rsa_1024.priv.asc') + with open(key_path, 'w') as key_file: + key_file.write(str(key_data)) + + key = FSKey(self.tmpdir.name, key_name, True) + + key.shred() + self.assertFalse(exists(key.key_path)) + self.assertIsNotNone(key.key) + + def test_shred_none(self): + key = FSKey(self.tmpdir.name, 'something.asc') + key.shred() diff --git a/src/mailman_pgp/model/tests/test_list.py b/src/mailman_pgp/model/tests/test_list.py index 58f52f2..aa07a79 100644 --- a/src/mailman_pgp/model/tests/test_list.py +++ b/src/mailman_pgp/model/tests/test_list.py @@ -16,15 +16,18 @@ # this program. If not, see . """""" +from os.path import exists from unittest import TestCase from mailman.app.lifecycle import create_list from mailman.interfaces.listmanager import IListManager from zope.component import getUtility +from mailman_pgp.config import config from mailman_pgp.database import mm_transaction from mailman_pgp.model.list import PGPMailingList from mailman_pgp.testing.layers import PGPConfigLayer +from mailman_pgp.testing.pgp import load_key class TestPGPMailingList(TestCase): @@ -34,8 +37,22 @@ class TestPGPMailingList(TestCase): with mm_transaction(): self.mlist = create_list('test@example.com', style_name='pgp-default') + pgp_list = PGPMailingList.for_list(self.mlist) + pgp_list.key = load_key('rsa_1024.priv.asc') def test_delete(self): getUtility(IListManager).delete(self.mlist) pgp_list = PGPMailingList.for_list(self.mlist) self.assertIsNone(pgp_list) + + def test_shred_key(self): + key_path = PGPMailingList.for_list(self.mlist).key_path + getUtility(IListManager).delete(self.mlist) + self.assertFalse(exists(key_path)) + + def test_delete_key(self): + self.addCleanup(config.set, 'keypairs', 'shred', 'yes') + config.set('keypairs', 'shred', 'no') + key_path = PGPMailingList.for_list(self.mlist).key_path + getUtility(IListManager).delete(self.mlist) + self.assertFalse(exists(key_path)) -- cgit v1.2.3-70-g09d2