summary refs log tree commit diff
diff options
context:
space:
mode:
author bwarsaw 2000-04-07 04:46:30 +0000
committer bwarsaw 2000-04-07 04:46:30 +0000
commit c61cc5b134139a1dcdf9ffad657be86fb10df7d7 (patch)
tree 5afcc5afa6f0bbca2dbb52bbab97b6ffd85e594e
parent d39dadfcbd5d2efa35c7544913f8a377f095e5d5 (diff)
download mailman-c61cc5b134139a1dcdf9ffad657be86fb10df7d7.tar.gz
mailman-c61cc5b134139a1dcdf9ffad657be86fb10df7d7.tar.zst
mailman-c61cc5b134139a1dcdf9ffad657be86fb10df7d7.zip
Significant reworking based on the comments of Harald Meland and Jim
Tittsler. In summary, each list maintains its own watermark as a config.db attribute, and the separate gate_watermarks file is no longer used. This greatly simplifies the logic in this script. Some specifics: main(): The command line options have changed, and the script no longer supports invocation as "gate_news listname first last". Added --stderrs/-s, --quiet/-q, and --help/-h flags. Removed unnecessary imports. WATERMARK_FILE and LIST_LOCK_FILE are both gone. There's a global block file, and each list locks itself when actually doing the gating of articles.
-rwxr-xr-x cron/gate_news 372
1 files changed, 165 insertions, 207 deletions
diff --git a/cron/gate_news b/cron/gate_news
index 4a6f720a5..fd034d3a8 100755
--- a/cron/gate_news
+++ b/cron/gate_news
@@ -18,24 +18,21 @@
"""Poll the NNTP servers for messages to be gatewayed to mailing lists.
-Usage is either
+Usage: gate_news [options]
- gate_news
+Where options are
-or
+ --stderrs
+ -s
+ Print errors to stderr in addition to being logged to logs/fromusenet
- gate_news listname first last
+ --quiet
+ -q
+ Run quietly. Nothing is output unless there is an exception.
-In the former case, the watermark file is used to find the first message to
-post, and all messages up to the last available on the newsgroup are gated.
-The watermark file is then updated for the next run. This is the way the
-script should be run from cron.
-
-In the latter case, only the specified mailing list's newsgroup is polled, and
-only the message from first to last (inclusive) are gated to the mailing
-list. In this case, the watermark file is /not/ updated.
-
-Either form can be run from the command line.
+ --help
+ -h
+ Print this text and exit.
"""
@@ -43,16 +40,15 @@ import sys
import os
import string
import time
-import marshal
-import errno
import traceback
import socket
+import getopt
+from errno import EEXIST
import paths
-from Mailman import MailList
from Mailman import mm_cfg
+from Mailman import MailList
from Mailman import Utils
-from Mailman import LockFile
from Mailman import Message
from Mailman.Logging.Utils import LogStdErr
@@ -65,160 +61,19 @@ import signal
signal.signal(signal.SIGCHLD, signal.SIG_DFL)
-WATERMARK_FILE = os.path.join(mm_cfg.DATA_DIR, 'gate_watermarks')
-LIST_LOCK_FILE = os.path.join(mm_cfg.LOCK_DIR, 'gate_lock.')
-
-LogStdErr('fromusenet', 'gate_news', tee_to_stdout=0)
+VERBOSE = 1
+BLOCKFILE = 'gate_news.lck'
-def main():
- # check command line options
- names = None
- updatewatermarks = 1
- if len(sys.argv) == 1:
- names = Utils.list_names()
- first = -1
- last = -1
- elif len(sys.argv) == 4:
- names = [sys.argv[1]]
- try:
- first = int(sys.argv[2])
- last = int(sys.argv[3])
- except ValueError:
- names = None
- updatewatermarks = 0
- if not names:
- print __doc__
- sys.exit(1)
- # try to open the watermarks file
- if updatewatermarks:
- try:
- fp = open(WATERMARK_FILE)
- watermarks = marshal.load(fp)
- fp.close()
- # TBD: marshal or open could raise other exceptions, namely EOFError,
- # ValueError or TypeError. Should we zap the watermarks file if that
- # happens?
- except IOError, (code, msg):
- if code <> errno.ENOENT:
- raise
- watermarks = {}
- # We need to reap the child processes, which will tell us the last article
- # number actually gated
- children = {}
- # cruise through lists
- for name in names:
- # check to see if the list is gating news to mail. If not, skip the
- # list. If so, then we have to poll the newsgroup and gate any
- # outstanding messages.
- mlist = MailList.MailList(name, lock=0)
- if not mlist.gateway_to_mail:
- continue
- # Open up a "mode reader" connection to the gated newsgroup. We want
- # to get the watermark for the group in the parent process so that we
- # can safely update the gate_watermarks file. We'll actually do the
- # gating in a child process, so we do need to open up a new connection
- # for each list.
- try:
- conn = nntplib.NNTP(mlist.nntp_host, readermode=1)
- except socket.error, e:
- # couldn't open a socket to the NNTP host. maybe we've got too
- # many open right now, or the server is unavailable
- sys.stderr.write('connect to nntp_host failed\n')
- sys.stderr.write(`e`)
- break
- r,c,f,l,n = conn.group(mlist.linked_newsgroup)
- if not updatewatermarks:
- # just post the specified messages and be done with it
- poll_newsgroup(mlist, conn, first, last+1)
- conn.quit()
- return
- # Otherwise, let's figure out what needs to be done
- first = int(f)
- last = int(l)
- wm = watermarks.get(name, 0)
- if wm == 0:
- # This is the first time we've tried to gate this newsgroup. We
- # essentially do a mass catch-up, otherwise we'd flood the mailing
- # list. If you want to post all earlier messages, do this
- # manually using the comand line interface.
- watermarks[name] = last
- continue
- # try to get a per-list lock because it makes no sense to have more
- # than one process gating a newsgroup. if we can't get the lock, just
- # ignore the list for now... 5 minutes (usually how cron invokes this)
- # later we'll try again anyway.
- lock = LockFile.LockFile(LIST_LOCK_FILE + name, lifetime=240)
- try:
- lock.lock(timeout=0.5)
- except LockFile.TimeOutError:
- # TBD: It is possible that some other process has laid claim to
- # the gate lock for this list, but that said process has exited
- # uncleanly. If that's the case, and it leaves it's lock claim on
- # disk, we will never be able to gate from usenet to the list
- # again, until the stale lock is removed. For now, we just log
- # this potentially deadlocked situation, but this should really be
- # fixed (probably in LockFile.py though).
- sys.stderr.write('Could not acquire lock for list %s\n' % name)
- sys.stderr.write('Some other process is gating this list\n')
- # someone else is gating this list already
- continue
- # Fork a child to do the actual gating.
- #
- # TBD: There are several issues here, revolving around finding out
- # from the child exactly which messages were successfully gated.
- # Let's say the child is going to gate messages 125-175. If we were
- # really anal, we'd open a pipe and let the child tell us the last
- # message it successfully gatewayed. Can't use an exit status here
- # because message numbers can easily be > 255. But managing all those
- # child pipes means pipes and selects, which is probably overkill.
- #
- # Instead what we do is just get the exit status of the child. If the
- # child completes successfully, we assume it gated all the requested
- # messages. If it exits with a non-zero status, we assume it gated
- # none of them. This is probably good enough, although some
- # duplicates are theoretically possible.
- pid = os.fork()
- if pid:
- # in the parent. record the pid of the child, the child's list
- # name, and last message number. when the child successfully
- # exits, we'll update it's watermark
- children[pid] = (name, last)
- else:
- # in the child.
- #
- # Steal the list's gateway lock from the parent because we're
- # going to manage it from here on, and we have a different PID
- # than our parent. We want to minimize any race conditions where
- # someone else can steal the lock from us. I think there's still
- # a race condition during the time we've actually got the file
- # open for writing and when we're done writing it (during the
- # steal()), but that should be very small.
- try:
- lock.steal()
- try:
- poll_newsgroup(mlist, conn, max(wm+1, first), last+1)
- finally:
- conn.quit()
- try:
- lock.unlock()
- except LockFile.NotLockedError:
- # I think it's okay to ignore these specific exceptions
- pass
- os._exit(0)
- except:
- # if anything else bad happens, log the exception to stderr.
- # TBD: we should probably generalize scripts/driver to handle
- # this situation
- traceback.print_exc()
- os._exit(1)
- # wait on at least one child
- reap(children, watermarks)
- # we're done forking off all the gating children, now just wait for them
- # all to exit, and then we're done
- while children:
- reap(children, watermarks)
+def open_newsgroup(mlist):
+ # Open up a "mode reader" connection to the gated newsgroup. Let
+ # exceptions percolate up.
+ conn = nntplib.NNTP(mlist.nntp_host, readermode=1)
+ # Get the GROUP information for the list, but we're only really interested
+ # in the first article number and the last article number
+ r,c,f,l,n = conn.group(mlist.linked_newsgroup)
+ return conn, int(f), int(l)
@@ -226,8 +81,7 @@ def main():
QuickEscape = 'QuickEscape'
def poll_newsgroup(mlist, conn, first, last):
- # NEWNEWS is not portable and has synchronization issues... Use a
- # watermark system instead.
+ # NEWNEWS is not portable and has synchronization issues.
for num in range(first, last):
try:
headers = conn.head(`num`)[3]
@@ -243,9 +97,9 @@ def poll_newsgroup(mlist, conn, first, last):
raise QuickEscape
body = conn.body(`num`)[3]
# Usenet originated messages will not have a Unix envelope
- # (i.e. "From " header). This breaks Pipermail archiving, so
- # we will synthesize one. Be sure to use the format searched
- # for by mailbox.UnixMailbox._isrealfromline()
+ # (i.e. "From " header). This breaks Pipermail archiving, so we
+ # will synthesize one. Be sure to use the format searched for by
+ # mailbox.UnixMailbox._isrealfromline()
timehdr = time.asctime(time.localtime(time.time()))
lines = ['From ' + mlist.GetAdminEmail() + ' ' + timehdr]
lines.extend(headers)
@@ -257,22 +111,16 @@ def poll_newsgroup(mlist, conn, first, last):
if found_to:
msg['X-Originally-To'] = msg['To']
msg['To'] = mlist.GetListEmail()
- # the list must be locked during posting
- lockflag = mlist.Locked()
- try:
- try:
- mlist.Lock()
- except Locked.AlreadyLockedError:
- pass
+ # Post the message to the locked list
+ if VERBOSE:
sys.stderr.write('posting msgid %d to list %s\n' %
(num, mlist.internal_name()))
- mlist.Post(msg)
+ mlist.Post(msg)
+ # record the last gated article number
+ mlist.usenet_watermark = num
+ if VERBOSE:
sys.stderr.write('posted msgid %d to list %s\n' %
(num, mlist.internal_name()))
- finally:
- mlist.Save()
- if not lockflag:
- mlist.Unlock()
except nntplib.error_temp, msg:
sys.stderr.write('encountered NNTP error for list %s\n' %
mlist.internal_name())
@@ -282,36 +130,146 @@ def poll_newsgroup(mlist, conn, first, last):
-def reap(children, watermarks):
+def gate_list(mlist):
+ # Get the list's watermark, i.e. the last article number that this list
+ # gated from news to mail. None means that this list has never polled
+ # its newsgroup.
+ watermark = getattr(mlist, 'usenet_watermark', None)
+ # Open the newsgroup, but let exceptions percolate up
+ conn, first, last = open_newsgroup(mlist)
+ try:
+ if watermark is None:
+ # This is the first time we've tried to gate this newsgroup. We
+ # essentially do a mass catch-up, otherwise we'd flood the mailing
+ # list.
+ mlist.usenet_watermark = last
+ else:
+ # The list has been polled previously, so now we simply grab all
+ # the messages on the newsgroup that have not been seen by the
+ # mailing list. The first such article is the maximum of the
+ # lowest article available in the newsgroup and watermark+1. It's
+ # possible that some articles have been expired since the last
+ # time gate_news has run. Not much we can do about that.
+ poll_newsgroup(mlist, conn, max(watermark+1, first), last+1)
+ finally:
+ conn.quit()
+
+
+
+def reap(children):
if not children:
return
- # see if any children have exited yet
+ # See if any children have exited yet
pid, status = os.waitpid(-1, os.WNOHANG)
if pid == 0:
- # nope, none are ready
+ # Nope, none are ready
return
- name, last = children[pid]
- del children[pid]
- if not status:
- # successful exit
- watermarks[name] = last
- # Save the new watermarks after every newsgroup gating has started, so in
- # case of a system crash we reduce the number of multiply gated messages.
- # it might be better to save after every post, but that is harder to
- # coordinate safely between the subprocesses, and would probably be *much*
- # slower.
- omask = os.umask(002)
try:
- fp = open(WATERMARK_FILE + '.tmp', 'w')
- marshal.dump(watermarks, fp)
- fp.close()
- os.rename(WATERMARK_FILE + '.tmp', WATERMARK_FILE)
+ del children[pid]
+ except KeyError:
+ # Huh? how could this happen?
+ pass
+
+
+
+def process_lists():
+ # for waitpids
+ children = {}
+ for listname in Utils.list_names():
+ # Open the list unlocked just to check to see if it is gating news to
+ # mail. If not, we're done with the list. Otherwise, fork a child
+ # for gating, which immediately locks the list.
+ mlist = MailList.MailList(listname, lock=0)
+ if not mlist.gateway_to_mail:
+ continue
+ pid = os.fork()
+ if pid:
+ # In the parent. Record the pid of the child so that reap() can
+ # account for it when it exits. The child itself records the list's
+ # watermark, so nothing else needs to be tracked here.
+ children[pid] = pid
+ else:
+ # In the child.
+ status = 0
+ try:
+ try:
+ mlist.Lock()
+ gate_list(mlist)
+ if VERBOSE:
+ sys.stderr.write('%s watermark: %d\n' %
+ (mlist.internal_name(),
+ mlist.usenet_watermark))
+ except:
+ # if anything else bad happens, log the exception to
+ # stderr. TBD: we should probably generalize
+ # scripts/driver to handle this situation
+ status = 1
+ traceback.print_exc()
+ finally:
+ mlist.Save()
+ mlist.Unlock()
+ os._exit(status)
+ # we're done forking off all the gating children, now just wait for them
+ # all to exit, and then we're done
+ while children:
+ reap(children)
+
+
+
+def main():
+ # block any other gate_news process from running
+ blockfile = os.path.join(mm_cfg.DATA_DIR, BLOCKFILE)
+ try:
+ fd = os.open(blockfile, os.O_CREAT | os.O_EXCL)
+ os.close(fd)
+ except OSError, e:
+ if e.errno <> EEXIST:
+ raise
+ # some other gate_news process is already running
+ if VERBOSE:
+ sys.stderr.write('some other gate_news is already running\n')
+ return
+ try:
+ process_lists()
finally:
- os.umask(omask)
+ os.unlink(blockfile)
+
+
+
+def usage(status, msg=''):
+ print __doc__ % globals()
+ if msg:
+ print msg
+ sys.exit(status)
if __name__ == '__main__':
- sys.stderr.write('begin gating\n')
+ global VERBOSE
+
+ try:
+ opts, args = getopt.getopt(sys.argv[1:], 'shq',
+ ['stderrs', 'quiet', 'help'])
+ except getopt.error, msg:
+ usage(1, msg)
+
+ if args:
+ usage(1, 'No args are expected')
+
+ tee_to_stdout = 0
+ VERBOSE = 1
+ for opt, arg in opts:
+ if opt in ('-h', '--help'):
+ usage(0)
+ elif opt in ('-s', '--stderrs'):
+ tee_to_stdout = 1
+ elif opt in ('-q', '--quiet'):
+ VERBOSE = 0
+
+ # Set up stderr
+ LogStdErr('fromusenet', 'gate_news', tee_to_stdout=tee_to_stdout)
+ if VERBOSE:
+ sys.stderr.write('begin gating\n')
main()
- sys.stderr.write('end gating\n')
+ if VERBOSE:
+ sys.stderr.write('end gating\n')