Passing parameters using pack/post, etc.

General issues of interest both for network and
individual cell parallelization.

Moderator: hines

hines
Site Admin
Posts: 1682
Joined: Wed May 18, 2005 3:32 pm

Re: Passing parameters using pack/post, etc.

Post by hines »

Thanks for the suggestion about pc.ready(user_id) and pc.get(user_id). Let me think about that for a while. I'm leaning toward the position that
your "hack" is in fact a pretty good way to organize the return results for Python, since the key(result) mapping is quite flexible and user_id as a key is one
of many possibilities. The memory inefficiency has a high-water mark of the total result size, since as each result is added to the Python map,
that result's memory is internally recovered.
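
In other words, something along these lines (a sketch only; it assumes jobs have already been submitted with pc.submit and that pc is in scope):

Code: Select all

# sketch: drain the bulletin board into a Python dict keyed by each job's userid
results = {}
while pc.working():            # blocks until some submitted job has returned
    key = int(pc.userid())     # userid of the job whose result just arrived
    results[key] = pc.pyret()  # that job's Python return value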

The **kwargs functionality has not been supported so far in general, since HOC has no notion of keyword arguments. However, from
the perspective of callable objects, it is not clear to me how to pass keyword arguments through to callables even in pure Python.

Code: Select all

def f(a, b, c='what'):
  print(a)
  print(b)
  print(c)

f(1,2,'hello')

def call(callable, argtuple):
  callable(*argtuple)

call(f, (3, 4))
How do you pass the named argument c through the call function?
Addendum: OK, the following works.

Code: Select all

def f(a, b, c='what'):
  print(a)
  print(b)
  print(c)
 
f(1,2,'hello')
 
def call(callable, args, kwargs):
  callable(*args, **kwargs)
   
call(f, (3, 4), {'c':'goodbye'})
But can one avoid the ugliness of {'c':'goodbye'}?
aaronmil
Posts: 35
Joined: Fri Apr 25, 2014 10:54 am

Re: Passing parameters using pack/post, etc.

Post by aaronmil »

hines wrote: Sat Dec 23, 2017 9:16 am It is not clear to me how to pass keyword arguments through to callables even in pure Python.
Addendum: OK, the following works.

Code: Select all

def f(a, b, c='what'):
  print(a)
  print(b)
  print(c)
 
f(1,2,'hello')
 
def call(callable, args, kwargs):
  callable(*args, **kwargs)
   
call(f, (3, 4), {'c':'goodbye'})
But can one avoid the ugliness of {'c':'goodbye'}?
It's more like this:

Code: Select all

def f(a, b, c='what'):
    print(a)
    print(b)
    print(c)


def call(callable, *args, **kwargs):
    callable(*args, **kwargs)


f(1, 2, 'hello')
call(f, 3, 4, c='goodbye')
call(f, 5, 6)
which produces:

Code: Select all

1
2
hello
3
4
goodbye
5
6
what
If ParallelContext had this functionality, running the same function on the same arguments on all workers would look like this:

Code: Select all

def apply(callable, *args, **kwargs):
    # runs on all ranks except root
    pc.context(callable, *args, **kwargs)
    # runs only on root
    callable(*args, **kwargs)
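In use, that would look something like this (a hypothetical sketch; pc.subworlds() and pc.runworker() are assumed to have already been called, and apply and f are the names defined above):

Code: Select all

# hypothetical usage of the proposed apply()
apply(f, 3, 4, c='goodbye')  # f(3, 4, c='goodbye') runs on every rank:
                             # workers via pc.context, root via the direct call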
hines
Site Admin
Posts: 1682
Joined: Wed May 18, 2005 3:32 pm

Re: Passing parameters using pack/post, etc.

Post by hines »

Thanks,
That suggested a fairly simple wrapper idiom so that the args can move into, rattle around in, and move out of the HOC world without getting mangled, i.e.

Code: Select all

def wrapper(callable, args, kwargs):
  callable(*args, **kwargs)

def pccontext(callable, *args, **kwargs):
  pc.context(wrapper, callable, args, kwargs)
So the following temp.py seems to work

Code: Select all

from neuron import h
pc = h.ParallelContext()

import time

pc.subworlds(2)  # 2 MPI ranks per worker subworld
nhost_world = int(pc.nhost_world())
id_world = int(pc.id_world())
nhost_bbs = int(pc.nhost_bbs())
id_bbs = int(pc.id_bbs())
nhost = int(pc.nhost())
id = int(pc.id())

def f(a, b, c="what"):
  print ("(a=%d b=%d c='%s') world (%d of %d)  bbs (%d of %d)  id/nhost (%d of %d)"%
    (a,b,c, id_world, nhost_world, id_bbs, nhost_bbs, id, nhost))

f(3,3,c="first")
time.sleep(.1) # enough time to print

def wrapper(callable, args, kwargs):
  callable(*args, **kwargs)

def pccontext(callable, *args, **kwargs):
  pc.context(wrapper, callable, args, kwargs)

pc.runworker()
print ("after runworker")
pccontext(f, 0, 0, c="hello")

f(1, 1, c="goodbye") #rank 0 of the master subworld

def wait():
  for i in range(5): #time to print and
    pc.post("wait")   # bulletin board to communicate
    time.sleep(.1)
    pc.take("wait")

wait()
pccontext(f, 10, 10)
wait()

pc.done()
h.quit()
and gives the following result:

Code: Select all

hines@hines-T7500:~/neuron/nrntest/nrniv/Parallel$ mpiexec -n 6 nrniv -mpi -python temp.py
numprocs=6
NEURON -- VERSION 7.5 master (b14b5dd) 2017-12-21
Duke, Yale, and the BlueBrain Project -- Copyright 1984-2016
See http://neuron.yale.edu/neuron/credits

(a=3 b=3 c='first') world (4 of 6)  bbs (2 of 3)  id/nhost (0 of 2)
(a=3 b=3 c='first') world (5 of 6)  bbs (-1 of -1)  id/nhost (1 of 2)
(a=3 b=3 c='first') world (0 of 6)  bbs (0 of 3)  id/nhost (0 of 2)
(a=3 b=3 c='first') world (1 of 6)  bbs (-1 of -1)  id/nhost (1 of 2)
(a=3 b=3 c='first') world (2 of 6)  bbs (1 of 3)  id/nhost (0 of 2)
(a=3 b=3 c='first') world (3 of 6)  bbs (-1 of -1)  id/nhost (1 of 2)
after runworker
(a=0 b=0 c='hello') world (1 of 6)  bbs (-1 of -1)  id/nhost (1 of 2)
(a=0 b=0 c='hello') world (2 of 6)  bbs (1 of 3)  id/nhost (0 of 2)
(a=0 b=0 c='hello') world (4 of 6)  bbs (2 of 3)  id/nhost (0 of 2)
(a=1 b=1 c='goodbye') world (0 of 6)  bbs (0 of 3)  id/nhost (0 of 2)
(a=0 b=0 c='hello') world (5 of 6)  bbs (-1 of -1)  id/nhost (1 of 2)
(a=0 b=0 c='hello') world (3 of 6)  bbs (-1 of -1)  id/nhost (1 of 2)
(a=10 b=10 c='what') world (2 of 6)  bbs (1 of 3)  id/nhost (0 of 2)
(a=10 b=10 c='what') world (3 of 6)  bbs (-1 of -1)  id/nhost (1 of 2)
(a=10 b=10 c='what') world (5 of 6)  bbs (-1 of -1)  id/nhost (1 of 2)
(a=10 b=10 c='what') world (4 of 6)  bbs (2 of 3)  id/nhost (0 of 2)
(a=10 b=10 c='what') world (1 of 6)  bbs (-1 of -1)  id/nhost (1 of 2)
hines@hines-T7500:~/neuron/nrntest/nrniv/Parallel$ 
Hope that suffices.
aaronmil
Posts: 35
Joined: Fri Apr 25, 2014 10:54 am

Re: Passing parameters using pack/post, etc.

Post by aaronmil »

Michael,

Thank you again for making some changes to ParallelContext. Here I'm sharing how I have exploited your tips to produce a Python module that implements apply(func, *args, **kwargs), sync and async versions of map(func, *sequences), and get(var_by_str_name) operations, using the ParallelContext bulletin board on the backend. It also exposes an mpi4py communicator within each subworld for general collective operations (very useful!). Please try to use it (and break it), so we can see how robust it is.

This code can be saved as pc_extension.py

Code: Select all

"""
Parallel processing extension to neuron.h.ParallelContext
"""
import time
import os
import sys
import pprint

try:
    from mpi4py import MPI
    from neuron import h
except ImportError:
    raise ImportError('ParallelContextInterface: problem importing mpi4py or neuron')


class Context(object):
    """
    A container replacement for global variables to be shared and modified by any function in a module.
    """
    def __init__(self):
        self.ignore = []
        self.ignore.extend(dir(self))

    def update(self, namespace_dict):
        """
        Converts items in a dictionary (such as globals() or locals()) into context object internals.
        :param namespace_dict: dict
        """
        for key, value in namespace_dict.iteritems():
            setattr(self, key, value)

    def __call__(self):
        keys = dir(self)
        for key in self.ignore:
            keys.remove(key)
        return {key: getattr(self, key) for key in keys}


class ParallelContextInterface(object):
    """

    """
    class AsyncResultWrapper(object):
        """
        Once ready() returns True, get() returns results as a list in the same order as submission.
        """
        def __init__(self, interface, keys):
            """

            :param interface: :class: 'ParallelContextInterface'
            :param keys: list
            """
            self.interface = interface
            self.keys = keys
            self._ready = False

        def ready(self):
            """

            :return: bool
            """
            if self.interface.pc.working():
                key = int(self.interface.pc.userid())
                self.interface.collected[key] = self.interface.pc.pyret()
            else:
                self._ready = True
                return True
            if all(key in self.interface.collected for key in self.keys):
                self._ready = True
                return True
            else:
                return False

        def get(self):
            """
            Returns None until all results have completed, then returns a list of results in the order of original
            submission.
            :return: list
            """
            if self._ready or self.ready():
                try:
                    return [self.interface.collected.pop(key) for key in self.keys]
                except KeyError:
                    raise KeyError('ParallelContextInterface: AsyncResultWrapper: all jobs have completed, but '
                                   'not all requested keys were found')
            else:
                return None
    
    def __init__(self, procs_per_worker=1):
        """

        :param procs_per_worker: int
        """
        self.global_comm = MPI.COMM_WORLD
        self.procs_per_worker = procs_per_worker
        self.pc = h.ParallelContext()
        self.pc.subworlds(procs_per_worker)
        self.global_rank = int(self.pc.id_world())
        self.global_size = int(self.pc.nhost_world())
        self.rank = int(self.pc.id())
        self.size = int(self.pc.nhost())
        global_ranks = [self.global_rank] * self.size
        global_ranks = self.pc.py_alltoall(global_ranks)
        group = self.global_comm.Get_group()
        sub_group = group.Incl(global_ranks)
        self.comm = self.global_comm.Create(sub_group)
        self.worker_id = self.comm.bcast(int(self.pc.id_bbs()), root=0)
        self.num_workers = self.comm.bcast(int(self.pc.nhost_bbs()), root=0)
        # 'collected' dict acts as a temporary storage container on the master process for results retrieved from
        # the ParallelContext bulletin board.
        self.collected = {}
        assert self.rank == self.comm.rank and self.global_rank == self.global_comm.rank and \
               self.global_comm.size / self.procs_per_worker == self.num_workers, \
            'ParallelContextInterface: pc.ids do not match MPI ranks'
        self._running = False
        self.map = self.map_sync
        self.apply = self.apply_sync
        self.apply_counter = 0

    def print_info(self):
        print 'ParallelContextInterface: process id: %i; global rank: %i / %i; local rank: %i / %i; ' \
              'worker id: %i / %i' % \
              (os.getpid(), self.global_rank, self.global_size, self.comm.rank, self.comm.size, self.worker_id,
               self.num_workers)
        time.sleep(0.1)

    def wait_for_all_workers(self, key):
        """
        Prevents any worker from returning until all workers have completed an operation associated with the specified
        key.
        :param key: int or str
        """
        if self.rank == 0:
            self.pc.take(key)
            count = self.pc.upkscalar()
            self.pc.post(key, count + 1)
            while True:
                # With a large number of ranks, pc.take() is more robust than pc.look()
                self.pc.take(key)
                count = self.pc.upkscalar()
                if count == self.num_workers:
                    self.pc.post(key, count)
                    return
                else:
                    self.pc.post(key, count)
                    # This pause is required to prevent the same worker from repeatedly checking the same message.
                    time.sleep(0.1)
                    # sys.stdout.flush()
    
    def apply_sync(self, func, *args, **kwargs):
        """
        ParallelContext lacks a native method to guarantee execution of a function on all workers. This method
        implements a synchronous (blocking) apply operation that accepts **kwargs and returns values collected from each
        worker.
        :param func: callable
        :param args: list
        :param kwargs: dict
        :return: dynamic
        """
        if self._running:
            apply_key = self.apply_counter
            self.apply_counter += 1
            self.pc.post(apply_key, 0)
            keys = []
            for i in xrange(self.num_workers):
                keys.append(int(self.pc.submit(pc_apply_wrapper, func, apply_key, args, kwargs)))
            results = self.collect_results(keys)
            self.pc.take(apply_key)
            return [results[key] for key in keys]
        else:
            result = func(*args, **kwargs)
            if not self._running:
                results = self.global_comm.gather(result, root=0)
                if self.global_rank == 0:
                    return results
            else:
                return [result]
    
    def collect_results(self, keys=None):
        """
        If no keys are specified, this method is a blocking operation that waits until all previously submitted jobs
        have been completed, retrieves all results from the bulletin board, and stores them in the 'collected' dict
        on the master process, indexed by their submission key.
        If a list of keys is provided, collect_results first checks if the results have already been placed in the
        'collected' dict, and otherwise blocks until all requested results are available. Results retrieved from the
        bulletin board that were not requested are left in the 'collected' dict.
        :param keys: list
        :return: dict
        """
        if keys is None:
            while self.pc.working():
                key = int(self.pc.userid())
                self.collected[key] = self.pc.pyret()
            keys = self.collected.keys()
            return {key: self.collected.pop(key) for key in keys}
        else:
            pending_keys = [key for key in keys if key not in self.collected]
            while self.pc.working():
                key = int(self.pc.userid())
                self.collected[key] = self.pc.pyret()
                if key in pending_keys:
                    pending_keys.remove(key)
                if not pending_keys:
                    break
            return {key: self.collected.pop(key) for key in keys if key in self.collected}

    def map_sync(self, func, *sequences):
        """
        ParallelContext lacks a native method to apply a function to sequences of arguments, using all available
        processes, and returning the results in the same order as the specified sequence. This method implements a
        synchronous (blocking) map operation. Returns results as a list in the same order as the specified sequences.
        :param func: callable
        :param sequences: list
        :return: list
        """
        if not sequences:
            return None
        keys = []
        for args in zip(*sequences):
            key = int(self.pc.submit(func, *args))
            keys.append(key)
        results = self.collect_results(keys)
        return [results[key] for key in keys]

    def map_async(self, func, *sequences):
        """
        ParallelContext lacks a native method to apply a function to sequences of arguments, using all available
        processes, and returning the results in the same order as the specified sequence. This method implements an
        asynchronous (non-blocking) map operation. Returns an AsyncResultWrapper object to track progress of the
        submitted jobs.
        :param func: callable
        :param sequences: list
        :return: :class:'AsyncResultWrapper'
        """
        if not sequences:
            return None
        keys = []
        for args in zip(*sequences):
            key = int(self.pc.submit(func, *args))
            keys.append(key)
        return self.AsyncResultWrapper(self, keys)

    def get(self, object_name):
        """
        ParallelContext lacks a native method to get the value of an object from all workers. This method implements a
        synchronous (blocking) pull operation.
        :param object_name: str
        :return: dynamic
        """
        return self.apply_sync(find_nested_object, object_name)

    def start(self, disp=False):
        if disp:
            self.print_info()
            # time.sleep(0.1)
        self._running = True
        self.pc.runworker()

    def stop(self):
        self.pc.done()
        self._running = False


def pc_apply_wrapper(func, key, args, kwargs):
    """
    Methods internal to an instance of a class cannot be pickled and submitted to the neuron.h.ParallelContext bulletin 
    board for remote execution. As long as a module executes 'from pc_extension import *', this method can be
    submitted to the bulletin board for remote execution, and prevents any worker from returning until all workers have
    applied the specified function.
    :param func: callable
    :param key: int or str
    :param args: list
    :param kwargs: dict
    :return: dynamic
    """
    result = func(*args, **kwargs)
    interface = pc_find_interface()
    interface.wait_for_all_workers(key)
    return result


def pc_find_interface():
    """
    ParallelContextInterface apply and get operations require a remote instance of ParallelContextInterface. This method
    attempts to find it in the remote __main__ namespace, or in a Context object therein.
    :return: :class:'ParallelContextInterface'
    """
    interface = None
    try:
        module = sys.modules['__main__']
        for item_name in dir(module):
            if isinstance(getattr(module, item_name), ParallelContextInterface):
                interface = getattr(module, item_name)
                break
        if interface is None:
            context = None
            for item_name in dir(module):
                if isinstance(getattr(module, item_name), Context):
                    context = getattr(module, item_name)
                    break
            if context is not None:
                for item_name in context():
                    if isinstance(getattr(context, item_name), ParallelContextInterface):
                        interface = getattr(context, item_name)
                        break
            if interface is None:
                raise Exception
        return interface
    except Exception:
        raise Exception('ParallelContextInterface: remote instance of ParallelContextInterface not found in '
                        'the remote __main__ namespace')


def find_nested_object(object_name):
    """
    This method attempts to find the object corresponding to the provided object_name (str) in the __main__ namespace.
    Tolerates objects nested in other objects.
    :param object_name: str
    :return: dynamic
    """
    this_object = None
    try:
        module = sys.modules['__main__']
        for this_object_name in object_name.split('.'):
            if this_object is None:
                this_object = getattr(module, this_object_name)
            else:
                this_object = getattr(this_object, this_object_name)
        if this_object is None:
            raise Exception
        return this_object
    except Exception:
        raise Exception('find_nested_object: object: %s not found in remote __main__ namespace' % object_name)
Then this file can be saved as use_pc_example.py

Code: Select all

from pc_extension import *


context = Context()


def collect_ranks(tag):
    """
    This method demonstrates that ParallelContextInterface exposes an mpi4py communicator in each worker subworld,
    for general collective operations.
    :param tag: int
    :return: str
    """
    time.sleep(0.1)
    start_time = time.time()
    ranks = context.interface.comm.gather(context.interface.global_rank, root=0)
    if 'count' not in context():
        context.count = 0
    context.count += 1
    if context.interface.rank == 0:
        return 'worker_id: %i, global_ranks: %s, tag: %i, count: %i, compute time: %.2f (ms)' % \
               (context.interface.worker_id, str(ranks), int(tag), context.count, (time.time() - start_time) * 1000.)


def set_count(count=None):
    """
    This method demonstrates use of a Context object for sharing a namespace across functions.
    :param count: int
    """
    if count is None:
        if 'count' not in context():
            context.count = 0
        context.count += 1
    else:
        context.count = count
    print 'global rank: %i / %i, local rank: %i / %i within subworld %i / %i, count: %i' % \
          (context.interface.global_rank, context.interface.global_size, context.interface.rank, context.interface.size,
           context.interface.worker_id, context.interface.num_workers, context.count)


def main(procs_per_worker=1):
    """

    :param procs_per_worker: int
    """
    procs_per_worker = int(procs_per_worker)
    context.interface = ParallelContextInterface(procs_per_worker=procs_per_worker)
    if context.interface.global_rank == 0:
        print 'before interface.start()\n: context.interface.apply(set_count)'
    results1 = context.interface.apply(set_count)
    time.sleep(0.1)
    if context.interface.global_rank == 0:
        pprint.pprint(results1)
    time.sleep(0.1)
    context.interface.start()
    print 'after interface.start()\n: context.interface.apply(set_count, 5)'
    results2 = context.interface.apply(set_count, 5)
    time.sleep(0.1)
    pprint.pprint(results2)
    time.sleep(0.1)
    print ': context.interface.map_sync(collect_ranks, range(10))'
    results3 = context.interface.map_sync(collect_ranks, range(10))
    time.sleep(0.1)
    pprint.pprint(results3)
    time.sleep(0.1)
    print ': context.interface.map_async(collect_ranks, range(10, 20))'
    results4 = context.interface.map_async(collect_ranks, range(10, 20))
    while not results4.ready():
        pass
    print 'after ready(), before get(): collected result keys: %s' % str(context.interface.collected.keys())
    time.sleep(0.1)
    results4 = results4.get()
    pprint.pprint(results4)
    time.sleep(0.1)
    print 'after ready(), after get(): collected result keys: %s' % str(context.interface.collected.keys())
    time.sleep(0.1)
    print ': context.interface.apply(collect_ranks, 0)'
    results5 = context.interface.apply(collect_ranks, 0)
    time.sleep(0.1)
    pprint.pprint(results5)
    time.sleep(0.1)
    context.interface.stop()


if __name__ == '__main__':
    main(*sys.argv[1:])
This can be executed with an optional command line argument to specify the number of MPI processes per worker subworld:

Code: Select all

mpirun -n 4 python use_pc_example.py 2
which produces the following output:

Code: Select all

numprocs=4
NEURON -- VERSION 7.5 master (b14b5dd) 2017-12-21
Duke, Yale, and the BlueBrain Project -- Copyright 1984-2016
See http://neuron.yale.edu/neuron/credits

before interface.start()
: context.interface.apply(set_count)
global rank: 0 / 4, local rank: 0 / 2 within subworld 0 / 2, count: 1
global rank: 1 / 4, local rank: 1 / 2 within subworld 0 / 2, count: 1
global rank: 2 / 4, local rank: 0 / 2 within subworld 1 / 2, count: 1
global rank: 3 / 4, local rank: 1 / 2 within subworld 1 / 2, count: 1
[None, None, None, None]
after interface.start()
: context.interface.apply(set_count, 5)
global rank: 2 / 4, local rank: 0 / 2 within subworld 1 / 2, count: 5
global rank: 3 / 4, local rank: 1 / 2 within subworld 1 / 2, count: 5
global rank: 0 / 4, local rank: 0 / 2 within subworld 0 / 2, count: 5
global rank: 1 / 4, local rank: 1 / 2 within subworld 0 / 2, count: 5
[None, None]
: context.interface.map_sync(collect_ranks, range(10))
['worker_id: 1, global_ranks: [2, 3], tag: 0, count: 6, compute time: 0.20 (ms)',
 'worker_id: 0, global_ranks: [0, 1], tag: 1, count: 6, compute time: 0.17 (ms)',
 'worker_id: 1, global_ranks: [2, 3], tag: 2, count: 7, compute time: 0.21 (ms)',
 'worker_id: 0, global_ranks: [0, 1], tag: 3, count: 7, compute time: 0.24 (ms)',
 'worker_id: 1, global_ranks: [2, 3], tag: 4, count: 8, compute time: 0.13 (ms)',
 'worker_id: 0, global_ranks: [0, 1], tag: 5, count: 8, compute time: 0.16 (ms)',
 'worker_id: 0, global_ranks: [0, 1], tag: 6, count: 9, compute time: 5.08 (ms)',
 'worker_id: 1, global_ranks: [2, 3], tag: 7, count: 9, compute time: 0.19 (ms)',
 'worker_id: 0, global_ranks: [0, 1], tag: 8, count: 10, compute time: 0.20 (ms)',
 'worker_id: 1, global_ranks: [2, 3], tag: 9, count: 10, compute time: 0.12 (ms)']
: context.interface.map_async(collect_ranks, range(10, 20))
after ready(), before get(): collected result keys: [-22, -21, -20, -19, -18, -17, -16, -15, -14, -13]
['worker_id: 1, global_ranks: [2, 3], tag: 10, count: 11, compute time: 0.18 (ms)',
 'worker_id: 0, global_ranks: [0, 1], tag: 11, count: 11, compute time: 0.21 (ms)',
 'worker_id: 1, global_ranks: [2, 3], tag: 12, count: 12, compute time: 0.19 (ms)',
 'worker_id: 0, global_ranks: [0, 1], tag: 13, count: 12, compute time: 0.18 (ms)',
 'worker_id: 1, global_ranks: [2, 3], tag: 14, count: 13, compute time: 0.24 (ms)',
 'worker_id: 0, global_ranks: [0, 1], tag: 15, count: 13, compute time: 0.23 (ms)',
 'worker_id: 1, global_ranks: [2, 3], tag: 16, count: 14, compute time: 0.25 (ms)',
 'worker_id: 0, global_ranks: [0, 1], tag: 17, count: 14, compute time: 0.26 (ms)',
 'worker_id: 1, global_ranks: [2, 3], tag: 18, count: 15, compute time: 0.22 (ms)',
 'worker_id: 0, global_ranks: [0, 1], tag: 19, count: 15, compute time: 0.18 (ms)']
after ready(), after get(): collected result keys: []
: context.interface.apply(collect_ranks, 0)
['worker_id: 1, global_ranks: [2, 3], tag: 0, count: 16, compute time: 0.14 (ms)',
 'worker_id: 0, global_ranks: [0, 1], tag: 0, count: 16, compute time: 0.13 (ms)']