From 99f721f65906e4f2d1036da3a886426aa0ec5aea Mon Sep 17 00:00:00 2001 From: bwarsaw Date: Fri, 19 Jun 1998 19:32:48 +0000 Subject: All these files have been moved to the Mailman directory (and some renamed) --- modules/mm_utils.py | 506 ---------------------------------------------------- 1 file changed, 506 deletions(-) delete mode 100644 modules/mm_utils.py (limited to 'modules/mm_utils.py') diff --git a/modules/mm_utils.py b/modules/mm_utils.py deleted file mode 100644 index e76ac1920..000000000 --- a/modules/mm_utils.py +++ /dev/null @@ -1,506 +0,0 @@ -# Copyright (C) 1998 by the Free Software Foundation, Inc. -# -# This program is free software; you can redistribute it and/or -# modify it under the terms of the GNU General Public License -# as published by the Free Software Foundation; either version 2 -# of the License, or (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. - - -"""Miscellaneous essential routines. - -This includes actual message transmission routines, address checking and -message and address munging, a handy-dandy routine to map a function on all -the maillists, the Logging routines, and whatever else doesn't belong -elsewhere.""" - - -import sys, string, fcntl, os, random, regsub, re -import mm_cfg - -# Valid toplevel domains for when we check the validity of an email address. - -valid_toplevels = ["com", "edu", "gov", "int", "mil", "net", "org", -"inc", "af", "al", "dz", "as", "ad", "ao", "ai", "aq", "ag", "ar", -"am", "aw", "au", "at", "az", "bs", "bh", "bd", "bb", "by", "be", -"bz", "bj", "bm", "bt", "bo", "ba", "bw", "bv", "br", "io", "bn", -"bg", "bf", "bi", "kh", "cm", "ca", "cv", "ky", "cf", "td", "cl", -"cn", "cx", "cc", "co", "km", "cg", "ck", "cr", "ci", "hr", "cu", -"cy", "cz", "dk", "dj", "dm", "do", "tp", "ec", "eg", "sv", "gq", -"ee", "et", "fk", "fo", "fj", "fi", "fr", "gf", "pf", "tf", "ga", -"gm", "ge", "de", "gh", "gi", "gb", "uk", "gr", "gl", "gd", "gp", -"gu", "gt", "gn", "gw", "gy", "ht", "hm", "hn", "hk", "hu", "is", -"in", "id", "ir", "iq", "ie", "il", "it", "jm", "jp", "jo", "kz", -"ke", "ki", "kp", "kr", "kw", "kg", "la", "lv", "lb", "ls", "lr", -"ly", "li", "lt", "lu", "mo", "mg", "mw", "my", "mv", "ml", "mt", -"mh", "mq", "mr", "mu", "mx", "fm", "md", "mc", "mn", "ms", "ma", -"mz", "mm", "na", "nr", "np", "an", "nl", "nt", "nc", "nz", "ni", -"ne", "ng", "nu", "nf", "mp", "no", "om", "pk", "pw", "pa", "pg", -"py", "pe", "ph", "pn", "pl", "pt", "pr", "qa", "re", "ro", "ru", -"rw", "kn", "lc", "vc", "sm", "st", "sa", "sn", "sc", "sl", "sg", -"sk", "si", "sb", "so", "za", "es", "lk", "sh", "pm", "sd", "sr", -"sj", "sz", "se", "ch", "sy", "tw", "tj", "tz", "th", "tg", "tk", -"to", "tt", "tn", "tr", "tm", "tc", "tv", "ug", "ua", "um", "us", -"uy", "uz", "vu", "va", "ve", "vn", "vg", "vi", "wf", "eh", "ws", -"ye", "yu", "zr", "zm", "zw", "su"] - -def list_names(): - """Return the names of all lists in default list directory.""" - got = [] - for fn in os.listdir(mm_cfg.LIST_DATA_DIR): - if not ( - os.path.exists( - os.path.join(os.path.join(mm_cfg.LIST_DATA_DIR, fn), - 'config.db'))): - continue - got.append(fn) - return got - -# a much more naive implementation than say, Emacs's fill-paragraph! -def wrap(text, column=70): - """Wrap and fill the text to the specified column. - - Wrapping is always in effect, although if it is not possible to wrap a - line (because some word is longer than `column' characters) the line is - broken at the next available whitespace boundary. Paragraphs are also - always filled, unless the line begins with whitespace. This is the - algorithm that the Python FAQ wizard uses, and seems like a good - compromise. - - """ - wrapped = '' - # first split the text into paragraphs, defined as a blank line - paras = re.split('\n\n', text) - for para in paras: - # fill - lines = [] - fillprev = 0 - for line in string.split(para, '\n'): - if not line: - lines.append(line) - continue - if line[0] in string.whitespace: - fillthis = 0 - else: - fillthis = 1 - if fillprev and fillthis: - # if the previous line should be filled, then just append a - # single space, and the rest of the current line - lines[-1] = string.rstrip(lines[-1]) + ' ' + line - else: - # no fill, i.e. retain newline - lines.append(line) - fillprev = fillthis - # wrap each line - for text in lines: - while text: - if len(text) <= column: - line = text - text = '' - else: - bol = column - # find the last whitespace character - while bol > 0 and text[bol] not in string.whitespace: - bol = bol - 1 - # now find the last non-whitespace character - eol = bol - while eol > 0 and text[eol] in string.whitespace: - eol = eol - 1 - # watch out for text that's longer than the column width - if eol == 0: - # break on whitespace after column - eol = column - while eol < len(text) and \ - text[eol] not in string.whitespace: - eol = eol + 1 - bol = eol - while bol < len(text) and \ - text[bol] in string.whitespace: - bol = bol + 1 - bol = bol - 1 - line = text[:eol+1] + '\n' - text = text[bol+1:] - wrapped = wrapped + line - wrapped = wrapped + '\n' - wrapped = wrapped + '\n' - return wrapped - - -def SendTextToUser(subject, text, recipient, sender, add_headers=[], raw=0): - import mm_message - msg = mm_message.OutgoingMessage() - msg.SetSender(sender) - msg.SetHeader('Subject', subject, 1) - if not raw: - text = wrap(text) - msg.SetBody(QuotePeriods(text)) - DeliverToUser(msg, recipient, add_headers=add_headers) - -def DeliverToUser(msg, recipient, add_headers=[]): - """Use smtplib to deliver message. - - Optional argument add_headers should be a list of headers to be added - to the message, e.g. for Errors-To and X-No-Archive.""" - # We fork to ensure no deadlock. Otherwise, even if sendmail is - # invoked in forking mode, if it eg detects a bad address before - # forking, then it will try deliver to the errorsto addr *in the - # foreground*. If the errorsto happens to be the list owner for a list - # that is doing the send - and holding a lock - then the delivery will - # hang pending release of the lock - deadlock. - if os.fork(): - return - import smtplib - sender = msg.GetSender() - - try: - try: - msg.headers.remove('\n') - except ValueError: - pass - if not msg.getheader('to'): - msg.headers.append('To: %s\n' % recipient) - for i in add_headers: - if i and i[-1] != '\n': - i = i + '\n' - msg.headers.append(i) - - text = string.join(msg.headers, '')+ '\n'+ QuotePeriods(msg.body) - con = smtplib.SmtpConnection(mm_cfg.SMTPHOST) - con.helo(mm_cfg.DEFAULT_HOST_NAME) - con.send(to=recipient,frm=sender,text=text) - con.quit() - finally: - os._exit(0) - -def QuotePeriods(text): - return string.join(string.split(text, '\n.\n'), '\n .\n') - -def ValidEmail(str): - """Verify that the an email address isn't grossly invalid.""" - # Pretty minimal, cheesy check. We could do better... - if ((string.find(str, '|') <> -1) or (string.find(str, ';') <> -1) - or str[0] == '-'): - raise mm_err.MMHostileAddress - if string.find(str, '/') <> -1: - if os.path.isdir(os.path.split(str)[0]): - raise mm_err.MMHostileAddress - user, domain_parts = ParseEmail(str) - if not domain_parts: - if string.find(str, '@') < 1: - return 0 - else: - return 1 - if len(domain_parts) < 2: - return 0 -## if domain_parts[-1] not in valid_toplevels: -## if len(domain_parts) <> 4: -## return 0 -## try: -## domain_parts = map(eval, domain_parts) -## except: -## return 0 -## for i in domain_parts: -## if i < 0 or i > 255: -## return 0 - return 1 - - -# -def GetPathPieces(path): - l = string.split(path, '/') - try: - while 1: - l.remove('') - except ValueError: - pass - return l - -nesting_level = None -def GetNestingLevel(): - global nesting_level - if nesting_level == None: - try: - path = os.environ['PATH_INFO'] - if path[0] <> '/': - path= '/' + path - nesting_level = len(string.split(path, '/')) - 1 - except KeyError: - nesting_level = 0 - return nesting_level - -def MakeDirTree(path, perms=0775, verbose=0): - made_part = '/' - path_parts = GetPathPieces(path) - for item in path_parts: - made_part = os.path.join(made_part, item) - if os.path.exists(made_part): - if not os.path.isdir(made_part): - raise "RuntimeError", ("Couldn't make dir tree for %s. (%s" - " already exists)" % (path, made_part)) - else: - ou = os.umask(0) - try: - os.mkdir(made_part, perms) - finally: - os.umask(ou) - if verbose: - print 'made directory: ', madepart - -# This takes an email address, and returns a tuple containing (user,host) -def ParseEmail(email): - user = None - domain = None - email = string.lower(email) - at_sign = string.find(email, '@') - if at_sign < 1: - return (email, None) - user = email[:at_sign] - rest = email[at_sign+1:] - domain = string.split(rest, '.') - return (user, domain) - -# Return 1 if the 2 addresses match. 0 otherwise. -# Might also want to match if there's any common domain name... -# There's password protection anyway. - -def AddressesMatch(addr1, addr2): - "True when username matches and host addr of one addr contains other's." - user1, domain1 = ParseEmail(addr1) - user2, domain2 = ParseEmail(addr2) - if user1 != user2: - return 0 - if domain1 == domain2: - return 1 - elif not domain1 or not domain2: - return 0 - for i in range(-1 * min(len(domain1), len(domain2)), 0): - # By going from most specific component of host part we're likely - # to hit a difference sooner. - if domain1[i] != domain2[i]: - return 0 - return 1 - - -def FindMatchingAddresses(name, array): - """Given an email address, and a list of email addresses, returns the - subset of the list that matches the given address. Should sort based - on exactness of match, just in case.""" - - def CallAddressesMatch (x, y=name): - return AddressesMatch(x,y) - - matches = filter(CallAddressesMatch, array) - return matches - -def GetRandomSeed(): - chr1 = int(random.random() * 57) + 65 - chr2 = int(random.random() * 57) + 65 - return "%c%c" % (chr1, chr2) - - -def SnarfMessage(msg): - if msg.unixfrom: - text = msg.unixfrom + string.join(msg.headers, '') + '\n' + msg.body - else: - text = string.join(msg.headers, '') + '\r\n' + msg.body - return (msg.GetSender(), text) - - -def QuoteHyperChars(str): - arr = regsub.splitx(str, '[<>"&]') - i = 1 - while i < len(arr): - if arr[i] == '<': - arr[i] = '<' - elif arr[i] == '>': - arr[i] = '>' - elif arr[i] == '"': - arr[i] = '"' - else: #if arr[i] == '&': - arr[i] = '&' - i = i + 2 - return string.join(arr, '') - -# Just changing these two functions should be enough to control the way -# that email address obscuring is handled. - -def ObscureEmail(addr, for_text=0): - """Make email address unrecognizable to web spiders, but invertable. - - When for_text option is set (not default), make a sentence fragment - instead of a token.""" - if for_text: - return re.sub("@", " at ", addr) - else: - return re.sub("@", "__at__", addr) - -def UnobscureEmail(addr): - """Invert ObscureEmail() conversion.""" - # Contrived to act as an identity operation on already-unobscured - # emails, so routines expecting obscured ones will accept both. - return re.sub("__at__", "@", addr) - -def map_maillists(func, names=None, unlock=None, verbose=0): - """Apply function (of one argument) to all list objs in turn. - - Returns a list of the results. - - Optional arg 'names' specifies which lists, default all. - Optional arg unlock says to unlock immediately after instantiation. - Optional arg verbose says to print list name as it's about to be - instantiated, CR when instantiation is complete, and result of - application as it shows.""" - from maillist import MailList - if names == None: names = list_names() - got = [] - for i in names: - if verbose: print i, - l = MailList(i) - if verbose: print - if unlock and l.Locked(): - l.Unlock() - got.append(apply(func, (l,))) - if verbose: print got[-1] - if not unlock: - l.Unlock() - del l - return got - -class Logger: - """File-based logger, writes to named category files in mm_cfg.LOG_DIR.""" - def __init__(self, category, nofail=1): - """Nofail (by default) says to fallback to sys.stderr if write - fails to category file. A message is emitted, but the IOError is - caught. Set nofail=0 if you want to handle the error in your code, - instead.""" - - self.__category=category - self.__f = None - self.__nofail = nofail - def __get_f(self): - if self.__f: - return self.__f - else: - fname = os.path.join(mm_cfg.LOG_DIR, self.__category) - try: - ou = os.umask(002) - try: - f = self.__f = open(fname, 'a+') - finally: - os.umask(ou) - except IOError, msg: - if not self.__nofail: - raise IOError, msg, sys.exc_info()[2] - else: - f = self.__f = sys.stderr - f.write("logger open %s failed %s, using stderr\n" - % (fname, msg)) - return f - def flush(self): - f = self.__get_f() - if hasattr(f, 'flush'): - f.flush() - def write(self, msg): - f = self.__get_f() - try: - f.write(msg) - except IOError, msg: - f = self.__f = sys.stderr - f.write("logger write %s failed %s, using stderr\n" - % (fname, msg)) - def writelines(self, lines): - for l in lines: - self.write(l) - def close(self): - if not self.__f: - return - self.__get_f().close() - def __del__(self): - try: - if self.__f and self.__f != sys.stderr: - self.close() - except: - pass - -class StampedLogger(Logger): - """Record messages in log files, including date stamp and optional label. - - If manual_reprime is on (off by default), then timestamp prefix will - included only on first .write() and on any write immediately following - a call to the .reprime() method. This is useful for when StampedLogger - is substituting for sys.stderr, where you'd like to see the grouping of - multiple writes under a single timestamp (and there is often is one - group, for uncaught exceptions where a script is bombing). - - In any case, the identifying prefix will only follow writes that start - on a new line. - - Nofail (by default) says to fallback to sys.stderr if write fails to - category file. A message is emitted, but the IOError is caught. - Initialize with nofail=0 if you want to handle the error in your code, - instead.""" - - def __init__(self, category, label=None, manual_reprime=0, nofail=1): - "If specified, optional label is included after timestamp." - self.label = label - self.manual_reprime = manual_reprime - self.primed = 1 - self.bol = 1 - Logger.__init__(self, category, nofail=nofail) - def reprime(self): - """Reset so timestamp will be included with next write.""" - self.primed = 1 - def write(self, msg): - import time - if not self.bol: - prefix = "" - else: - if not self.manual_reprime or self.primed: - stamp = time.strftime("%b %d %H:%M:%S %Y ", - time.localtime(time.time())) - self.primed = 0 - else: - stamp = "" - if self.label == None: - label = "" - else: - label = "%s:" % self.label - prefix = stamp + label - Logger.write(self, "%s %s" % (prefix, msg)) - if msg and msg[-1] == '\n': - self.bol = 1 - else: - self.bol = 0 - def writelines(self, lines): - first = 1 - for l in lines: - if first: - self.write(l) - first = 0 - else: - if l and l[0] not in [' ', '\t', '\n']: - Logger.write(self, ' ' + l) - else: - Logger.write(self, l) - -def chunkify(members, chunksize=mm_cfg.ADMIN_MEMBER_CHUNKSIZE): - """ - return a list of lists of members - """ - members.sort() - res = [] - while 1: - if not members: - break - chunk = members[:chunksize] - res.append(chunk) - members = members[chunksize:] - return res -- cgit v1.2.3-70-g09d2