1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
|
# Copyright (C) 2007-2008 by the Free Software Foundation, Inc.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
# USA.
from __future__ import with_statement
__metaclass__ = type
__all__ = [
'MessageStore',
]
import os
import errno
import base64
import hashlib
import cPickle as pickle
from zope.interface import implements
from mailman import Utils
from mailman.configuration import config
from mailman.database.message import Message
from mailman.interfaces import IMessageStore
# It could be very bad if you have already stored files and you change this
# value. We'd need a script to reshuffle and resplit.
MAX_SPLITS = 2
EMPTYSTRING = ''
class MessageStore:
implements(IMessageStore)
def add(self, message):
# Ensure that the message has the requisite headers.
message_ids = message.get_all('message-id', [])
if len(message_ids) <> 1:
raise ValueError('Exactly one Message-ID header required')
# Calculate and insert the X-Message-ID-Hash.
message_id = message_ids[0]
# Complain if the Message-ID already exists in the storage.
existing = config.db.store.find(Message,
Message.message_id == message_id).one()
if existing is not None:
raise ValueError('Message ID already exists in message store: %s',
message_id)
shaobj = hashlib.sha1(message_id)
hash32 = base64.b32encode(shaobj.digest())
del message['X-Message-ID-Hash']
message['X-Message-ID-Hash'] = hash32
# Calculate the path on disk where we're going to store this message
# object, in pickled format.
parts = []
split = list(hash32)
while split and len(parts) < MAX_SPLITS:
parts.append(split.pop(0) + split.pop(0))
parts.append(hash32)
relpath = os.path.join(*parts)
# Store the message in the database. This relies on the database
# providing a unique serial number, but to get this information, we
# have to use a straight insert instead of relying on Elixir to create
# the object.
row = Message(message_id=message_id,
message_id_hash=hash32,
path=relpath)
# Now calculate the full file system path.
path = os.path.join(config.MESSAGES_DIR, relpath)
# Write the file to the path, but catch the appropriate exception in
# case the parent directories don't yet exist. In that case, create
# them and try again.
while True:
try:
with open(path, 'w') as fp:
# -1 says to use the highest protocol available.
pickle.dump(message, fp, -1)
break
except IOError, e:
if e.errno <> errno.ENOENT:
raise
os.makedirs(os.path.dirname(path))
return hash32
def _get_message(self, row):
path = os.path.join(config.MESSAGES_DIR, row.path)
with open(path) as fp:
return pickle.load(fp)
def get_message_by_id(self, message_id):
row = config.db.store.find(Message, message_id=message_id).one()
if row is None:
return None
return self._get_message(row)
def get_message_by_hash(self, message_id_hash):
# It's possible the hash came from a message header, in which case it
# will be a Unicode. However when coming from source code, it may be
# an 8-string. Coerce to the latter if necessary; it must be
# US-ASCII.
if isinstance(message_id_hash, unicode):
message_id_hash = message_id_hash.encode('ascii')
row = config.db.store.find(Message,
message_id_hash=message_id_hash).one()
if row is None:
return None
return self._get_message(row)
@property
def messages(self):
for row in config.db.store.find(Message):
yield self._get_message(row)
def delete_message(self, message_id):
row = config.db.store.find(Message, message_id=message_id).one()
if row is None:
raise LookupError(message_id)
path = os.path.join(config.MESSAGES_DIR, row.path)
os.remove(path)
config.db.store.remove(row)
|