diff options
| author | bwarsaw | 2000-05-08 22:09:23 +0000 |
|---|---|---|
| committer | bwarsaw | 2000-05-08 22:09:23 +0000 |
| commit | 8c385d0b530f1f08b069291b8d3161b39b378688 (patch) | |
| tree | fc4390601058eb017ece7d451411e75ccc228bd3 | |
| parent | 33d7497da9d97aad49a5fc371859dc84e9a796c2 (diff) | |
| download | mailman-8c385d0b530f1f08b069291b8d3161b39b378688.tar.gz mailman-8c385d0b530f1f08b069291b8d3161b39b378688.tar.zst mailman-8c385d0b530f1f08b069291b8d3161b39b378688.zip | |
Substantial changes for robustification of deliveries.
dispose_message(): Compatibility function for delivery of pre-2.0beta3
version qfiles and 2.0b3 version qfiles (whose marshaled state now
contains a schema version number).
open_list(): Since qrunner can attempt delivery against the same list
multiple times, cache the list name and the MailList object in a
global, and reuse it if possible. The only thing that has to be
managed explicitly is acquiring and relinquishing the lock. That fine
though 'cause that's what we actually want to do!
main(): Integrate with the new delivery stuff; use the msgdata
dictionary for message metadata. Also, dig out the 'kids' info, which
is a list of subproc pids to wait on. Re-queue the message if
delivery did not successfully complete.
| -rw-r--r-- | cron/qrunner | 111 |
1 files changed, 80 insertions, 31 deletions
diff --git a/cron/qrunner b/cron/qrunner index 732fc7054..f9564dbb0 100644 --- a/cron/qrunner +++ b/cron/qrunner @@ -44,24 +44,36 @@ LogStdErr('error', 'qrunner', tee_to_stdout=0) +def dispose_message(mlist, msg, msgdata): + # Pre 2.0beta3 qfiles have no schema version number + version = msgdata.get('version', 0) + if version < 1: + return HandlerAPI.RedeliverMessage(mlist, msg) + return HandlerAPI.DeliverToList(mlist, msg, msgdata) + + + +_listcache = {} +def open_list(listname): + global _listcache + mlist = _listcache.get(listname) + if not mlist: + try: + mlist = MailList.MailList(listname, lock=0) + _listcache[listname] = mlist + except Errors.MMListError, e: + sys.stderr.write('qrunner error opening list: %s\n%s' % + (listname, e)) + return mlist + + + +_kids = {} def main(): - # first, claim the queue runner lock - lock = LockFile.LockFile(QRUNNER_LOCK_FILE, lifetime=QRUNNER_LOCK_LIFETIME) - try: - lock.lock(timeout=0.5) - except LockFile.TimeOutError: - # TBD: It is possible that some other process has laid claim to the - # gate lock for this list, but that said process has exited uncleanly. - # If that's the case, and it leaves it's lock claim on disk, we will - # never be able to gate from usenet to the list again, until the stale - # lock is removed. For now, we just log this potentially deadlocked - # situation, but this should really be fixed (probably in LockFile.py - # though). - sys.stderr.write('Could not acquire qrunner lock') - return + global _kids for file in os.listdir(mm_cfg.QUEUE_DIR): root, ext = os.path.splitext(os.path.join(mm_cfg.QUEUE_DIR, file)) - if ext == '.db': + if ext <> '.msg': # trigger just off the .msg file continue try: @@ -75,32 +87,69 @@ def main(): # For some reason we had trouble getting all the information out # of the queued files. log this and move on (we figure it's a # temporary problem) - sys.stderr.write('Exception reading qfiles: %s' % e) + sys.stderr.write('Exception reading qfile: %s\n%s' % (root, e)) + continue + # Dispose of it, after ensuring that we've got the lock on the list. + listname = msgdata.get('listname') + if not listname: + sys.stderr.write('qfile metadata specifies no list: %s' % root) + continue + mlist = open_list(listname) + if not mlist: continue - listname = msgdata['listname'] - recips = msgdata['recips'] - msg = Message.OutgoingMessage(msgtext) + # Now try to get the list lock try: - mlist = MailList.MailList(listname) - except Errors.MMListError, e: - sys.stderr.write('Problem opening mailing list %s: %s' % - (listname, e)) + mlist.Lock(timeout=mm_cfg.LIST_LOCK_TIMEOUT) + except LockFile.TimeOutError: + # oh well, try again later continue try: - msg.recips = recips - HandlerAPI.RedeliverMessage(mlist, msg) - # check to see if this message needs to be requeued - if not getattr(msg, 'failedcount', 0): - # we're done with this message + msg = Message.OutgoingMessage(msgtext) + keepqueued = dispose_message(mlist, msg, msgdata) + # Did the delivery generate child processes? Don't store them in + # the message data files. + kids = msgdata.get('kids') + if kids: + _kids.update(kids) + del msgdata['kids'] + if keepqueued: + msg.Enqueue(mlist, msgdata) + if not keepqueued: + # We're done with this message os.unlink(root + '.db') os.unlink(root + '.msg') finally: mlist.Save() mlist.Unlock() - # we're done, release the lock - lock.unlock() if __name__ == '__main__': - main() + global _listcache + global _kids + # first, claim the queue runner lock + lock = LockFile.LockFile(QRUNNER_LOCK_FILE, lifetime=QRUNNER_LOCK_LIFETIME) + try: + lock.lock(timeout=0.5) + except LockFile.TimeOutError: + # TBD: It is possible that some other process has laid claim to the + # gate lock for this list, but that said process has exited uncleanly. + # If that's the case, and it leaves it's lock claim on disk, we will + # never be able to gate from usenet to the list again, until the stale + # lock is removed. For now, we just log this potentially deadlocked + # situation, but this should really be fixed (probably in LockFile.py + # though). + sys.stderr.write('Could not acquire qrunner lock') + else: + try: + main() + finally: + lock.unlock(unconditionally=1) + # Clear the global cache to be clean about it. Also, we can reap + # any child processes that were created during the delivery + # (e.g. from ToUsenet). + _listcache.clear() + while _kids: + pid, status = os.waitpid(-1, os.WNOHANG) + if pid <> 0: + del _kids[pid] |
