# Copyright (C) 2014-2017 by the Free Software Foundation, Inc.
#
# This file is part of GNU Mailman.
#
# GNU Mailman is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option)
# any later version.
#
# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
# more details.
#
# You should have received a copy of the GNU General Public License along with
# GNU Mailman. If not, see .
"""Test the message store."""
import os
import unittest
from mailman.config import config
from mailman.interfaces.messages import IMessageStore
from mailman.model.message import Message
from mailman.testing.helpers import (
specialized_message_from_string as mfs)
from mailman.testing.layers import ConfigLayer
from mailman.utilities.email import add_message_hash
from zope.component import getUtility
class TestMessageStore(unittest.TestCase):
layer = ConfigLayer
def setUp(self):
self._store = getUtility(IMessageStore)
def test_message_id_required(self):
# The Message-ID header is required in order to add it to the store.
message = mfs("""\
Subject: An important message
This message is very important.
""")
self.assertRaises(ValueError, self._store.add, message)
def test_get_message_by_hash(self):
# Messages have an X-Message-ID-Hash header, the value of which can be
# used to look the message up in the message store.
msg = mfs("""\
Subject: An important message
Message-ID:
This message is very important.
""")
add_message_hash(msg)
self._store.add(msg)
self.assertEqual(msg['x-message-id-hash'],
'MS6QLWERIJLGCRF44J7USBFDELMNT2BW')
found = self._store.get_message_by_hash(
'MS6QLWERIJLGCRF44J7USBFDELMNT2BW')
self.assertEqual(found['message-id'], '')
self.assertEqual(found['x-message-id-hash'],
'MS6QLWERIJLGCRF44J7USBFDELMNT2BW')
def test_can_delete_missing_message(self):
# Deleting a message which isn't in the store does not raise an
# exception.
msg = mfs("""\
Message-ID:
""")
self._store.add(msg)
self.assertEqual(len(list(self._store.messages)), 1)
self._store.delete_message('missing')
self.assertEqual(len(list(self._store.messages)), 1)
def test_can_survive_missing_message_path(self):
# Deleting a message which is in the store, but which doesn't appear
# on the file system does not raise an exception, but still removes
# the message from the store.
msg = mfs("""\
Message-ID:
""")
self._store.add(msg)
self.assertEqual(len(list(self._store.messages)), 1)
# We have to use the SQLAlchemy API because the .get_message_by_*()
# methods return email Message objects, not IMessages. The former
# does not give us the path to the object on the file system.
row = config.db.store.query(Message).filter_by(
message_id='').first()
os.remove(os.path.join(config.MESSAGES_DIR, row.path))
self._store.delete_message('')
self.assertEqual(len(list(self._store.messages)), 0)
def test_message_id_hash(self):
# The new specification calls for a Message-ID-Hash header,
# specifically without the X- prefix.
msg = mfs("""\
Message-ID:
""")
self._store.add(msg)
stored_msg = self._store.get_message_by_id('')
self.assertEqual(stored_msg['message-id-hash'],
'MS6QLWERIJLGCRF44J7USBFDELMNT2BW')
# For backward compatibility with the old spec.
self.assertEqual(stored_msg['x-message-id-hash'],
'MS6QLWERIJLGCRF44J7USBFDELMNT2BW')
def test_message_id_hash_gets_replaced(self):
# Any existing Message-ID-Hash header (or for backward compatibility
# X-Message-ID-Hash) gets replaced with its new value.
msg = mfs("""\
Subject: Testing
Message-ID:
Message-ID-Hash: abc
X-Message-ID-Hash: abc
""")
self._store.add(msg)
stored_msg = self._store.get_message_by_id('')
message_id_hashes = stored_msg.get_all('message-id-hash')
self.assertEqual(len(message_id_hashes), 1)
self.assertEqual(message_id_hashes[0],
'MS6QLWERIJLGCRF44J7USBFDELMNT2BW')
# For backward compatibility with the old spec.
x_message_id_hashes = stored_msg.get_all('x-message-id-hash')
self.assertEqual(len(x_message_id_hashes), 1)
self.assertEqual(x_message_id_hashes[0],
'MS6QLWERIJLGCRF44J7USBFDELMNT2BW')
def test_add_message_duplicate_okay(self):
msg = mfs("""\
Subject: Once
Message-ID:
""")
hash32 = self._store.add(msg)
stored_msg = self._store.get_message_by_id('')
self.assertEqual(msg['subject'], stored_msg['subject'])
self.assertEqual(msg['message-id-hash'], hash32)
# A second insertion, even if headers change, does not store the
# message twice.
del msg['subject']
msg['Subject'] = 'Twice'
hash32 = self._store.add(msg)
stored_msg = self._store.get_message_by_id('')
self.assertNotEqual(msg['subject'], stored_msg['subject'])
self.assertIsNone(hash32)