1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
|
# Copyright (C) 2007-2017 by the Free Software Foundation, Inc.
#
# This file is part of GNU Mailman.
#
# GNU Mailman is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option)
# any later version.
#
# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
# more details.
#
# You should have received a copy of the GNU General Public License along with
# GNU Mailman. If not, see <http://www.gnu.org/licenses/>.
"""A mailing list manager."""
from mailman.database.transaction import dbconnection
from mailman.interfaces.address import InvalidEmailAddressError
from mailman.interfaces.listmanager import (
IListManager, ListAlreadyExistsError, ListCreatedEvent, ListCreatingEvent,
ListDeletedEvent, ListDeletingEvent)
from mailman.interfaces.requests import IListRequests
from mailman.model.autorespond import AutoResponseRecord
from mailman.model.bans import Ban
from mailman.model.mailinglist import (
IAcceptableAliasSet, ListArchiver, MailingList)
from mailman.model.mime import ContentFilter
from mailman.utilities.datetime import now
from mailman.utilities.queries import QuerySequence
from public import public
from zope.event import notify
from zope.interface import implementer
@public
@implementer(IListManager)
class ListManager:
"""An implementation of the `IListManager` interface."""
@dbconnection
def create(self, store, fqdn_listname):
"""See `IListManager`."""
fqdn_listname = fqdn_listname.lower()
listname, at, hostname = fqdn_listname.partition('@')
if len(hostname) == 0:
raise InvalidEmailAddressError(fqdn_listname)
list_id = '{}.{}'.format(listname, hostname)
notify(ListCreatingEvent(fqdn_listname))
mlist = store.query(MailingList).filter_by(_list_id=list_id).first()
if mlist:
raise ListAlreadyExistsError(fqdn_listname)
mlist = MailingList(fqdn_listname)
mlist.created_at = now()
store.add(mlist)
notify(ListCreatedEvent(mlist))
return mlist
@dbconnection
def get(self, store, list_spec):
"""See `IListManager`."""
return (self.get_by_fqdn(list_spec)
if '@' in list_spec
else self.get_by_list_id(list_spec))
@dbconnection
def get_by_list_id(self, store, list_id):
"""See `IListManager`."""
return store.query(MailingList).filter_by(_list_id=list_id).first()
@dbconnection
def get_by_fqdn(self, store, fqdn_listname):
"""See `IListManager`."""
listname, at, hostname = fqdn_listname.partition('@')
list_id = '{}.{}'.format(listname, hostname)
return store.query(MailingList).filter_by(_list_id=list_id).first()
@dbconnection
def delete(self, store, mlist):
"""See `IListManager`."""
fqdn_listname = mlist.fqdn_listname
notify(ListDeletingEvent(mlist))
# First delete information associated with the mailing list.
IAcceptableAliasSet(mlist).clear()
IListRequests(mlist).clear()
store.query(AutoResponseRecord).filter_by(mailing_list=mlist).delete()
store.query(ContentFilter).filter_by(mailing_list=mlist).delete()
store.query(ListArchiver).filter_by(mailing_list=mlist).delete()
store.query(Ban).filter_by(list_id=mlist.list_id).delete()
store.delete(mlist)
notify(ListDeletedEvent(fqdn_listname))
@property
@dbconnection
def mailing_lists(self, store):
"""See `IListManager`."""
yield from store.query(MailingList).order_by(
MailingList._list_id).all()
@dbconnection
def __iter__(self, store):
"""See `IListManager`."""
yield from store.query(MailingList).order_by(
MailingList._list_id).all()
@property
@dbconnection
def names(self, store):
"""See `IListManager`."""
result_set = store.query(MailingList)
for mail_host, list_name in result_set.values(MailingList.mail_host,
MailingList.list_name):
yield '{}@{}'.format(list_name, mail_host)
@property
@dbconnection
def list_ids(self, store):
"""See `IListManager`."""
result_set = store.query(MailingList)
for list_id in result_set.values(MailingList._list_id):
assert isinstance(list_id, tuple) and len(list_id) == 1
yield list_id[0]
@property
@dbconnection
def name_components(self, store):
"""See `IListManager`."""
result_set = store.query(MailingList)
for mail_host, list_name in result_set.values(MailingList.mail_host,
MailingList.list_name):
yield list_name, mail_host
@dbconnection
def find(self, store, *, advertised=None, mail_host=None):
query = store.query(MailingList)
if advertised is not None:
query = query.filter_by(advertised=advertised)
if mail_host is not None:
query = query.filter_by(mail_host=mail_host)
query = query.order_by(MailingList._list_id)
return QuerySequence(query)
|