diff options
| author | bwarsaw | 2000-04-07 04:46:30 +0000 |
|---|---|---|
| committer | bwarsaw | 2000-04-07 04:46:30 +0000 |
| commit | c61cc5b134139a1dcdf9ffad657be86fb10df7d7 (patch) | |
| tree | 5afcc5afa6f0bbca2dbb52bbab97b6ffd85e594e | |
| parent | d39dadfcbd5d2efa35c7544913f8a377f095e5d5 (diff) | |
| download | mailman-c61cc5b134139a1dcdf9ffad657be86fb10df7d7.tar.gz mailman-c61cc5b134139a1dcdf9ffad657be86fb10df7d7.tar.zst mailman-c61cc5b134139a1dcdf9ffad657be86fb10df7d7.zip | |
Significant reworking based on the comments of Harald Meland and Jim
Tittsler. In summary, each list maintains its own watermark as a
config.db attribute, and the separate gate_watermarks is no longer
used. This greatly simplifies the logic in this script.
Some specifics:
main(): The command line options have changed; the script no longer supports
invocation as "gate_news listname first last". Added --stderrs/-s,
--quiet/-q, and --help/-h flags.
Removed unnecessary imports.
WATERMARK_FILE and LIST_LOCK_FILE are both gone. There's a global block
file and each list locks itself when actually doing the gating of
articles.
| -rwxr-xr-x | cron/gate_news | 372 |
1 files changed, 165 insertions, 207 deletions
diff --git a/cron/gate_news b/cron/gate_news index 4a6f720a5..fd034d3a8 100755 --- a/cron/gate_news +++ b/cron/gate_news @@ -18,24 +18,21 @@ """Poll the NNTP servers for messages to be gatewayed to mailing lists. -Usage is either +Usage: gate_news [options] - gate_news +Where options are -or + --stderrs + -s + Print errors to stderr in addition to being logged to logs/fromusenet - gate_news listname first last + --quiet + -q + Run quietly. Nothing is output unless there is an exception. -In the former case, the watermark file is used to find the first message to -post, and all messages up to the last available on the newsgroup are gated. -The watermark file is then updated for the next run. This is the way the -script should be run from cron. - -In the latter case, only the specified mailing list's newsgroup is polled, and -only the message from first to last (inclusive) are gated to the mailing -list. In this case, the watermark file is /not/ updated. - -Either form can be run from the command line. + --help + -h + Print this text and exit. 
""" @@ -43,16 +40,15 @@ import sys import os import string import time -import marshal -import errno import traceback import socket +import getopt +from errno import EEXIST import paths -from Mailman import MailList from Mailman import mm_cfg +from Mailman import MailList from Mailman import Utils -from Mailman import LockFile from Mailman import Message from Mailman.Logging.Utils import LogStdErr @@ -65,160 +61,19 @@ import signal signal.signal(signal.SIGCHLD, signal.SIG_DFL) -WATERMARK_FILE = os.path.join(mm_cfg.DATA_DIR, 'gate_watermarks') -LIST_LOCK_FILE = os.path.join(mm_cfg.LOCK_DIR, 'gate_lock.') - -LogStdErr('fromusenet', 'gate_news', tee_to_stdout=0) +VERBOSE = 1 +BLOCKFILE = 'gate_news.lck' -def main(): - # check command line options - names = None - updatewatermarks = 1 - if len(sys.argv) == 1: - names = Utils.list_names() - first = -1 - last = -1 - elif len(sys.argv) == 4: - names = [sys.argv[1]] - try: - first = int(sys.argv[2]) - last = int(sys.argv[3]) - except ValueError: - names = None - updatewatermarks = 0 - if not names: - print __doc__ - sys.exit(1) - # try to open the watermarks file - if updatewatermarks: - try: - fp = open(WATERMARK_FILE) - watermarks = marshal.load(fp) - fp.close() - # TBD: marshal or open could raise other exceptions, namely EOFError, - # ValueError or TypeError. Should we zap the watermarks file if that - # happens? - except IOError, (code, msg): - if code <> errno.ENOENT: - raise - watermarks = {} - # We need to reap the child processes, which will tell us the last article - # number actually gated - children = {} - # cruise through lists - for name in names: - # check to see if the list is gating news to mail. If not, skip the - # list. If so, then we have to poll the newsgroup and gate any - # outstanding messages. - mlist = MailList.MailList(name, lock=0) - if not mlist.gateway_to_mail: - continue - # Open up a "mode reader" connection to the gated newsgroup. 
We want - # to get the watermark for the group in the parent process so that we - # can safely update the gate_watermarks file. We'll actually do the - # gating in a child process, so we do need to open up a new connection - # for each list. - try: - conn = nntplib.NNTP(mlist.nntp_host, readermode=1) - except socket.error, e: - # couldn't open a socket to the NNTP host. maybe we've got too - # many open right now, or the server is unavailable - sys.stderr.write('connect to nntp_host failed\n') - sys.stderr.write(`e`) - break - r,c,f,l,n = conn.group(mlist.linked_newsgroup) - if not updatewatermarks: - # just post the specified messages and be done with it - poll_newsgroup(mlist, conn, first, last+1) - conn.quit() - return - # Otherwise, let's figure out what needs to be done - first = int(f) - last = int(l) - wm = watermarks.get(name, 0) - if wm == 0: - # This is the first time we've tried to gate this newsgroup. We - # essentially do a mass catch-up, otherwise we'd flood the mailing - # list. If you want to post all earlier messages, do this - # manually using the comand line interface. - watermarks[name] = last - continue - # try to get a per-list lock because it makes no sense to have more - # than one process gating a newsgroup. if we can't get the lock, just - # ignore the list for now... 5 minutes (usually how cron invokes this) - # later we'll try again anyway. - lock = LockFile.LockFile(LIST_LOCK_FILE + name, lifetime=240) - try: - lock.lock(timeout=0.5) - except LockFile.TimeOutError: - # TBD: It is possible that some other process has laid claim to - # the gate lock for this list, but that said process has exited - # uncleanly. If that's the case, and it leaves it's lock claim on - # disk, we will never be able to gate from usenet to the list - # again, until the stale lock is removed. For now, we just log - # this potentially deadlocked situation, but this should really be - # fixed (probably in LockFile.py though). 
- sys.stderr.write('Could not acquire lock for list %s\n' % name) - sys.stderr.write('Some other process is gating this list\n') - # someone else is gating this list already - continue - # Fork a child to do the actual gating. - # - # TBD: There are several issues here, revolving around finding out - # from the child exactly which messages were successfully gated. - # Let's say the child is going to gate messages 125-175. If we were - # really anal, we'd open a pipe and let the child tell us the last - # message it successfully gatewayed. Can't use an exit status here - # because message numbers can easily be > 255. But managing all those - # child pipes means pipes and selects, which is probably overkill. - # - # Instead what we do is just get the exit status of the child. If the - # child completes successfully, we assume it gated all the requested - # messages. If it exits with a non-zero status, we assume it gated - # none of them. This is probably good enough, although some - # duplicates are theoretically possible. - pid = os.fork() - if pid: - # in the parent. record the pid of the child, the child's list - # name, and last message number. when the child successfully - # exits, we'll update it's watermark - children[pid] = (name, last) - else: - # in the child. - # - # Steal the list's gateway lock from the parent because we're - # going to manage it from here on, and we have a different PID - # than our parent. We want to minimize any race conditions where - # someone else can steal the lock from us. I think there's still - # a race condition during the time we've actually got the file - # open for writing and when we're done writing it (during the - # steal()), but that should be very small. 
- try: - lock.steal() - try: - poll_newsgroup(mlist, conn, max(wm+1, first), last+1) - finally: - conn.quit() - try: - lock.unlock() - except LockFile.NotLockedError: - # I think it's okay to ignore these specific exceptions - pass - os._exit(0) - except: - # if anything else bad happens, log the exception to stderr. - # TBD: we should probably generalize scripts/driver to handle - # this situation - traceback.print_exc() - os._exit(1) - # wait on at least one child - reap(children, watermarks) - # we're done forking off all the gating children, now just wait for them - # all to exit, and then we're done - while children: - reap(children, watermarks) +def open_newsgroup(mlist): + # Open up a "mode reader" connection to the gated newsgroup. Let + # exceptions percolate up. + conn = nntplib.NNTP(mlist.nntp_host, readermode=1) + # Get the GROUP information for the list, but we're only really interested + # in the first article number and the last article number + r,c,f,l,n = conn.group(mlist.linked_newsgroup) + return conn, int(f), int(l) @@ -226,8 +81,7 @@ def main(): QuickEscape = 'QuickEscape' def poll_newsgroup(mlist, conn, first, last): - # NEWNEWS is not portable and has synchronization issues... Use a - # watermark system instead. + # NEWNEWS is not portable and has synchronization issues. for num in range(first, last): try: headers = conn.head(`num`)[3] @@ -243,9 +97,9 @@ def poll_newsgroup(mlist, conn, first, last): raise QuickEscape body = conn.body(`num`)[3] # Usenet originated messages will not have a Unix envelope - # (i.e. "From " header). This breaks Pipermail archiving, so - # we will synthesize one. Be sure to use the format searched - # for by mailbox.UnixMailbox._isrealfromline() + # (i.e. "From " header). This breaks Pipermail archiving, so we + # will synthesize one. 
Be sure to use the format searched for by + # mailbox.UnixMailbox._isrealfromline() timehdr = time.asctime(time.localtime(time.time())) lines = ['From ' + mlist.GetAdminEmail() + ' ' + timehdr] lines.extend(headers) @@ -257,22 +111,16 @@ def poll_newsgroup(mlist, conn, first, last): if found_to: msg['X-Originally-To'] = msg['To'] msg['To'] = mlist.GetListEmail() - # the list must be locked during posting - lockflag = mlist.Locked() - try: - try: - mlist.Lock() - except Locked.AlreadyLockedError: - pass + # Post the message to the locked list + if VERBOSE: sys.stderr.write('posting msgid %d to list %s\n' % (num, mlist.internal_name())) - mlist.Post(msg) + mlist.Post(msg) + # record the last gated article number + mlist.usenet_watermark = num + if VERBOSE: sys.stderr.write('posted msgid %d to list %s\n' % (num, mlist.internal_name())) - finally: - mlist.Save() - if not lockflag: - mlist.Unlock() except nntplib.error_temp, msg: sys.stderr.write('encountered NNTP error for list %s\n' % mlist.internal_name()) @@ -282,36 +130,146 @@ def poll_newsgroup(mlist, conn, first, last): -def reap(children, watermarks): +def gate_list(mlist): + # Get the list's watermark, i.e. the last article number that this gated + # from news to mail. None means that this list has never polled its + # newsgroup. + watermark = getattr(mlist, 'usenet_watermark', None) + # Open the newsgroup, but let exceptions percolate up + conn, first, last = open_newsgroup(mlist) + try: + if watermark is None: + # This is the first time we've tried to gate this newsgroup. We + # essentially do a mass catch-up, otherwise we'd flood the mailing + # list. + mlist.usenet_watermark = last + else: + # The list has been polled previously, so now we simply grab all + # the messages on the newsgroup that have not been seen by the + # mailing list. The first such article is the maximum of the + # lowest article available on the list and the watermark. 
It's + # possible that some articles have been expired since the last + # time gate_news has run. Not much we can do about that. + poll_newsgroup(mlist, conn, max(watermark+1, first), last+1) + finally: + conn.quit() + + + +def reap(children): if not children: return - # see if any children have exited yet + # See if any children have exited yet pid, status = os.waitpid(-1, os.WNOHANG) if pid == 0: - # nope, none are ready + # Nope, none are ready return - name, last = children[pid] - del children[pid] - if not status: - # successful exit - watermarks[name] = last - # Save the new watermarks after every newsgroup gating has started, so in - # case of a system crash we reduce the number of multiply gated messages. - # it might be better to save after every post, but that is harder to - # coordinate safely between the subprocesses, and would probably be *much* - # slower. - omask = os.umask(002) try: - fp = open(WATERMARK_FILE + '.tmp', 'w') - marshal.dump(watermarks, fp) - fp.close() - os.rename(WATERMARK_FILE + '.tmp', WATERMARK_FILE) + del children[pid] + except KeyError: + # Huh? how could this happen? + pass + + + +def process_lists(): + # for waitpids + children = {} + for listname in Utils.list_names(): + # Open the list unlocked just to check to see if it is gating news to + # mail. If not, we're done with the list. Otherwise, create a fork + # for gating, and immediately lock the group. + mlist = MailList.MailList(listname, lock=0) + if not mlist.gateway_to_mail: + continue + pid = os.fork() + if pid: + # In the parent. record the pid of the child, the child's list + # name, and last message number. when the child successfully + # exits, we'll update it's watermark + children[pid] = pid + else: + # In the child. + status = 0 + try: + try: + mlist.Lock() + gate_list(mlist) + if VERBOSE: + sys.stderr.write('%s watermark: %d\n' % + (mlist.internal_name(), + mlist.usenet_watermark)) + except: + # if anything else bad happens, log the exception to + # stderr. 
TBD: we should probably generalize + # scripts/driver to handle this situation + status = 1 + traceback.print_exc() + finally: + mlist.Save() + mlist.Unlock() + os._exit(status) + # we're done forking off all the gating children, now just wait for them + # all to exit, and then we're done + while children: + reap(children) + + + +def main(): + # block any other gate_news process from running + blockfile = os.path.join(mm_cfg.DATA_DIR, BLOCKFILE) + try: + fd = os.open(blockfile, os.O_CREAT | os.O_EXCL) + os.close(fd) + except OSError, e: + if e.errno <> EEXIST: + raise + # some other gate_news process is already running + if VERBOSE: + sys.stderr.write('some other gate_news is already running\n') + return + try: + process_lists() finally: - os.umask(omask) + os.unlink(blockfile) + + + +def usage(status, msg=''): + print __doc__ % globals() + if msg: + print msg + sys.exit(status) if __name__ == '__main__': - sys.stderr.write('begin gating\n') + global VERBOSE + + try: + opts, args = getopt.getopt(sys.argv[1:], 'shq', + ['stderrs', 'quiet', 'help']) + except getopt.error, msg: + usage(1, msg) + + if args: + usage(1, 'No args are expected') + + tee_to_stdout = 0 + VERBOSE = 1 + for opt, arg in opts: + if opt in ('-h', '--help'): + usage(0) + elif opt in ('-s', '--stderrs'): + tee_to_stdout = 1 + elif opt in ('-q', '--quiet'): + VERBOSE = 0 + + # Set up stderr + LogStdErr('fromusenet', 'gate_news', tee_to_stdout=tee_to_stdout) + if VERBOSE: + sys.stderr.write('begin gating\n') main() - sys.stderr.write('end gating\n') + if VERBOSE: + sys.stderr.write('end gating\n') |
