Python – Is there any way to use multiprocessing.pool in nested functions or modules?

Thank you for checking this out. I admit I’ve spent about a week trying to learn parallel processing in Python, so I apologize if I missed an obvious solution. I have a piece of code in which I want to run several different instances of mp.Pool(). The calls in the main .py file work fine, but when I add them to a function in a module, I don’t get any output from them — the app just runs past them and continues. I think this may have something to do with this post, but it doesn’t suggest an alternative way to accomplish what I need. The code that works in a simple example looks like this:

import multiprocessing as mp

thread_count = 4  # number of worker processes

def multiproc_log_result(retval):
    results.append(retval)
    if len(results) % (10 // 10) == 0:
        print('{0}% done'.format(100 * len(results) / 10))

def meat():
    print('beef')
    status = True
    return status

results = []
pool = mp.Pool(thread_count)
for x in range(10):
    pool.apply_async(meat, callback=multiproc_log_result)
pool.close()
pool.join()

def veggie():
    print('carrot')
    status = True
    return status

results = []
pool = mp.Pool(thread_count)
for x in range(10):
    pool.apply_async(veggie, callback=multiproc_log_result)
pool.close()
pool.join()

The code that does not work is:

import multiprocessing as mp

thread_count = 4  # number of worker processes

def multiproc_log_result(retval):
    results.append(retval)
    if len(results) % (10 // 10) == 0:
        print('{0}% done'.format(100 * len(results) / 10))

def meat():
    print('beef')
    status = True
    return status

results = []
pool = mp.Pool(thread_count)
for x in range(10):
    pool.apply_async(meat, callback=multiproc_log_result)
pool.close()
pool.join()

def nested_stupid_fn():
    def multiproc_log_result(retval):
        results.append(retval)
        if len(results) % (10 // 10) == 0:
            print('{0}% done'.format(100 * len(results) / 10))

    def veggie():
        print('carrot')
        status = True
        return status

    results = []
    pool = mp.Pool(thread_count)
    for x in range(10):
        pool.apply_async(veggie, callback=multiproc_log_result)
    pool.close()
    pool.join()

nested_stupid_fn()

Eventually, I’d like to move the example that doesn’t work into another function in a separate module, one function per step. So that when I import the module packngo and call packngo.basic_packngo(inputs), which contains the nested function somewhere inside it, everything runs. Any help would be appreciated. 😀 I’m a very simple person, so if you can explain it like you would to a child, maybe I’ll get it!

Solution

The other question you linked has the solution, it’s just not spelled out: you can’t use nested functions as the func argument to the apply*/map* family of methods on multiprocessing.Pool. They happen to work for multiprocessing.dummy.Pool, because multiprocessing.dummy is backed by threads, which can pass function references directly; but multiprocessing.Pool must pickle the function, and it can only pickle functions with importable names. If you check the name of a nested function, it looks like modulename.outerfuncname.&lt;locals&gt;.innerfuncname, and that &lt;locals&gt; component makes it impossible to import (which is usually a good thing; nested functions that actually take advantage of nesting usually have critical state in closure scope, which would be lost by a mere import).
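To see this concretely, here is a minimal sketch (the names outer and inner are purely illustrative) showing the &lt;locals&gt; marker in a nested function’s qualified name and the pickle failure that follows from it:

```python
import pickle

def outer():
    def inner():
        return 'carrot'
    return inner

f = outer()
print(f.__qualname__)  # outer.<locals>.inner

try:
    pickle.dumps(f)
except Exception as exc:
    # On CPython 3 this is an AttributeError:
    # "Can't pickle local object 'outer.<locals>.inner'"
    print('pickling failed:', exc)
```

This is the same failure multiprocessing.Pool hits internally when it tries to ship a nested function to a worker process.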

It’s perfectly fine for the callbacks to be nested functions, because they execute in the parent process and are never sent to the workers. In your case, only the callback depends on closure scope, so it’s fine to move just the func (veggie) to the global scope, defining your packngo module as:

import multiprocessing as mp

thread_count = 4  # number of worker processes

def veggie():
    print('carrot')
    status = True
    return status

def nested_stupid_fn():
    def multiproc_log_result(retval):
        results.append(retval)
        if len(results) % (10 // 10) == 0:
            print('{0}% done'.format(100 * len(results) / 10))

    results = []
    pool = mp.Pool(thread_count)
    for x in range(10):
        pool.apply_async(veggie, callback=multiproc_log_result)
    pool.close()
    pool.join()

nested_stupid_fn()

Yes, this means veggie becomes a public member of the module in question. If you want to indicate that it should be considered an implementation detail, you can prefix it with an underscore ( _veggie ), but it must be global to be usable with multiprocessing.Pool.
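As a quick sanity check (a minimal sketch using the underscore-prefixed name suggested above), a module-level function round-trips through pickle without trouble, which is exactly what multiprocessing.Pool needs from its func argument:

```python
import pickle

def _veggie():
    # Module-level, so pickle can reference it by its importable name.
    return 'carrot'

restored = pickle.loads(pickle.dumps(_veggie))
print(restored())  # carrot
```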
