Michael,
Thank you again for making some changes to ParallelContext. Here I'm sharing how I have exploited your tips to produce a Python module that implements apply(func, *args, **kwargs), sync and async versions of map(func, *sequences), and get(var_by_str_name) operations, using the ParallelContext bulletin board on the backend. It also exposes an MPI.COMM_WORLD object for general collective operations within each subworld (very useful!). Please try to use it (and break it), so we can see how robust it is.
This code can be saved as pc_extension.py
Code: Select all
"""
Parallel processing extension to neuron.h.ParallelContext
"""
import time
import os
import sys
import pprint
try:
from mpi4py import MPI
from neuron import h
except ImportError:
raise ImportError('ParallelContextInterface: problem with importing neuron')
class Context(object):
    """
    A container replacement for global variables to be shared and modified by any function in a module.

    Attributes assigned to an instance (directly or through update()) form its contents. Calling the instance
    returns those contents as a dict, excluding the names recorded in 'ignore' at construction time.
    """

    def __init__(self):
        # 'ignore' is assigned before dir() is taken so that it lists itself together with every built-in
        # attribute and method name; only names added later count as user content.
        self.ignore = []
        self.ignore.extend(dir(self))

    def update(self, namespace_dict):
        """
        Converts items in a dictionary (such as globals() or locals()) into context object internals.
        :param namespace_dict: dict
        """
        # dict.items() exists on both Python 2 and Python 3; iteritems() is Python 2 only.
        for key, value in namespace_dict.items():
            setattr(self, key, value)

    def __call__(self):
        """
        Returns the attributes added since construction.
        :return: dict: {attribute name: attribute value}
        """
        # A set lookup avoids the repeated list.remove() scans and cannot raise if an ignored name is absent.
        ignored = set(self.ignore)
        return {key: getattr(self, key) for key in dir(self) if key not in ignored}
