summaryrefslogtreecommitdiff
path: root/mailman/bin/master.py
diff options
context:
space:
mode:
Diffstat (limited to 'mailman/bin/master.py')
-rw-r--r--mailman/bin/master.py44
1 files changed, 32 insertions, 12 deletions
diff --git a/mailman/bin/master.py b/mailman/bin/master.py
index 39f119725..129df0009 100644
--- a/mailman/bin/master.py
+++ b/mailman/bin/master.py
@@ -269,7 +269,9 @@ class Loop:
:param spec: A queue runner spec, in a format acceptable to
bin/qrunner's --runner argument, e.g. name:slice:count
+ :type spec: string
:return: The process id of the child queue runner.
+ :rtype: int
"""
pid = os.fork()
if pid:
@@ -292,25 +294,43 @@ class Loop:
# We should never get here.
raise RuntimeError('os.execl() failed')
- def start_qrunners(self, qrunners=None):
+ def start_qrunners(self, qrunner_names=None):
"""Start all the configured qrunners.
:param qrunners: If given, a sequence of queue runner names to start.
If not given, this sequence is taken from the configuration file.
+ :type qrunners: a sequence of strings
"""
- if not qrunners:
- spec_parts = config.qrunners.items()
- else:
- spec_parts = []
- for qrname in qrunners:
- if '.' in qrname:
- spec_parts.append((qrname, 1))
- else:
- spec_parts.append((config.qrunner_shortcuts[qrname], 1))
- for qrname, count in spec_parts:
+ if not qrunner_names:
+ qrunner_names = []
+ for qrunner_config in config.qrunner_configs:
+ # Strip off the 'qrunner.' prefix.
+ assert qrunner_config.name.startswith('qrunner.'), (
+ 'Unexpected qrunner configuration section name: %s',
+ qrunner_config.name)
+ qrunner_names.append(qrunner_config.name[8:])
+ # For each qrunner we want to start, find their config section, which
+ # will tell us the name of the class to instantiate, along with the
+ # number of hash space slices to manage.
+ for name in qrunner_names:
+ section_name = 'qrunner.' + name
+ # Let AttributeError propagate.
+ qrunner_config = getattr(config, section_name)
+ if not qrunner_config.start:
+ continue
+ class_path = qrunner_config['class'].split(DOT)
+ package = DOT.join(class_path[:-1])
+ __import__(package)
+ # Let AttributeError propagate.
+ class_ = getattr(sys.modules[package], class_path[-1])
+ # Find out how many qrunners to instantiate. This must be a power
+ # of 2.
+ count = int(qrunner_config.instances)
+ assert (count & (count - 1)) == 0, (
+ 'Queue runner "%s", not a power of 2: %s', name, count)
for slice_number in range(count):
# qrunner name, slice #, # of slices, restart count
- info = (qrname, slice_number, count, 0)
+ info = (name, slice_number, count, 0)
spec = '%s:%d:%d' % (qrname, slice_number, count)
pid = self._start_runner(spec)
log = logging.getLogger('mailman.qrunner')