1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
|
# Copyright (C) 2007 by the Free Software Foundation, Inc.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
# USA.
from __future__ import with_statement
# Make every class defined in this module a new-style class (Python 2).
__metaclass__ = type
# Public names exported by ``from <module> import *``.
__all__ = [
'MessageStore',
]
import os
import errno
import base64
import hashlib
import cPickle as pickle
from zope.interface import implements
from Mailman import Utils
from Mailman.configuration import config
from Mailman.database.model import Message
from Mailman.interfaces import IMessageStore
# It could be very bad if you have already stored files and you change this
# value. We'd need a script to reshuffle and resplit.
#
# MAX_SPLITS is the number of two-character chunks taken from the front of a
# message's hash; each chunk becomes one directory level in the relative path
# under which the message is stored.
MAX_SPLITS = 2
# Named constant for the empty string (handy as a ``join`` separator).
EMPTYSTRING = ''
class MessageStore:
implements(IMessageStore)
def add(self, message):
# Ensure that the message has the requisite headers.
message_ids = message.get_all('message-id', [])
if len(message_ids) <> 1:
raise ValueError('Exactly one Message-ID header required')
# Calculate and insert the X-List-ID-Hash.
message_id = message_ids[0]
shaobj = hashlib.sha1(message_id)
hash32 = base64.b32encode(shaobj.digest())
del message['X-List-ID-Hash']
message['X-List-ID-Hash'] = hash32
# Calculate the path on disk where we're going to store this message
# object, in pickled format.
parts = []
split = list(hash32)
while split and len(parts) < MAX_SPLITS:
parts.append(split.pop(0) + split.pop(0))
parts.append(EMPTYSTRING.join(split))
relpath = os.path.join(*parts)
# Store the message in the database. This relies on the database
# providing a unique serial number, but to get this information, we
# have to use a straight insert instead of relying on Elixir to create
# the object.
result = Message.table.insert().execute(
hash=hash32, path=relpath, message_id=message_id)
# Add the additional header.
seqno = result.last_inserted_ids()[0]
del message['X-List-Sequence-Number']
message['X-List-Sequence-Number'] = str(seqno)
# Now calculate the full file system path.
path = os.path.join(config.MESSAGES_DIR, relpath, str(seqno))
# Write the file to the path, but catch the appropriate exception in
# case the parent directories don't yet exist. In that case, create
# them and try again.
while True:
try:
with open(path, 'w') as fp:
# -1 says to use the highest protocol available.
pickle.dump(message, fp, -1)
break
except IOError, e:
if e.errno <> errno.ENOENT:
raise
os.makedirs(os.path.dirname(path))
return seqno
def _msgobj(self, msgrow):
path = os.path.join(config.MESSAGES_DIR, msgrow.path, str(msgrow.id))
with open(path) as fp:
return pickle.load(fp)
def get_messages_by_message_id(self, message_id):
for msgrow in Message.query.filter_by(message_id=message_id):
yield self._msgobj(msgrow)
def get_messages_by_hash(self, hash):
for msgrow in Message.query.filter_by(hash=hash):
yield self._msgobj(msgrow)
def _getmsg(self, global_id):
try:
hash, seqno = global_id.split('/', 1)
seqno = int(seqno)
except ValueError:
return None
messages = Message.query.filter_by(id=seqno)
if messages.count() == 0:
return None
assert messages.count() == 1, 'Multiple id matches'
if messages[0].hash <> hash:
# The client lied about which message they wanted. They gave a
# valid sequence number, but the hash did not match.
return None
return messages[0]
def get_message(self, global_id):
msgrow = self._getmsg(global_id)
return (self._msgobj(msgrow) if msgrow is not None else None)
@property
def messages(self):
for msgrow in Message.query.filter_by().all():
yield self._msgobj(msgrow)
def delete_message(self, global_id):
msgrow = self._getmsg(global_id)
if msgrow is None:
raise KeyError(global_id)
msgrow.delete()
|