1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
|
#! /usr/bin/env python
#
# Copyright (C) 1998 by the Free Software Foundation, Inc.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
"""Partition a mass delivery into a suitable set of subdeliveries.
The script takes the following protocol on stdin:
line 1: batchnum
line 2: sender
line 3...n+2: n recipients
line n+3: <empty> - delimiting end of recipients
line n+4: message content
The recipients will be distributed into at most batchnum batches, grouping
together recipients in the same major domain as much as possible."""
# Heh, heh, heh, this partition reminds me of the knapsack problem ;-)
# Ie, the optimal distribution here is NP Complete.
import os, sys
import string, re
import paths
from Mailman import mm_cfg
from Mailman import Utils
from Mailman.Logging.Utils import LogStdErr
LogStdErr("error", "deliver")
# Settings for pauses during process-depletion (error.EAGAIN) condition.
REFRACT = 15 # Seconds between fork retries.
TRIES = 5 # Number of times to retry
def main():
if not forker():
# in the child
try:
do_child()
finally:
os._exit(0)
def do_child():
LogStdErr("error", "deliver (child)")
domain_info = {}
spawns = string.atol(sys.stdin.readline()[:-1])
sender = sys.stdin.readline()[:-1]
to_list = []
while 1:
l = sys.stdin.readline()[:-1]
if not l:
break
to_list.append(l)
text = sys.stdin.read()
if spawns > mm_cfg.MAX_SPAWNS:
spawns = mm_cfg.MAX_SPAWNS
if spawns < 1:
spawns = 1
# Group by domain.
for addr in to_list:
parts = re.split('[.@]', addr)
key = string.join(parts[-2:])
if not domain_info.has_key(key):
domain_info[key] = [addr]
else:
domain_info[key].append(addr)
final_groups = BuildGroups(domain_info.values(), len(to_list), spawns)
ContactTransportForEachGroup(sender, final_groups, text)
def BuildGroups(biglist, num_addrs, spawns):
biglist.sort(lambda x,y: len(x) < len(y))
groups = []
for i in range(spawns-1):
target_size = num_addrs / (spawns - i)
if not len(biglist):
break
newlist = biglist[0]
biglist.remove(biglist[0])
j = 0
while len(newlist) < target_size:
if j >= len(biglist):
break
if len(newlist) + len(biglist[j]) > target_size:
j = j + 1
continue
newlist = newlist + biglist[j]
biglist.remove(biglist[j])
groups.append(newlist)
num_addrs = num_addrs - len(newlist)
lastgroup = []
for item in biglist:
lastgroup = lastgroup + item
if len(lastgroup):
groups.append(lastgroup)
return groups
def ContactTransport(sender, recip, text):
"""Pipe message parties & text to contact_transport for SMTP transmit."""
cmd = os.path.join(mm_cfg.SCRIPTS_DIR, "contact_transport")
proc = os.popen("%s %s" % (mm_cfg.PYTHON, cmd), 'w')
proc.write("%s\n" % sender)
for r in recip:
proc.write("%s\n" % r)
proc.write("\n")
proc.write(text)
proc.close()
def ContactTransportForEachGroup(sender, groups, text):
if len(groups) == 1:
ContactTransport(sender,groups[0],text)
return
for group in groups:
if not forker():
ContactTransport(sender,group,text)
os._exit(0)
def forker(tries=TRIES, refract=REFRACT):
"""Fork, retrying on EAGAIN errors with refract secs pause between tries.
Returns value of os.fork(), or raises the exception for:
(1) non-EAGAIN exception, or
(2) EGAIN exception encountered more than tries times."""
got = 0
# Loop until we successfully fork or the number tries is exceeded.
while 1:
try:
got = os.fork()
break
except os.error, val:
import errno, time
if val[0] == errno.EAGAIN and tries > 0:
# Resource temporarily unavailable - give time to recover.
tries = tries - 1
time.sleep(refract)
else:
# No go - reraise original exception, same stack frame and all.
Utils.reraise()
return got
if __name__ == "__main__":
main()
|