#!/usr/bin/env python
"""
MPI support classes.
"""
import inspect
import os
import re
import subprocess
import sys
from mpi4py import MPI
from mpi_proc import getargnames, Process, ProcessManager
from mixins import LoggerMixin
from tools.logging import setup_logger, set_excepthook
from tools.misc import memoized_property
class Worker(Process):
    """
    MPI worker class.

    This class repeatedly executes a work method (`do_work()`) under the
    direction of control messages received from rank 0 of `self.intercomm`.

    Parameters
    ----------
    ctrl_tag : int
        MPI tag to identify control messages transmitted to worker nodes.
    *args, **kwargs
        Forwarded unchanged to the parent class constructor.

    Attributes
    ----------
    steps : int
        Number of times `do_work()` has been executed by the current `run()`.
    max_steps : int or float
        Maximum number of steps to execute; ``float('inf')`` means unlimited.

    Notes
    -----
    `self.intercomm`, `self.rank` and `self.log_info` are not defined in this
    class; they are presumably supplied by the parent class and its mixins.
    """

    def __init__(self, ctrl_tag=1, *args, **kwargs):
        super(Worker, self).__init__(*args, **kwargs)

        # Tag used to distinguish control messages:
        self._ctrl_tag = ctrl_tag

        # Execution step counter:
        self.steps = 0

    # Class-level default for the step limit; the `max_steps` property below
    # performs validation whenever the limit is changed on an instance:
    _max_steps = float('inf')

    @property
    def max_steps(self):
        """
        Maximum number of steps to execute.
        """
        return self._max_steps

    @max_steps.setter
    def max_steps(self, value):
        # Reject negative limits before touching any state:
        if value < 0:
            raise ValueError('invalid maximum number of steps')
        self.log_info('maximum number of steps changed: %s -> %s' %
                      (self._max_steps, value))
        self._max_steps = value

    def _irecv_ctrl(self):
        """
        Post a non-blocking receive for the next control message.

        Returns
        -------
        request
            The request object returned by `self.intercomm.irecv()` for a
            message from rank 0 carrying the control tag.
        """
        try:
            return self.intercomm.irecv(source=0, tag=self._ctrl_tag)
        except TypeError:
            # irecv() in mpi4py 1.3.1 stable uses 'dest' instead of 'source':
            return self.intercomm.irecv(dest=0, tag=self._ctrl_tag)

    def do_work(self):
        """
        Work method.

        This method is repeatedly executed by the Worker instance after the
        instance receives a 'start' control message and until it receives a
        'stop' control message. It should be overridden by child classes.
        """
        self.log_info('executing do_work')

    def pre_run(self):
        """
        Code to run before main loop.

        This method is invoked by the `run()` method before the main loop is
        started.
        """
        self.log_info('running code before body of worker %s' % self.rank)

    def post_run(self):
        """
        Code to run after main loop.

        This method is invoked by the `run()` method after the main loop has
        finished. It sends a ``['done', rank]`` control message to rank 0.
        """
        self.log_info('running code after body of worker %s' % self.rank)

        # Send acknowledgment message:
        self.intercomm.isend(['done', self.rank], 0, self._ctrl_tag)
        self.log_info('done message sent to manager')

    def run(self):
        """
        Main body of worker process.

        Calls `pre_run()`, then loops handling control messages and executing
        `do_work()`, and finally calls `post_run()`.

        Recognized control messages (the first element selects the action):

        - ``['start']``: begin executing `do_work()` on every iteration.
        - ``['stop']``: stop executing `do_work()`; ignored when a finite
          `max_steps` is set.
        - ``['steps', n]``: set `max_steps` to ``int(n)``, or to infinity
          when `n` is the string ``'inf'``.
        - ``['quit']``: leave the loop; ignored when a finite `max_steps` is
          set.

        The loop also ends as soon as `steps` reaches `max_steps`.
        """
        self.pre_run()

        self.log_info('running body of worker %s' % self.rank)

        # Start listening for control messages from parent process:
        r_ctrl = [self._irecv_ctrl()]

        running = False
        req = MPI.Request()
        self.steps = 0
        while True:
            # Handle control messages (this assumes that only one control
            # message will arrive at a time):
            flag, msg_list = req.testall(r_ctrl)
            if flag:
                msg = msg_list[0]

                # Start executing work method:
                if msg[0] == 'start':
                    self.log_info('starting')
                    running = True

                # Stop executing work method:
                elif msg[0] == 'stop':
                    if self.max_steps == float('inf'):
                        self.log_info('stopping')
                        running = False
                    else:
                        self.log_info('max steps set - not stopping')

                # Set maximum number of execution steps:
                elif msg[0] == 'steps':
                    if msg[1] == 'inf':
                        self.max_steps = float('inf')
                    else:
                        self.max_steps = int(msg[1])
                    self.log_info('setting maximum steps to %s' % self.max_steps)

                # Quit:
                elif msg[0] == 'quit':
                    if self.max_steps == float('inf'):
                        self.log_info('quitting')
                        break
                    else:
                        self.log_info('max steps set - not quitting')

                # Get next message:
                r_ctrl = [self._irecv_ctrl()]

            # Execute work method; the work method may send data back to the
            # master as a serialized control message containing two elements,
            # e.g.,
            # self.intercomm.isend(['foo', str(self.rank)],
            #                      dest=0, tag=self._ctrl_tag)
            if running:
                self.do_work()
                self.steps += 1
                self.log_info('execution step: %s' % self.steps)

            # Leave loop if maximum number of steps has been reached:
            if self.steps >= self.max_steps:
                self.log_info('maximum steps reached')
                break

        self.post_run()