class ParallelContextInterface(object):
    """
    Convenience layer over neuron.h.ParallelContext that provides apply, synchronous and asynchronous map, and
    get operations using the ParallelContext bulletin board, and exposes an MPI communicator ('comm') restricted to
    the ranks of the local subworld.

    Results retrieved from the bulletin board are cached in the 'collected' dict, indexed by the integer id returned
    by pc.submit(), until they are handed back to the caller.
    """

    class AsyncResultWrapper(object):
        """
        When ready(), get() returns results as a list in the same order as submission.
        """

        def __init__(self, interface, keys):
            """
            :param interface: :class: 'ParallelContextInterface'
            :param keys: list
            """
            self.interface = interface
            self.keys = keys
            self._ready = False

        def ready(self):
            """
            Retrieves at most one result from the bulletin board per call and reports whether every requested
            result is now available. Also reports True when pc.working() indicates no jobs remain.
            :return: bool
            """
            collected = self.interface.collected
            # Results may already have been retrieved by an earlier poll or by collect_results(); in that case
            # do not call pc.working(), which could wait on or consume an unrelated job.
            if self._ready or all(key in collected for key in self.keys):
                self._ready = True
                return True
            if self.interface.pc.working():
                key = int(self.interface.pc.userid())
                collected[key] = self.interface.pc.pyret()
            else:
                # No jobs remain; get() raises KeyError if any requested key is still missing.
                self._ready = True
                return True
            if all(key in collected for key in self.keys):
                self._ready = True
                return True
            return False

        def get(self):
            """
            Returns None until all results have completed, then returns a list of results in the order of original
            submission. The returned results are removed from the interface's 'collected' dict.
            :return: list
            :raises KeyError: if all jobs have completed but a requested key was never collected
            """
            if self._ready or self.ready():
                try:
                    return [self.interface.collected.pop(key) for key in self.keys]
                except KeyError:
                    raise KeyError('ParallelContextInterface: AsyncResultWrapper: all jobs have completed, but '
                                   'not all requested keys were found')
            else:
                return None

    def __init__(self, procs_per_worker=1):
        """
        Partitions the MPI world into subworlds of procs_per_worker ranks via pc.subworlds(), and builds an MPI
        communicator containing the ranks of the local subworld.
        :param procs_per_worker: int
        """
        self.global_comm = MPI.COMM_WORLD
        self.procs_per_worker = procs_per_worker
        self.pc = h.ParallelContext()
        self.pc.subworlds(procs_per_worker)
        self.global_rank = int(self.pc.id_world())
        self.global_size = int(self.pc.nhost_world())
        self.rank = int(self.pc.id())
        self.size = int(self.pc.nhost())
        # Exchange global ranks among the 'size' local ranks, then build an MPI group/communicator from them.
        # NOTE(review): presumably py_alltoall operates within the subworld - confirm against NEURON docs.
        global_ranks = [self.global_rank] * self.size
        global_ranks = self.pc.py_alltoall(global_ranks)
        group = self.global_comm.Get_group()
        sub_group = group.Incl(global_ranks)
        self.comm = self.global_comm.Create(sub_group)
        # The bulletin board identifiers are taken from local rank 0 and broadcast to the rest of the subworld.
        self.worker_id = self.comm.bcast(int(self.pc.id_bbs()), root=0)
        self.num_workers = self.comm.bcast(int(self.pc.nhost_bbs()), root=0)
        # 'collected' dict acts as a temporary storage container on the master process for results retrieved from
        # the ParallelContext bulletin board.
        self.collected = {}
        assert self.rank == self.comm.rank and self.global_rank == self.global_comm.rank and \
            self.global_comm.size / self.procs_per_worker == self.num_workers, \
            'ParallelContextInterface: pc.ids do not match MPI ranks'
        self._running = False
        self.map = self.map_sync
        self.apply = self.apply_sync
        # Source of unique bulletin board keys for successive apply_sync() calls.
        self.apply_counter = 0

    def print_info(self):
        """
        Prints the process id, global rank, local rank, and worker id of this process.
        """
        # Single-argument call form prints identically under the Python 2 print statement and Python 3.
        print('ParallelContextInterface: process id: %i; global rank: %i / %i; local rank: %i / %i; '
              'worker id: %i / %i' %
              (os.getpid(), self.global_rank, self.global_size, self.comm.rank, self.comm.size, self.worker_id,
               self.num_workers))
        time.sleep(0.1)

    def wait_for_all_workers(self, key):
        """
        Prevents any worker from returning until all workers have completed an operation associated with the specified
        key. Only local rank 0 interacts with the bulletin board; other local ranks return immediately.
        :param key: int or str
        """
        if self.rank == 0:
            # Register this worker by incrementing the shared counter posted under 'key'.
            self.pc.take(key)
            count = self.pc.upkscalar()
            self.pc.post(key, count + 1)
            while True:
                # With a large number of ranks, pc.take() is more robust than pc.look()
                self.pc.take(key)
                count = self.pc.upkscalar()
                # The message is always re-posted so the other workers (and the master) can still read it.
                self.pc.post(key, count)
                if count == self.num_workers:
                    return
                # This pause is required to prevent the same worker from repeatedly checking the same message.
                time.sleep(0.1)

    def apply_sync(self, func, *args, **kwargs):
        """
        ParallelContext lacks a native method to guarantee execution of a function on all workers. This method
        implements a synchronous (blocking) apply operation that accepts **kwargs and returns values collected from each
        worker.

        After start(), one job per worker is submitted through the bulletin board and a list with one result per
        worker is returned. Before start(), func is executed directly on every calling process, the results are
        gathered over the global communicator, and only global rank 0 receives the list (other ranks get None).
        :param func: callable
        :param args: list
        :param kwargs: dict
        :return: dynamic
        """
        if self._running:
            apply_key = self.apply_counter
            self.apply_counter += 1
            # Shared counter that pc_apply_wrapper increments on each worker via wait_for_all_workers().
            self.pc.post(apply_key, 0)
            keys = [int(self.pc.submit(pc_apply_wrapper, func, apply_key, args, kwargs))
                    for _ in range(self.num_workers)]
            results = self.collect_results(keys)
            self.pc.take(apply_key)
            return [results[key] for key in keys]
        else:
            result = func(*args, **kwargs)
            # func may itself have started the interface, so the running state is checked again.
            if not self._running:
                results = self.global_comm.gather(result, root=0)
                if self.global_rank == 0:
                    return results
            else:
                return [result]

    def collect_results(self, keys=None):
        """
        If no keys are specified, this method is a blocking operation that waits until all previously submitted jobs
        have been completed, retrieves all results from the bulletin board, and stores them in the 'collected' dict
        on the master process, indexed by their submission key.
        If a list of keys is provided, collect_results first checks if the results have already been placed in the
        'collected' dict, and otherwise blocks until all requested results are available. Results retrieved from the
        bulletin board that were not requested are left in the 'collected' dict.
        :param keys: list
        :return: dict
        """
        if keys is None:
            while self.pc.working():
                key = int(self.pc.userid())
                self.collected[key] = self.pc.pyret()
            # Snapshot then empty the cache; popping while iterating a keys() view fails on Python 3.
            results = dict(self.collected)
            self.collected.clear()
            return results
        pending_keys = set(key for key in keys if key not in self.collected)
        # Only poll the bulletin board while something requested is still missing, so that already-collected
        # requests return immediately without waiting on or consuming unrelated jobs.
        while pending_keys and self.pc.working():
            key = int(self.pc.userid())
            self.collected[key] = self.pc.pyret()
            pending_keys.discard(key)
        return {key: self.collected.pop(key) for key in keys if key in self.collected}

    def map_sync(self, func, *sequences):
        """
        ParallelContext lacks a native method to apply a function to sequences of arguments, using all available
        processes, and returning the results in the same order as the specified sequence. This method implements a
        synchronous (blocking) map operation. Returns results as a list in the same order as the specified sequences.
        Returns None if no sequences are provided.
        :param func: callable
        :param sequences: list
        :return: list
        """
        if not sequences:
            return None
        keys = [int(self.pc.submit(func, *args)) for args in zip(*sequences)]
        results = self.collect_results(keys)
        return [results[key] for key in keys]

    def map_async(self, func, *sequences):
        """
        ParallelContext lacks a native method to apply a function to sequences of arguments, using all available
        processes, and returning the results in the same order as the specified sequence. This method implements an
        asynchronous (non-blocking) map operation. Returns an AsyncResultWrapper object to track progress of the
        submitted jobs, or None if no sequences are provided.
        :param func: callable
        :param sequences: list
        :return: :class:'ParallelContextInterface.AsyncResultWrapper'
        """
        if not sequences:
            return None
        keys = [int(self.pc.submit(func, *args)) for args in zip(*sequences)]
        return self.AsyncResultWrapper(self, keys)

    def get(self, object_name):
        """
        ParallelContext lacks a native method to get the value of an object from all workers. This method implements a
        synchronous (blocking) pull operation.
        :param object_name: str
        :return: dynamic
        """
        return self.apply_sync(find_nested_object, object_name)

    def start(self, disp=False):
        """
        Marks the interface as running and calls pc.runworker().
        :param disp: bool; print process information first
        """
        if disp:
            self.print_info()
        self._running = True
        self.pc.runworker()

    def stop(self):
        """
        Calls pc.done() and marks the interface as no longer running.
        """
        self.pc.done()
        self._running = False
