Python – Use dask delayed with function return lists

I’m trying to build a task graph using dask.delayed. This mostly works well, but I often run into situations where I have a number of delayed objects with a method that returns a list of objects whose length cannot easily be computed from the information available at that point:

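from dask import delayed

items = get_collection() # known length; the items are dask Delayed objects

def do_work(item):
    # get_list_of_things returns list of "unknown" length
    return map(lambda x: x.DoStuff(), item.get_list_of_things())

# do_work(x) runs eagerly here, so map() tries to iterate over a Delayed list
results = [delayed(do_work(x)) for x in items]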

This raises:

TypeError: Delayed objects of unspecified length are not iterable

Is there any way dask can handle this, preferably without calling .compute() on intermediate results, since that would defeat most of the benefit of having a task graph? It essentially means the graph cannot be fully resolved before certain steps have run, but the only unknown is the width of the parallel section, which changes neither the structure nor the depth of the graph.

Solution

Unfortunately, if you want to call a separate function on each element of the list, then that is part of the structure of your graph, and it must be known at graph-construction time if you want to use dask.delayed.
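To see why the length has to be known up front, here is a minimal sketch, assuming a reasonably recent dask version. Iterating over a Delayed whose length was never declared raises the TypeError from the question, while declaring a fixed length with the `nout` parameter of `delayed` lets the result be unpacked into separate tasks while the graph is being built. The helper functions are hypothetical stand-ins.

```python
import dask
from dask import delayed


def get_list_of_things(n):
    # Hypothetical stand-in whose output length depends on run-time data
    return list(range(n))


def first_three(n):
    # Hypothetical function whose output length (3) is known in advance
    return [n, n + 1, n + 2]


# Length unknown while building the graph: dask refuses to iterate
unknown = delayed(get_list_of_things)(5)
try:
    for thing in unknown:
        pass
    iterable = True
except TypeError as exc:
    iterable = False
    print(exc)

# Length declared via nout: the call unpacks into 3 separate Delayed values
a, b, c = delayed(first_three, nout=3)(10)
print(dask.compute(a, b, c, scheduler="synchronous"))  # (10, 11, 12)
```

`nout` only helps when the length is fixed regardless of the data, which is exactly what the question cannot guarantee; that is why the options below are needed.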

In general, I see two options:

  1. Instead of creating a separate task for each element of the list, create one task for the first 10%, another for the second 10%, and so on. This is the approach taken by dask.bag, which also handles parallelism over an unknown number of elements (and might be worth considering).

    http://dask.pydata.org/en/latest/bag.html

  2. Switch to the real-time concurrent.futures interface and wait for the result of your list before submitting more work:

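    import operator

    from dask.distributed import Client

    client = Client()

    # do_work must return a real list (not a lazy map object) so len() works;
    # args stands for whatever inputs do_work needs
    list_future = client.submit(do_work, *args)
    len_future = client.submit(len, list_future)

    n = len_future.result()  # wait until the length is computed

    # one task per element; futures passed as arguments are resolved on the cluster
    futures = [client.submit(operator.getitem, list_future, i) for i in range(n)]

    # ... do more stuff with futures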
    

    http://dask.pydata.org/en/latest/futures.html
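The first option can be sketched with dask.delayed alone. The number of tasks per item is fixed when the graph is built (10 here, matching the 10% example), and each task computes its own slice of the list only at compute time, once the length is known. This is a minimal sketch under those assumptions: `get_list_of_things`, `do_stuff`, `process_chunk`, and `concat` are hypothetical stand-ins, not dask APIs.

```python
import dask
from dask import delayed

N_CHUNKS = 10  # fixed at graph-construction time: one task per 10% slice


def get_list_of_things(item):
    # Hypothetical stand-in: the list length is only known at run time
    return list(range(item * 7 + 3))


def do_stuff(thing):
    # Hypothetical per-element work
    return thing * thing


def process_chunk(things, k, n_chunks):
    # Runs at compute time, when len(things) is finally known, and
    # handles only the k-th of n_chunks contiguous slices
    n = len(things)
    start = k * n // n_chunks
    stop = (k + 1) * n // n_chunks
    return [do_stuff(t) for t in things[start:stop]]


def concat(chunks):
    # Reassemble the per-chunk results in their original order
    return [x for chunk in chunks for x in chunk]


def build_graph(items, n_chunks=N_CHUNKS):
    results = []
    for item in items:
        things = delayed(get_list_of_things)(item)  # length unknown here
        chunks = [delayed(process_chunk)(things, k, n_chunks) for k in range(n_chunks)]
        results.append(delayed(concat)(chunks))  # lists of Delayed are unpacked
    return results


graph = build_graph([0, 1, 2])  # 3 items x 10 chunks: graph shape fixed up front
out = dask.compute(*graph, scheduler="synchronous")
print([len(r) for r in out])  # [3, 10, 17]
```

Because the slice boundaries are computed from the real length inside each task, the chunks always cover the whole list without gaps or overlaps, even when the list has fewer than 10 elements (some chunks are then simply empty).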
