summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorbwarsaw2000-05-08 22:09:23 +0000
committerbwarsaw2000-05-08 22:09:23 +0000
commit8c385d0b530f1f08b069291b8d3161b39b378688 (patch)
treefc4390601058eb017ece7d451411e75ccc228bd3
parent33d7497da9d97aad49a5fc371859dc84e9a796c2 (diff)
downloadmailman-8c385d0b530f1f08b069291b8d3161b39b378688.tar.gz
mailman-8c385d0b530f1f08b069291b8d3161b39b378688.tar.zst
mailman-8c385d0b530f1f08b069291b8d3161b39b378688.zip
Substantial changes for robustification of deliveries.
dispose_message(): Compatibility function for delivery of pre-2.0beta3 version qfiles and 2.0b3 version qfiles (whose marshaled state now contains a schema version number). open_list(): Since qrunner can attempt delivery against the same list multiple times, cache the list name and the MailList object in a global, and reuse it if possible. The only thing that has to be managed explicitly is acquiring and relinquishing the lock. That fine though 'cause that's what we actually want to do! main(): Integrate with the new delivery stuff; use the msgdata dictionary for message metadata. Also, dig out the 'kids' info, which is a list of subproc pids to wait on. Re-queue the message if delivery did not successfully complete.
-rw-r--r--cron/qrunner111
1 files changed, 80 insertions, 31 deletions
diff --git a/cron/qrunner b/cron/qrunner
index 732fc7054..f9564dbb0 100644
--- a/cron/qrunner
+++ b/cron/qrunner
@@ -44,24 +44,36 @@ LogStdErr('error', 'qrunner', tee_to_stdout=0)
+def dispose_message(mlist, msg, msgdata):
+ # Pre 2.0beta3 qfiles have no schema version number
+ version = msgdata.get('version', 0)
+ if version < 1:
+ return HandlerAPI.RedeliverMessage(mlist, msg)
+ return HandlerAPI.DeliverToList(mlist, msg, msgdata)
+
+
+
+_listcache = {}
+def open_list(listname):
+ global _listcache
+ mlist = _listcache.get(listname)
+ if not mlist:
+ try:
+ mlist = MailList.MailList(listname, lock=0)
+ _listcache[listname] = mlist
+ except Errors.MMListError, e:
+ sys.stderr.write('qrunner error opening list: %s\n%s' %
+ (listname, e))
+ return mlist
+
+
+
+_kids = {}
def main():
- # first, claim the queue runner lock
- lock = LockFile.LockFile(QRUNNER_LOCK_FILE, lifetime=QRUNNER_LOCK_LIFETIME)
- try:
- lock.lock(timeout=0.5)
- except LockFile.TimeOutError:
- # TBD: It is possible that some other process has laid claim to the
- # gate lock for this list, but that said process has exited uncleanly.
- # If that's the case, and it leaves it's lock claim on disk, we will
- # never be able to gate from usenet to the list again, until the stale
- # lock is removed. For now, we just log this potentially deadlocked
- # situation, but this should really be fixed (probably in LockFile.py
- # though).
- sys.stderr.write('Could not acquire qrunner lock')
- return
+ global _kids
for file in os.listdir(mm_cfg.QUEUE_DIR):
root, ext = os.path.splitext(os.path.join(mm_cfg.QUEUE_DIR, file))
- if ext == '.db':
+ if ext <> '.msg':
# trigger just off the .msg file
continue
try:
@@ -75,32 +87,69 @@ def main():
# For some reason we had trouble getting all the information out
# of the queued files. log this and move on (we figure it's a
# temporary problem)
- sys.stderr.write('Exception reading qfiles: %s' % e)
+ sys.stderr.write('Exception reading qfile: %s\n%s' % (root, e))
+ continue
+ # Dispose of it, after ensuring that we've got the lock on the list.
+ listname = msgdata.get('listname')
+ if not listname:
+ sys.stderr.write('qfile metadata specifies no list: %s' % root)
+ continue
+ mlist = open_list(listname)
+ if not mlist:
continue
- listname = msgdata['listname']
- recips = msgdata['recips']
- msg = Message.OutgoingMessage(msgtext)
+ # Now try to get the list lock
try:
- mlist = MailList.MailList(listname)
- except Errors.MMListError, e:
- sys.stderr.write('Problem opening mailing list %s: %s' %
- (listname, e))
+ mlist.Lock(timeout=mm_cfg.LIST_LOCK_TIMEOUT)
+ except LockFile.TimeOutError:
+ # oh well, try again later
continue
try:
- msg.recips = recips
- HandlerAPI.RedeliverMessage(mlist, msg)
- # check to see if this message needs to be requeued
- if not getattr(msg, 'failedcount', 0):
- # we're done with this message
+ msg = Message.OutgoingMessage(msgtext)
+ keepqueued = dispose_message(mlist, msg, msgdata)
+ # Did the delivery generate child processes? Don't store them in
+ # the message data files.
+ kids = msgdata.get('kids')
+ if kids:
+ _kids.update(kids)
+ del msgdata['kids']
+ if keepqueued:
+ msg.Enqueue(mlist, msgdata)
+ if not keepqueued:
+ # We're done with this message
os.unlink(root + '.db')
os.unlink(root + '.msg')
finally:
mlist.Save()
mlist.Unlock()
- # we're done, release the lock
- lock.unlock()
if __name__ == '__main__':
- main()
+ global _listcache
+ global _kids
+ # first, claim the queue runner lock
+ lock = LockFile.LockFile(QRUNNER_LOCK_FILE, lifetime=QRUNNER_LOCK_LIFETIME)
+ try:
+ lock.lock(timeout=0.5)
+ except LockFile.TimeOutError:
+ # TBD: It is possible that some other process has laid claim to the
+ # gate lock for this list, but that said process has exited uncleanly.
+ # If that's the case, and it leaves it's lock claim on disk, we will
+ # never be able to gate from usenet to the list again, until the stale
+ # lock is removed. For now, we just log this potentially deadlocked
+ # situation, but this should really be fixed (probably in LockFile.py
+ # though).
+ sys.stderr.write('Could not acquire qrunner lock')
+ else:
+ try:
+ main()
+ finally:
+ lock.unlock(unconditionally=1)
+ # Clear the global cache to be clean about it. Also, we can reap
+ # any child processes that were created during the delivery
+ # (e.g. from ToUsenet).
+ _listcache.clear()
+ while _kids:
+ pid, status = os.waitpid(-1, os.WNOHANG)
+ if pid <> 0:
+ del _kids[pid]