def pc_apply_wrapper(func, key, args, kwargs):
    """
    Module-level job target used by ParallelContextInterface.apply_sync. Bound methods of an instance cannot be
    pickled and submitted to the neuron.h.ParallelContext bulletin board, so this plain function is submitted
    instead (the executing module is expected to run 'from pc_extension import *').

    Executes func(*args, **kwargs), then blocks in wait_for_all_workers(key) on the locally discovered interface so
    that no worker returns before every worker has applied the function.
    :param func: callable
    :param key: int or str
    :param args: list
    :param kwargs: dict
    :return: dynamic
    """
    outcome = func(*args, **kwargs)
    pc_find_interface().wait_for_all_workers(key)
    return outcome
def pc_find_interface():
    """
    ParallelContextInterface apply and get operations require a remote instance of ParallelContextInterface. This
    method attempts to find it in the remote __main__ namespace, or failing that, inside the first Context object
    found in that namespace.
    :return: :class:'ParallelContextInterface'
    :raises Exception: if no instance is found, or if the search itself fails
    """
    def first_of_type(owner, names, wanted_type):
        # Return the first attribute of 'owner' (visited in the order of 'names') that is a 'wanted_type'.
        for name in names:
            candidate = getattr(owner, name)
            if isinstance(candidate, wanted_type):
                return candidate
        return None

    try:
        main_module = sys.modules['__main__']
        found = first_of_type(main_module, dir(main_module), ParallelContextInterface)
        if found is None:
            holder = first_of_type(main_module, dir(main_module), Context)
            if holder is not None:
                # Calling a Context yields its user-added attribute names.
                found = first_of_type(holder, holder(), ParallelContextInterface)
        if found is None:
            raise Exception
        return found
    except Exception:
        raise Exception('ParallelContextInterface: remote instance of ParallelContextInterface not found in '
                        'the remote __main__ namespace')
