# Copyright (C) 2006-2014 by the Free Software Foundation, Inc.
#
# This file is part of GNU Mailman.
#
# GNU Mailman is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option)
# any later version.
#
# GNU Mailman is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
# more details.
#
# You should have received a copy of the GNU General Public License along with
# GNU Mailman. If not, see .
"""Model for mailing lists."""
from __future__ import absolute_import, print_function, unicode_literals
__metaclass__ = type
__all__ = [
'MailingList',
]
import os
from storm.locals import (
And, Bool, DateTime, Float, Int, Pickle, RawStr, Reference, Store,
TimeDelta, Unicode)
from urlparse import urljoin
from zope.component import getUtility
from zope.event import notify
from zope.interface import implementer
from mailman.config import config
from mailman.database.model import Model
from mailman.database.types import Enum
from mailman.interfaces.action import Action, FilterAction
from mailman.interfaces.address import IAddress
from mailman.interfaces.archiver import ArchivePolicy
from mailman.interfaces.autorespond import ResponseAction
from mailman.interfaces.bounce import UnrecognizedBounceDisposition
from mailman.interfaces.digests import DigestFrequency
from mailman.interfaces.domain import IDomainManager
from mailman.interfaces.languages import ILanguageManager
from mailman.interfaces.mailinglist import (
IAcceptableAlias, IAcceptableAliasSet, IListArchiver, IListArchiverSet,
IMailingList, Personalization, ReplyToMunging)
from mailman.interfaces.member import (
AlreadySubscribedError, MemberRole, MissingPreferredAddressError,
SubscriptionEvent)
from mailman.interfaces.mime import FilterType
from mailman.interfaces.nntp import NewsgroupModeration
from mailman.interfaces.user import IUser
from mailman.model import roster
from mailman.model.digests import OneLastDigest
from mailman.model.member import Member
from mailman.model.mime import ContentFilter
from mailman.model.preferences import Preferences
from mailman.utilities.filesystem import makedirs
from mailman.utilities.string import expand
SPACE = ' '
UNDERSCORE = '_'
@implementer(IMailingList)
class MailingList(Model):
"""See `IMailingList`."""
id = Int(primary=True)
# XXX denotes attributes that should be part of the public interface but
# are currently missing.
# List identity
list_name = Unicode()
mail_host = Unicode()
_list_id = Unicode(name='list_id')
allow_list_posts = Bool()
include_rfc2369_headers = Bool()
advertised = Bool()
anonymous_list = Bool()
# Attributes not directly modifiable via the web u/i
created_at = DateTime()
# Attributes which are directly modifiable via the web u/i. The more
# complicated attributes are currently stored as pickles, though that
# will change as the schema and implementation is developed.
next_request_id = Int()
next_digest_number = Int()
digest_last_sent_at = DateTime()
volume = Int()
last_post_at = DateTime()
# Implicit destination.
acceptable_aliases_id = Int()
acceptable_alias = Reference(acceptable_aliases_id, 'AcceptableAlias.id')
# Attributes which are directly modifiable via the web u/i. The more
# complicated attributes are currently stored as pickles, though that
# will change as the schema and implementation is developed.
accept_these_nonmembers = Pickle() # XXX
admin_immed_notify = Bool()
admin_notify_mchanges = Bool()
administrivia = Bool()
archive_policy = Enum(ArchivePolicy)
# Automatic responses.
autoresponse_grace_period = TimeDelta()
autorespond_owner = Enum(ResponseAction)
autoresponse_owner_text = Unicode()
autorespond_postings = Enum(ResponseAction)
autoresponse_postings_text = Unicode()
autorespond_requests = Enum(ResponseAction)
autoresponse_request_text = Unicode()
# Content filters.
filter_action = Enum(FilterAction)
filter_content = Bool()
collapse_alternatives = Bool()
convert_html_to_plaintext = Bool()
# Bounces.
bounce_info_stale_after = TimeDelta() # XXX
bounce_matching_headers = Unicode() # XXX
bounce_notify_owner_on_disable = Bool() # XXX
bounce_notify_owner_on_removal = Bool() # XXX
bounce_score_threshold = Int() # XXX
bounce_you_are_disabled_warnings = Int() # XXX
bounce_you_are_disabled_warnings_interval = TimeDelta() # XXX
forward_unrecognized_bounces_to = Enum(UnrecognizedBounceDisposition)
process_bounces = Bool()
# Miscellaneous
default_member_action = Enum(Action)
default_nonmember_action = Enum(Action)
description = Unicode()
digest_footer_uri = Unicode()
digest_header_uri = Unicode()
digest_is_default = Bool()
digest_send_periodic = Bool()
digest_size_threshold = Float()
digest_volume_frequency = Enum(DigestFrequency)
digestable = Bool()
discard_these_nonmembers = Pickle()
emergency = Bool()
encode_ascii_prefixes = Bool()
first_strip_reply_to = Bool()
footer_uri = Unicode()
forward_auto_discards = Bool()
gateway_to_mail = Bool()
gateway_to_news = Bool()
goodbye_message_uri = Unicode()
header_matches = Pickle()
header_uri = Unicode()
hold_these_nonmembers = Pickle()
info = Unicode()
linked_newsgroup = Unicode()
max_days_to_hold = Int()
max_message_size = Int()
max_num_recipients = Int()
member_moderation_notice = Unicode()
mime_is_default_digest = Bool()
# FIXME: There should be no moderator_password
moderator_password = RawStr()
newsgroup_moderation = Enum(NewsgroupModeration)
nntp_prefix_subject_too = Bool()
nondigestable = Bool()
nonmember_rejection_notice = Unicode()
obscure_addresses = Bool()
owner_chain = Unicode()
owner_pipeline = Unicode()
personalize = Enum(Personalization)
post_id = Int()
posting_chain = Unicode()
posting_pipeline = Unicode()
_preferred_language = Unicode(name='preferred_language')
display_name = Unicode()
reject_these_nonmembers = Pickle()
reply_goes_to_list = Enum(ReplyToMunging)
reply_to_address = Unicode()
require_explicit_destination = Bool()
respond_to_post_requests = Bool()
scrub_nondigest = Bool()
send_goodbye_message = Bool()
send_welcome_message = Bool()
subject_prefix = Unicode()
topics = Pickle()
topics_bodylines_limit = Int()
topics_enabled = Bool()
welcome_message_uri = Unicode()
def __init__(self, fqdn_listname):
super(MailingList, self).__init__()
listname, at, hostname = fqdn_listname.partition('@')
assert hostname, 'Bad list name: {0}'.format(fqdn_listname)
self.list_name = listname
self.mail_host = hostname
self._list_id = '{0}.{1}'.format(listname, hostname)
# For the pending database
self.next_request_id = 1
# We need to set up the rosters. Normally, this method will get
# called when the MailingList object is loaded from the database, but
# that's not the case when the constructor is called. So, set up the
# rosters explicitly.
self.__storm_loaded__()
makedirs(self.data_path)
def __storm_loaded__(self):
self.owners = roster.OwnerRoster(self)
self.moderators = roster.ModeratorRoster(self)
self.administrators = roster.AdministratorRoster(self)
self.members = roster.MemberRoster(self)
self.regular_members = roster.RegularMemberRoster(self)
self.digest_members = roster.DigestMemberRoster(self)
self.subscribers = roster.Subscribers(self)
self.nonmembers = roster.NonmemberRoster(self)
def __repr__(self):
return ''.format(
self.fqdn_listname, id(self))
@property
def fqdn_listname(self):
"""See `IMailingList`."""
return '{0}@{1}'.format(self.list_name, self.mail_host)
@property
def list_id(self):
"""See `IMailingList`."""
return self._list_id
@property
def domain(self):
"""See `IMailingList`."""
return getUtility(IDomainManager)[self.mail_host]
@property
def scheme(self):
"""See `IMailingList`."""
return self.domain.scheme
@property
def web_host(self):
"""See `IMailingList`."""
return self.domain.url_host
def script_url(self, target, context=None):
"""See `IMailingList`."""
# XXX Handle the case for when context is not None; those would be
# relative URLs.
return urljoin(self.domain.base_url, target + '/' + self.fqdn_listname)
@property
def data_path(self):
"""See `IMailingList`."""
return os.path.join(config.LIST_DATA_DIR, self.fqdn_listname)
# IMailingListAddresses
@property
def posting_address(self):
"""See `IMailingList`."""
return self.fqdn_listname
@property
def no_reply_address(self):
"""See `IMailingList`."""
return '{0}@{1}'.format(config.mailman.noreply_address, self.mail_host)
@property
def owner_address(self):
"""See `IMailingList`."""
return '{0}-owner@{1}'.format(self.list_name, self.mail_host)
@property
def request_address(self):
"""See `IMailingList`."""
return '{0}-request@{1}'.format(self.list_name, self.mail_host)
@property
def bounces_address(self):
"""See `IMailingList`."""
return '{0}-bounces@{1}'.format(self.list_name, self.mail_host)
@property
def join_address(self):
"""See `IMailingList`."""
return '{0}-join@{1}'.format(self.list_name, self.mail_host)
@property
def leave_address(self):
"""See `IMailingList`."""
return '{0}-leave@{1}'.format(self.list_name, self.mail_host)
@property
def subscribe_address(self):
"""See `IMailingList`."""
return '{0}-subscribe@{1}'.format(self.list_name, self.mail_host)
@property
def unsubscribe_address(self):
"""See `IMailingList`."""
return '{0}-unsubscribe@{1}'.format(self.list_name, self.mail_host)
def confirm_address(self, cookie):
"""See `IMailingList`."""
local_part = expand(config.mta.verp_confirm_format, dict(
address = '{0}-confirm'.format(self.list_name),
cookie = cookie))
return '{0}@{1}'.format(local_part, self.mail_host)
@property
def preferred_language(self):
"""See `IMailingList`."""
return getUtility(ILanguageManager)[self._preferred_language]
@preferred_language.setter
def preferred_language(self, language):
"""See `IMailingList`."""
# Accept both a language code and a `Language` instance.
try:
self._preferred_language = language.code
except AttributeError:
self._preferred_language = language
def send_one_last_digest_to(self, address, delivery_mode):
"""See `IMailingList`."""
digest = OneLastDigest(self, address, delivery_mode)
Store.of(self).add(digest)
@property
def last_digest_recipients(self):
"""See `IMailingList`."""
results = Store.of(self).find(
OneLastDigest,
OneLastDigest.mailing_list == self)
recipients = [(digest.address, digest.delivery_mode)
for digest in results]
results.remove()
return recipients
@property
def filter_types(self):
"""See `IMailingList`."""
results = Store.of(self).find(
ContentFilter,
And(ContentFilter.mailing_list == self,
ContentFilter.filter_type == FilterType.filter_mime))
for content_filter in results:
yield content_filter.filter_pattern
@filter_types.setter
def filter_types(self, sequence):
"""See `IMailingList`."""
# First, delete all existing MIME type filter patterns.
store = Store.of(self)
results = store.find(
ContentFilter,
And(ContentFilter.mailing_list == self,
ContentFilter.filter_type == FilterType.filter_mime))
results.remove()
# Now add all the new filter types.
for mime_type in sequence:
content_filter = ContentFilter(
self, mime_type, FilterType.filter_mime)
store.add(content_filter)
@property
def pass_types(self):
"""See `IMailingList`."""
results = Store.of(self).find(
ContentFilter,
And(ContentFilter.mailing_list == self,
ContentFilter.filter_type == FilterType.pass_mime))
for content_filter in results:
yield content_filter.filter_pattern
@pass_types.setter
def pass_types(self, sequence):
"""See `IMailingList`."""
# First, delete all existing MIME type pass patterns.
store = Store.of(self)
results = store.find(
ContentFilter,
And(ContentFilter.mailing_list == self,
ContentFilter.filter_type == FilterType.pass_mime))
results.remove()
# Now add all the new filter types.
for mime_type in sequence:
content_filter = ContentFilter(
self, mime_type, FilterType.pass_mime)
store.add(content_filter)
@property
def filter_extensions(self):
"""See `IMailingList`."""
results = Store.of(self).find(
ContentFilter,
And(ContentFilter.mailing_list == self,
ContentFilter.filter_type == FilterType.filter_extension))
for content_filter in results:
yield content_filter.filter_pattern
@filter_extensions.setter
def filter_extensions(self, sequence):
"""See `IMailingList`."""
# First, delete all existing file extensions filter patterns.
store = Store.of(self)
results = store.find(
ContentFilter,
And(ContentFilter.mailing_list == self,
ContentFilter.filter_type == FilterType.filter_extension))
results.remove()
# Now add all the new filter types.
for mime_type in sequence:
content_filter = ContentFilter(
self, mime_type, FilterType.filter_extension)
store.add(content_filter)
@property
def pass_extensions(self):
"""See `IMailingList`."""
results = Store.of(self).find(
ContentFilter,
And(ContentFilter.mailing_list == self,
ContentFilter.filter_type == FilterType.pass_extension))
for content_filter in results:
yield content_filter.pass_pattern
@pass_extensions.setter
def pass_extensions(self, sequence):
"""See `IMailingList`."""
# First, delete all existing file extensions pass patterns.
store = Store.of(self)
results = store.find(
ContentFilter,
And(ContentFilter.mailing_list == self,
ContentFilter.filter_type == FilterType.pass_extension))
results.remove()
# Now add all the new filter types.
for mime_type in sequence:
content_filter = ContentFilter(
self, mime_type, FilterType.pass_extension)
store.add(content_filter)
def get_roster(self, role):
"""See `IMailingList`."""
if role is MemberRole.member:
return self.members
elif role is MemberRole.owner:
return self.owners
elif role is MemberRole.moderator:
return self.moderators
else:
raise TypeError(
'Undefined MemberRole: {0}'.format(role))
def subscribe(self, subscriber, role=MemberRole.member):
"""See `IMailingList`."""
store = Store.of(self)
if IAddress.providedBy(subscriber):
member = store.find(
Member,
Member.role == role,
Member.list_id == self._list_id,
Member._address == subscriber).one()
if member:
raise AlreadySubscribedError(
self.fqdn_listname, subscriber.email, role)
elif IUser.providedBy(subscriber):
if subscriber.preferred_address is None:
raise MissingPreferredAddressError(subscriber)
member = store.find(
Member,
Member.role == role,
Member.list_id == self._list_id,
Member._user == subscriber).one()
if member:
raise AlreadySubscribedError(
self.fqdn_listname, subscriber, role)
else:
raise ValueError('subscriber must be an address or user')
member = Member(role=role,
list_id=self._list_id,
subscriber=subscriber)
member.preferences = Preferences()
store.add(member)
notify(SubscriptionEvent(self, member))
return member
@implementer(IAcceptableAlias)
class AcceptableAlias(Model):
"""See `IAcceptableAlias`."""
id = Int(primary=True)
mailing_list_id = Int()
mailing_list = Reference(mailing_list_id, MailingList.id)
alias = Unicode()
def __init__(self, mailing_list, alias):
self.mailing_list = mailing_list
self.alias = alias
@implementer(IAcceptableAliasSet)
class AcceptableAliasSet:
"""See `IAcceptableAliasSet`."""
def __init__(self, mailing_list):
self._mailing_list = mailing_list
def clear(self):
"""See `IAcceptableAliasSet`."""
Store.of(self._mailing_list).find(
AcceptableAlias,
AcceptableAlias.mailing_list == self._mailing_list).remove()
def add(self, alias):
if not (alias.startswith('^') or '@' in alias):
raise ValueError(alias)
alias = AcceptableAlias(self._mailing_list, alias.lower())
Store.of(self._mailing_list).add(alias)
def remove(self, alias):
Store.of(self._mailing_list).find(
AcceptableAlias,
And(AcceptableAlias.mailing_list == self._mailing_list,
AcceptableAlias.alias == alias.lower())).remove()
@property
def aliases(self):
aliases = Store.of(self._mailing_list).find(
AcceptableAlias,
AcceptableAlias.mailing_list == self._mailing_list)
for alias in aliases:
yield alias.alias
@implementer(IListArchiver)
class ListArchiver(Model):
"""See `IListArchiver`."""
id = Int(primary=True)
mailing_list_id = Int()
mailing_list = Reference(mailing_list_id, MailingList.id)
name = Unicode()
_is_enabled = Bool()
def __init__(self, mailing_list, archiver_name, system_archiver):
self.mailing_list = mailing_list
self.name = archiver_name
self._is_enabled = system_archiver.is_enabled
@property
def system_archiver(self):
for archiver in config.archivers:
if archiver.name == self.name:
return archiver
return None
@property
def is_enabled(self):
return self.system_archiver.is_enabled and self._is_enabled
@is_enabled.setter
def is_enabled(self, value):
self._is_enabled = value
@implementer(IListArchiverSet)
class ListArchiverSet:
def __init__(self, mailing_list):
self._mailing_list = mailing_list
system_archivers = {}
for archiver in config.archivers:
system_archivers[archiver.name] = archiver
# Add any system enabled archivers which aren't already associated
# with the mailing list.
store = Store.of(self._mailing_list)
for archiver_name in system_archivers:
exists = store.find(
ListArchiver,
And(ListArchiver.mailing_list == mailing_list,
ListArchiver.name == archiver_name)).one()
if exists is None:
store.add(ListArchiver(mailing_list, archiver_name,
system_archivers[archiver_name]))
@property
def archivers(self):
entries = Store.of(self._mailing_list).find(
ListArchiver, ListArchiver.mailing_list == self._mailing_list)
for entry in entries:
yield entry
def get(self, archiver_name):
return Store.of(self._mailing_list).find(
ListArchiver,
And(ListArchiver.mailing_list == self._mailing_list,
ListArchiver.name == archiver_name)).one()