class WorkerManager(ProcessManager):
    """
    Self-launching MPI worker manager.

    This class may be used to construct an MPI application consisting of

    - a manager process that spawns MPI processes that execute the run()
      methods of several subclasses of the Worker class; and
    - worker processes that perform some processing task.

    The application should NOT be started via mpiexec.

    Parameters
    ----------
    ctrl_tag : int
        MPI tag to identify control messages transmitted to worker nodes.
        May not be equal to mpi4py.MPI.ANY_TAG

    Notes
    -----
    This class requires MPI-2 dynamic processing management.

    `self.intercomm`, `self.log_info` and `len(self)` are not defined in this
    class; they are presumably supplied by the parent class and its mixins,
    with `len(self)` taken here to be the number of workers.

    See Also
    --------
    Worker
    """

    def __init__(self, ctrl_tag=1):
        super(WorkerManager, self).__init__()

        # Validate control tag.
        assert ctrl_tag != MPI.ANY_TAG

        # Tag used to distinguish MPI control messages:
        self._ctrl_tag = ctrl_tag

    def _irecv_ctrl(self):
        """
        Post a non-blocking receive for a control message from any worker.

        Returns
        -------
        request
            The request object returned by `self.intercomm.irecv()`.
        """
        try:
            return self.intercomm.irecv(source=MPI.ANY_SOURCE,
                                        tag=self._ctrl_tag)
        except TypeError:
            # irecv() in mpi4py 1.3.1 stable uses 'dest' instead of 'source':
            return self.intercomm.irecv(dest=MPI.ANY_SOURCE,
                                        tag=self._ctrl_tag)

    def _send_to_all(self, msg):
        """
        Send the control message `msg` to every rank in ``range(len(self))``.
        """
        # range() rather than xrange() so this also runs under Python 3:
        for dest in range(len(self)):
            self.intercomm.isend(msg, dest, self._ctrl_tag)

    def add(self, target, *args, **kwargs):
        """
        Add a worker to an MPI application.

        Parameters
        ----------
        target : Worker
            Worker class to instantiate and run.
        args : sequence
            Sequential arguments to pass to target class constructor.
        kwargs : dict
            Named arguments to pass to target class constructor.

        Returns
        -------
        Whatever `ProcessManager.add()` returns.
        """
        assert issubclass(target, Worker)
        self.log_info('adding class %s' % target.__name__)
        return ProcessManager.add(self, target, *args, **kwargs)

    def process_worker_msg(self, msg):
        """
        Process the specified deserialized message from a worker.

        Called by `wait()` for every control message whose first element is
        not 'done'. The default implementation only logs the message.
        """
        self.log_info('got ctrl msg: %s' % str(msg))

    def wait(self):
        """
        Wait for execution to complete.

        Polls for control messages until a ``['done', rank]`` message has
        been received for every rank in ``range(len(self))``. Any other
        message is passed to `process_worker_msg()`.
        """
        # Start listening for control messages:
        r_ctrl = [self._irecv_ctrl()]

        # A real list is required because entries are removed below; a bare
        # range() object has no remove() method under Python 3:
        workers = list(range(len(self)))
        req = MPI.Request()
        while True:
            # Check for control messages from workers:
            flag, msg_list = req.testall(r_ctrl)
            if flag:
                msg = msg_list[0]
                if msg[0] == 'done':
                    self.log_info('removing %s from worker list' % msg[1])
                    workers.remove(msg[1])

                # Additional control messages from the workers are processed
                # here:
                else:
                    self.process_worker_msg(msg)

                # Get new control messages:
                r_ctrl = [self._irecv_ctrl()]

            if not workers:
                self.log_info('finished running manager')
                break

    def start(self, steps=float('inf')):
        """
        Tell the workers to start processing data.

        Parameters
        ----------
        steps : int or float
            Maximum number of steps each worker should execute. It is sent as
            ``str(steps)``, so the default of infinity is transmitted as
            'inf'.
        """
        self.log_info('sending steps message (%s)' % steps)
        self._send_to_all(['steps', str(steps)])
        self.log_info('sending start message')
        self._send_to_all(['start'])

    def stop(self):
        """
        Tell the workers to stop processing data.
        """
        self.log_info('sending stop message')
        self._send_to_all(['stop'])

    def quit(self):
        """
        Tell the workers to quit.
        """
        self.log_info('sending quit message')
        self._send_to_all(['quit'])
if __name__ == '__main__':
    # NOTE(review): mpi_relaunch is imported only for its side effects;
    # presumably it relaunches this script under MPI - confirm.
    import neurokernel.mpi_relaunch
    import time

    setup_logger(screen=True, file_name='neurokernel.log',
                 mpi_comm=MPI.COMM_WORLD, multiline=True)

    # Demo worker whose constructor takes arguments, used to check that the
    # manager can instantiate a class with positional and keyword arguments:
    class MyWorker(Worker):
        def __init__(self, x, y, z=None, routing_table=None):
            super(MyWorker, self).__init__()

            # Report which process this is and where it is running:
            host = MPI.Get_processor_name()
            identity = (self.rank, self.size, host)
            self.log_info('I am process %d of %d on %s.' % identity)

            # Echo the constructor arguments that were received:
            received = (x, y, z)
            self.log_info('init args: %s, %s, %s' % received)

    manager = WorkerManager()

    # Register three workers: one with keyword arguments, two positionally.
    manager.add(target=MyWorker, x=1, y=2, z=3)
    manager.add(MyWorker, 3, 4, 5)
    manager.add(MyWorker, 6, 7, 8)
    manager.spawn()

    # start() takes the number of steps to run, e.g.
    # manager.start(number_of_steps)
    manager.start(100)
    manager.wait()