diff options
| author | bwarsaw | 2001-02-15 06:09:38 +0000 |
|---|---|---|
| committer | bwarsaw | 2001-02-15 06:09:38 +0000 |
| commit | 1498d08ccdb7242dc6a6cad036c6cecb785d4a81 (patch) | |
| tree | 9d0ae3727cf0ae46535a7d286da15c4253c9b82b /Mailman/pythonlib/mailbox.py | |
| parent | 2e7a477923db229bf9f0d2670e7e2412dd243aa9 (diff) | |
| download | mailman-1498d08ccdb7242dc6a6cad036c6cecb785d4a81.tar.gz mailman-1498d08ccdb7242dc6a6cad036c6cecb785d4a81.tar.zst mailman-1498d08ccdb7242dc6a6cad036c6cecb785d4a81.zip | |
Diffstat (limited to 'Mailman/pythonlib/mailbox.py')
| -rwxr-xr-x | Mailman/pythonlib/mailbox.py | 312 |
1 files changed, 312 insertions, 0 deletions
diff --git a/Mailman/pythonlib/mailbox.py b/Mailman/pythonlib/mailbox.py new file mode 100755 index 000000000..2f96106c6 --- /dev/null +++ b/Mailman/pythonlib/mailbox.py @@ -0,0 +1,312 @@ +#! /usr/bin/env python + +"""Classes to handle Unix style, MMDF style, and MH style mailboxes.""" + + +import rfc822 +import os + +__all__ = ["UnixMailbox","MmdfMailbox","MHMailbox","Maildir","BabylMailbox"] + +class _Mailbox: + def __init__(self, fp, factory=rfc822.Message): + self.fp = fp + self.seekp = 0 + self.factory = factory + + def seek(self, pos, whence=0): + if whence==1: # Relative to current position + self.pos = self.pos + pos + if whence==2: # Relative to file's end + self.pos = self.stop + pos + else: # Default - absolute position + self.pos = self.start + pos + + def next(self): + while 1: + self.fp.seek(self.seekp) + try: + self._search_start() + except EOFError: + self.seekp = self.fp.tell() + return None + start = self.fp.tell() + self._search_end() + self.seekp = stop = self.fp.tell() + if start != stop: + break + return self.factory(_Subfile(self.fp, start, stop)) + + +class _Subfile: + def __init__(self, fp, start, stop): + self.fp = fp + self.start = start + self.stop = stop + self.pos = self.start + + def read(self, length = None): + if self.pos >= self.stop: + return '' + remaining = self.stop - self.pos + if length is None or length < 0: + length = remaining + elif length > remaining: + length = remaining + self.fp.seek(self.pos) + data = self.fp.read(length) + self.pos = self.fp.tell() + return data + + def readline(self, length = None): + if self.pos >= self.stop: + return '' + if length is None: + length = self.stop - self.pos + self.fp.seek(self.pos) + data = self.fp.readline(length) + self.pos = self.fp.tell() + return data + + def readlines(self, sizehint = -1): + lines = [] + while 1: + line = self.readline() + if not line: + break + lines.append(line) + if sizehint >= 0: + sizehint = sizehint - len(line) + if sizehint <= 0: + break + return lines + + def tell(self): + return self.pos - self.start + + def seek(self, pos, whence=0): + if whence == 0: + self.pos = self.start + pos + elif whence == 1: + self.pos = self.pos + pos + elif whence == 2: + self.pos = self.stop + pos + + def close(self): + del self.fp + + +class UnixMailbox(_Mailbox): + def _search_start(self): + while 1: + pos = self.fp.tell() + line = self.fp.readline() + if not line: + raise EOFError + if line[:5] == 'From ' and self._isrealfromline(line): + self.fp.seek(pos) + return + + def _search_end(self): + self.fp.readline() # Throw away header line + while 1: + pos = self.fp.tell() + line = self.fp.readline() + if not line: + return + if line[:5] == 'From ' and self._isrealfromline(line): + self.fp.seek(pos) + return + + # An overridable mechanism to test for From-line-ness. You can either + # specify a different regular expression or define a whole new + # _isrealfromline() method. Note that this only gets called for lines + # starting with the 5 characters "From ". + # + # BAW: According to + #http://home.netscape.com/eng/mozilla/2.0/relnotes/demo/content-length.html + # the only portable, reliable way to find message delimiters in a BSD (i.e + # Unix mailbox) style folder is to search for "\n\nFrom .*\n", or at the + # beginning of the file, "^From .*\n". While _fromlinepattern below seems + # like a good idea, in practice, there are too many variations for more + # strict parsing of the line to be completely accurate. + # + # _strict_isrealfromline() is the old version which tries to do stricter + # parsing of the From_ line. _portable_isrealfromline() simply returns + # true, since it's never called if the line doesn't already start with + # "From ". + # + # This algorithm, and the way it interacts with _search_start() and + # _search_end() may not be completely correct, because it doesn't check + # that the two characters preceding "From " are \n\n or the beginning of + # the file. Fixing this would require a more extensive rewrite than is + # necessary. For convenience, we've added a StrictUnixMailbox class which + # uses the older, more strict _fromlinepattern regular expression. + + _fromlinepattern = r"From \s*[^\s]+\s+\w\w\w\s+\w\w\w\s+\d?\d\s+" \ + r"\d?\d:\d\d(:\d\d)?(\s+[^\s]+)?\s+\d\d\d\d\s*$" + _regexp = None + + def _strict_isrealfromline(self, line): + if not self._regexp: + import re + self._regexp = re.compile(self._fromlinepattern) + return self._regexp.match(line) + + def _portable_isrealfromline(self, line): + return 1 + + _isrealfromline = _strict_isrealfromline + + +class PortableUnixMailbox(UnixMailbox): + _isrealfromline = UnixMailbox._portable_isrealfromline + + +class MmdfMailbox(_Mailbox): + def _search_start(self): + while 1: + line = self.fp.readline() + if not line: + raise EOFError + if line[:5] == '\001\001\001\001\n': + return + + def _search_end(self): + while 1: + pos = self.fp.tell() + line = self.fp.readline() + if not line: + return + if line == '\001\001\001\001\n': + self.fp.seek(pos) + return + + +class MHMailbox: + def __init__(self, dirname, factory=rfc822.Message): + import re + pat = re.compile('^[1-9][0-9]*$') + self.dirname = dirname + # the three following lines could be combined into: + # list = map(long, filter(pat.match, os.listdir(self.dirname))) + list = os.listdir(self.dirname) + list = filter(pat.match, list) + list = map(long, list) + list.sort() + # This only works in Python 1.6 or later; + # before that str() added 'L': + self.boxes = map(str, list) + self.factory = factory + + def next(self): + if not self.boxes: + return None + fn = self.boxes[0] + del self.boxes[0] + fp = open(os.path.join(self.dirname, fn)) + return self.factory(fp) + + +class Maildir: + # Qmail directory mailbox + + def __init__(self, dirname, factory=rfc822.Message): + self.dirname = dirname + self.factory = factory + + # check for new mail + newdir = os.path.join(self.dirname, 'new') + boxes = [os.path.join(newdir, f) + for f in os.listdir(newdir) if f[0] != '.'] + + # Now check for current mail in this maildir + curdir = os.path.join(self.dirname, 'cur') + boxes += [os.path.join(curdir, f) + for f in os.listdir(curdir) if f[0] != '.'] + + self.boxes = boxes + + def next(self): + if not self.boxes: + return None + fn = self.boxes[0] + del self.boxes[0] + fp = open(fn) + return self.factory(fp) + + +class BabylMailbox(_Mailbox): + def _search_start(self): + while 1: + line = self.fp.readline() + if not line: + raise EOFError + if line == '*** EOOH ***\n': + return + + def _search_end(self): + while 1: + pos = self.fp.tell() + line = self.fp.readline() + if not line: + return + if line == '\037\014\n': + self.fp.seek(pos) + return + + +def _test(): + import time + import sys + import os + + args = sys.argv[1:] + if not args: + for key in 'MAILDIR', 'MAIL', 'LOGNAME', 'USER': + if os.environ.has_key(key): + mbox = os.environ[key] + break + else: + print "$MAIL, $LOGNAME nor $USER set -- who are you?" + return + else: + mbox = args[0] + if mbox[:1] == '+': + mbox = os.environ['HOME'] + '/Mail/' + mbox[1:] + elif not '/' in mbox: + mbox = '/usr/mail/' + mbox + if os.path.isdir(mbox): + if os.path.isdir(os.path.join(mbox, 'cur')): + mb = Maildir(mbox) + else: + mb = MHMailbox(mbox) + else: + fp = open(mbox, 'r') + mb = UnixMailbox(fp) + + msgs = [] + while 1: + msg = mb.next() + if msg is None: + break + msgs.append(msg) + if len(args) <= 1: + msg.fp = None + if len(args) > 1: + num = int(args[1]) + print 'Message %d body:'%num + msg = msgs[num-1] + msg.rewindbody() + sys.stdout.write(msg.fp.read()) + else: + print 'Mailbox',mbox,'has',len(msgs),'messages:' + for msg in msgs: + f = msg.getheader('from') or "" + s = msg.getheader('subject') or "" + d = msg.getheader('date') or "" + print '-%20.20s %20.20s %-30.30s'%(f, d[5:], s) + + +if __name__ == '__main__': + _test() |