def find_nested_object(object_name):
    """
    This method attempts to find the object corresponding to the provided object_name (str) in the __main__
    namespace. Tolerates objects nested in other objects, addressed with dotted names (e.g. 'context.count').

    An object whose final value is None is reported as not found.
    :param object_name: str
    :return: dynamic
    :raises Exception: if any component of object_name cannot be resolved, or the resolved object is None
    """
    try:
        # Start from the module itself rather than using None as a "not started" marker; otherwise an
        # intermediate attribute that is None would silently restart the lookup at the module level.
        this_object = sys.modules['__main__']
        for this_object_name in object_name.split('.'):
            this_object = getattr(this_object, this_object_name)
        if this_object is None:
            raise Exception
        return this_object
    except Exception:
        raise Exception('nested: object: %s not found in remote __main__ namespace' % object_name)
The following example script can then be saved as use_pc_example.py:
Code: Select all
from pc_extension import *
# Module-level shared namespace: main() stores the ParallelContextInterface on it as context.interface, and
# collect_ranks() / set_count() keep their running 'count' on it.
context = Context()
def collect_ranks(tag):
    """
    Demonstrates that ParallelContextInterface exposes an MPI communicator in each worker subworld for general
    collective operations: the global ranks of the local subworld are gathered on local rank 0.

    Also increments the shared context.count (created at 0 if absent). Only local rank 0 returns the summary string;
    other ranks return None.
    :param tag: int
    :return: str
    """
    iface = context.interface
    time.sleep(0.1)
    began = time.time()
    gathered = iface.comm.gather(iface.global_rank, root=0)
    context.count = context().get('count', 0) + 1
    if iface.rank != 0:
        return None
    elapsed_ms = (time.time() - began) * 1000.
    return 'worker_id: %i, global_ranks: %s, tag: %i, count: %i, compute time: %.2f (ms)' % \
        (iface.worker_id, str(gathered), int(tag), context.count, elapsed_ms)
def set_count(count=None):
    """
    This method demonstrates use of a Context object for sharing a namespace across functions.

    With no argument, context.count is incremented (created at 0 first if absent); otherwise it is set to the
    provided value. The rank layout of the calling process and the resulting count are then printed.
    :param count: int
    """
    if count is None:
        if 'count' not in context():
            context.count = 0
        context.count += 1
    else:
        context.count = count
    # Single-argument call form: valid on Python 3 and prints the same text under the Python 2 print statement.
    print('global rank: %i / %i, local rank: %i / %i within subworld %i / %i, count: %i' %
          (context.interface.global_rank, context.interface.global_size, context.interface.rank,
           context.interface.size, context.interface.worker_id, context.interface.num_workers, context.count))
def main(procs_per_worker=1):
    """
    Exercises ParallelContextInterface: apply before start(), then apply, map_sync, map_async, and apply again after
    start(), printing the results of each step, and finally stop().
    :param procs_per_worker: int (a str from the command line is accepted and converted)
    """
    procs_per_worker = int(procs_per_worker)
    context.interface = ParallelContextInterface(procs_per_worker=procs_per_worker)
    # Before start(), every rank executes this code, so output is restricted to global rank 0.
    if context.interface.global_rank == 0:
        print('before interface.start()\n: context.interface.apply(set_count)')
    results1 = context.interface.apply(set_count)
    time.sleep(0.1)
    if context.interface.global_rank == 0:
        pprint.pprint(results1)
    time.sleep(0.1)

    context.interface.start()
    print('after interface.start()\n: context.interface.apply(set_count, 5)')
    results2 = context.interface.apply(set_count, 5)
    time.sleep(0.1)
    pprint.pprint(results2)
    time.sleep(0.1)

    print(': context.interface.map_sync(collect_ranks, range(10))')
    results3 = context.interface.map_sync(collect_ranks, range(10))
    time.sleep(0.1)
    pprint.pprint(results3)
    time.sleep(0.1)

    print(': context.interface.map_async(collect_ranks, range(10, 20))')
    results4 = context.interface.map_async(collect_ranks, range(10, 20))
    # Each ready() call retrieves at most one result, so poll until all have arrived.
    while not results4.ready():
        pass
    print('after ready(), before get(): collected result keys: %s' % str(context.interface.collected.keys()))
    time.sleep(0.1)
    results4 = results4.get()
    pprint.pprint(results4)
    time.sleep(0.1)
    print('after ready(), after get(): collected result keys: %s' % str(context.interface.collected.keys()))
    time.sleep(0.1)

    print(': context.interface.apply(collect_ranks, 0)')
    results5 = context.interface.apply(collect_ranks, 0)
    time.sleep(0.1)
    pprint.pprint(results5)
    time.sleep(0.1)
    context.interface.stop()


