1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
|
# Copyright (C) 2007-2014 by the Free Software Foundation, Inc.
#
# This file is part of GNU Mailman.
#
# GNU Mailman is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option)
# any later version.
#
# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
# more details.
#
# You should have received a copy of the GNU General Public License along with
# GNU Mailman. If not, see <http://www.gnu.org/licenses/>.
"""Model for message stores."""
from __future__ import absolute_import, print_function, unicode_literals
__metaclass__ = type
__all__ = [
'MessageStore',
]
import os
import errno
import base64
import hashlib
import cPickle as pickle
from zope.interface import implementer
from mailman.config import config
from mailman.database.transaction import dbconnection
from mailman.interfaces.messages import IMessageStore
from mailman.model.message import Message
from mailman.utilities.filesystem import makedirs
# It could be very bad if you have already stored files and you change this
# value. We'd need a script to reshuffle and resplit.
MAX_SPLITS = 2
EMPTYSTRING = ''
@implementer(IMessageStore)
class MessageStore:
"""See `IMessageStore`."""
@dbconnection
def add(self, store, message):
# Ensure that the message has the requisite headers.
message_ids = message.get_all('message-id', [])
if len(message_ids) != 1:
raise ValueError('Exactly one Message-ID header required')
# Calculate and insert the X-Message-ID-Hash.
message_id = message_ids[0]
# Complain if the Message-ID already exists in the storage.
existing = store.query(Message).filter(
Message.message_id == message_id).first()
if existing is not None:
raise ValueError(
'Message ID already exists in message store: {0}'.format(
message_id))
shaobj = hashlib.sha1(message_id)
hash32 = base64.b32encode(shaobj.digest()).decode('ascii')
del message['X-Message-ID-Hash']
message['X-Message-ID-Hash'] = hash32
# Calculate the path on disk where we're going to store this message
# object, in pickled format.
parts = []
split = list(hash32)
while split and len(parts) < MAX_SPLITS:
parts.append(split.pop(0) + split.pop(0))
parts.append(hash32)
relpath = os.path.join(*parts)
# Store the message in the database. This relies on the database
# providing a unique serial number, but to get this information, we
# have to use a straight insert instead of relying on Elixir to create
# the object.
Message(message_id=message_id,
message_id_hash=hash32,
path=relpath)
# Now calculate the full file system path.
path = os.path.join(config.MESSAGES_DIR, relpath)
# Write the file to the path, but catch the appropriate exception in
# case the parent directories don't yet exist. In that case, create
# them and try again.
while True:
try:
with open(path, 'w') as fp:
# -1 says to use the highest protocol available.
pickle.dump(message, fp, -1)
break
except IOError as error:
if error.errno != errno.ENOENT:
raise
makedirs(os.path.dirname(path))
return hash32
def _get_message(self, row):
path = os.path.join(config.MESSAGES_DIR, row.path)
with open(path) as fp:
return pickle.load(fp)
@dbconnection
def get_message_by_id(self, store, message_id):
row = store.query(Message).filter_by(message_id=message_id).first()
if row is None:
return None
return self._get_message(row)
@dbconnection
def get_message_by_hash(self, store, message_id_hash):
if isinstance(message_id_hash, bytes):
message_id_hash = message_id_hash.decode('utf-8')
row = store.query(Message).filter_by(
message_id_hash=message_id_hash).first()
if row is None:
return None
return self._get_message(row)
@property
@dbconnection
def messages(self, store):
for row in store.query(Message).all():
yield self._get_message(row)
@dbconnection
def delete_message(self, store, message_id):
row = store.query(Message).filter_by(message_id=message_id).first()
if row is None:
raise LookupError(message_id)
path = os.path.join(config.MESSAGES_DIR, row.path)
os.remove(path)
store.delete(row)
|