# Copyright (C) 2012-2017 by the Free Software Foundation, Inc. # # This file is part of GNU Mailman. # # GNU Mailman is free software: you can redistribute it and/or modify it under # the terms of the GNU General Public License as published by the Free # Software Foundation, either version 3 of the License, or (at your option) # any later version. # # GNU Mailman is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for # more details. # # You should have received a copy of the GNU General Public License along with # GNU Mailman. If not, see . """Test the prototype archiver.""" import os import shutil import tempfile import unittest import threading from email import message_from_file from flufl.lock import Lock from mailman.app.lifecycle import create_list from mailman.archiving.prototype import Prototype from mailman.config import config from mailman.database.transaction import transaction from mailman.testing.helpers import ( LogFileMark, specialized_message_from_string as mfs) from mailman.testing.layers import ConfigLayer from mailman.utilities.email import add_message_hash class TestPrototypeArchiver(unittest.TestCase): """Test the prototype archiver.""" layer = ConfigLayer def setUp(self): # Create a fake mailing list and message object. self._msg = mfs("""\ To: test@example.com From: anne@example.com Subject: Testing the test list Message-ID: Message-ID-Hash: MS6QLWERIJLGCRF44J7USBFDELMNT2BW Tests are better than no tests but the water deserves to be swum. """) with transaction(): self._mlist = create_list('test@example.com') # Set up a temporary directory for the prototype archiver so that it's # easier to clean up. self._tempdir = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, self._tempdir) config.push('prototype', """ [paths.testing] archive_dir: {} """.format(self._tempdir)) self.addCleanup(config.pop, 'prototype') # Capture the structure of a maildir. self._expected_dir_structure = set( (os.path.join(config.ARCHIVE_DIR, path) for path in ( 'prototype', os.path.join('prototype', self._mlist.fqdn_listname), os.path.join('prototype', self._mlist.fqdn_listname, 'cur'), os.path.join('prototype', self._mlist.fqdn_listname, 'new'), os.path.join('prototype', self._mlist.fqdn_listname, 'tmp'), ))) self._expected_dir_structure.add(config.ARCHIVE_DIR) def _find(self, path): all_filenames = set() for dirpath, dirnames, filenames in os.walk(path): if isinstance(dirpath, bytes): dirpath = dirpath.decode('utf-8') all_filenames.add(dirpath) for filename in filenames: new_filename = filename if isinstance(filename, bytes): new_filename = filename.decode('utf-8') all_filenames.add(os.path.join(dirpath, new_filename)) return all_filenames def test_archive_maildir_created(self): # Archiving a message to the prototype archiver should create the # expected directory structure. Prototype.archive_message(self._mlist, self._msg) all_filenames = self._find(config.ARCHIVE_DIR) # Check that the directory structure has been created and we have one # more file (the archived message) than expected directories. archived_messages = all_filenames - self._expected_dir_structure self.assertEqual(len(archived_messages), 1) self.assertTrue( archived_messages.pop().startswith( os.path.join(config.ARCHIVE_DIR, 'prototype', self._mlist.fqdn_listname, 'new'))) def test_archive_maildir_existence_does_not_raise(self): # Archiving a second message does not cause an EEXIST to be raised # when a second message is archived. new_dir = None Prototype.archive_message(self._mlist, self._msg) for directory in ('cur', 'new', 'tmp'): path = os.path.join(config.ARCHIVE_DIR, 'prototype', self._mlist.fqdn_listname, directory) if directory == 'new': new_dir = path self.assertTrue(os.path.isdir(path)) # There should be one message in the 'new' directory. self.assertEqual(len(os.listdir(new_dir)), 1) # Archive a second message. If an exception occurs, let it fail the # test. Afterward, two messages should be in the 'new' directory. del self._msg['message-id'] del self._msg['message-id-hash'] self._msg['Message-ID'] = '' add_message_hash(self._msg) Prototype.archive_message(self._mlist, self._msg) self.assertEqual(len(os.listdir(new_dir)), 2) def test_archive_lock_used(self): # Test that locking the maildir when adding works as a failure here # could mean we lose mail. lock_file = os.path.join( config.LOCK_DIR, '{0}-maildir.lock'.format( self._mlist.fqdn_listname)) with Lock(lock_file): # Acquire the archiver lock, then make sure the archiver logs the # fact that it could not acquire the lock. archive_thread = threading.Thread( target=Prototype.archive_message, args=(self._mlist, self._msg)) mark = LogFileMark('mailman.error') archive_thread.run() # Test that the archiver output the correct error. line = mark.readline() # XXX 2012-03-15 BAW: we really should remove timestamp prefixes # from the loggers when under test. self.assertTrue(line.endswith( 'Unable to acquire prototype archiver lock for {0}, ' 'discarding: {1}\n'.format( self._mlist.fqdn_listname, self._msg.get('message-id')))) # Check that the message didn't get archived. created_files = self._find(config.ARCHIVE_DIR) self.assertEqual(self._expected_dir_structure, created_files) def test_prototype_archiver_good_path(self): # Verify the good path; the message gets archived. Prototype.archive_message(self._mlist, self._msg) new_path = os.path.join( config.ARCHIVE_DIR, 'prototype', self._mlist.fqdn_listname, 'new') archived_messages = list(os.listdir(new_path)) self.assertEqual(len(archived_messages), 1) # Check that the email has been added. with open(os.path.join(new_path, archived_messages[0])) as fp: archived_message = message_from_file(fp) self.assertEqual(self._msg.as_string(), archived_message.as_string())