if __name__ == '__main__':
    # Optional first command line argument: number of MPI processes per worker subworld.
    main(*sys.argv[1:])
The example can be executed with an optional command-line argument that specifies the number of MPI processes per worker subworld:
Code: Select all
mpirun -n 4 python use_pc_example.py 2
This produces the following output:
Code: Select all
numprocs=4
NEURON -- VERSION 7.5 master (b14b5dd) 2017-12-21
Duke, Yale, and the BlueBrain Project -- Copyright 1984-2016
See http://neuron.yale.edu/neuron/credits
before interface.start()
: context.interface.apply(set_count)
global rank: 0 / 4, local rank: 0 / 2 within subworld 0 / 2, count: 1
global rank: 1 / 4, local rank: 1 / 2 within subworld 0 / 2, count: 1
global rank: 2 / 4, local rank: 0 / 2 within subworld 1 / 2, count: 1
global rank: 3 / 4, local rank: 1 / 2 within subworld 1 / 2, count: 1
[None, None, None, None]
after interface.start()
: context.interface.apply(set_count, 5)
global rank: 2 / 4, local rank: 0 / 2 within subworld 1 / 2, count: 5
global rank: 3 / 4, local rank: 1 / 2 within subworld 1 / 2, count: 5
global rank: 0 / 4, local rank: 0 / 2 within subworld 0 / 2, count: 5
global rank: 1 / 4, local rank: 1 / 2 within subworld 0 / 2, count: 5
[None, None]
: context.interface.map_sync(collect_ranks, range(10))
['worker_id: 1, global_ranks: [2, 3], tag: 0, count: 6, compute time: 0.20 (ms)',
'worker_id: 0, global_ranks: [0, 1], tag: 1, count: 6, compute time: 0.17 (ms)',
'worker_id: 1, global_ranks: [2, 3], tag: 2, count: 7, compute time: 0.21 (ms)',
'worker_id: 0, global_ranks: [0, 1], tag: 3, count: 7, compute time: 0.24 (ms)',
'worker_id: 1, global_ranks: [2, 3], tag: 4, count: 8, compute time: 0.13 (ms)',
'worker_id: 0, global_ranks: [0, 1], tag: 5, count: 8, compute time: 0.16 (ms)',
'worker_id: 0, global_ranks: [0, 1], tag: 6, count: 9, compute time: 5.08 (ms)',
'worker_id: 1, global_ranks: [2, 3], tag: 7, count: 9, compute time: 0.19 (ms)',
'worker_id: 0, global_ranks: [0, 1], tag: 8, count: 10, compute time: 0.20 (ms)',
'worker_id: 1, global_ranks: [2, 3], tag: 9, count: 10, compute time: 0.12 (ms)']
: context.interface.map_async(collect_ranks, range(10, 20))
after ready(), before get(): collected result keys: [-22, -21, -20, -19, -18, -17, -16, -15, -14, -13]
['worker_id: 1, global_ranks: [2, 3], tag: 10, count: 11, compute time: 0.18 (ms)',
'worker_id: 0, global_ranks: [0, 1], tag: 11, count: 11, compute time: 0.21 (ms)',
'worker_id: 1, global_ranks: [2, 3], tag: 12, count: 12, compute time: 0.19 (ms)',
'worker_id: 0, global_ranks: [0, 1], tag: 13, count: 12, compute time: 0.18 (ms)',
'worker_id: 1, global_ranks: [2, 3], tag: 14, count: 13, compute time: 0.24 (ms)',
'worker_id: 0, global_ranks: [0, 1], tag: 15, count: 13, compute time: 0.23 (ms)',
'worker_id: 1, global_ranks: [2, 3], tag: 16, count: 14, compute time: 0.25 (ms)',
'worker_id: 0, global_ranks: [0, 1], tag: 17, count: 14, compute time: 0.26 (ms)',
'worker_id: 1, global_ranks: [2, 3], tag: 18, count: 15, compute time: 0.22 (ms)',
'worker_id: 0, global_ranks: [0, 1], tag: 19, count: 15, compute time: 0.18 (ms)']
after ready(), after get(): collected result keys: []
: context.interface.apply(collect_ranks, 0)
['worker_id: 1, global_ranks: [2, 3], tag: 0, count: 16, compute time: 0.14 (ms)',
'worker_id: 0, global_ranks: [0, 1], tag: 0, count: 16, compute time: 0.13 (ms)']