Using dask.delayed with functions that return lists
I’m trying to build a task map using `dask.delayed`. This mostly works fine, but I often have many objects, each with a method that returns a list of objects whose length cannot easily be determined from the information available when I build the graph:
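```python
from dask import delayed

items = get_collection()  # known length

def do_work(item):
    # get_list_of_things returns a list of "unknown" length
    return [x.DoStuff() for x in item.get_list_of_things()]

# Wrap the function first, then call it, so do_work runs lazily inside the graph
results = [delayed(do_work)(x) for x in items]
```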
This gives:

```
TypeError: Delayed objects of unspecified length are not iterable
```
Is there any way dask can handle this, preferably without calling `.compute()` on intermediate results? That would lose most of the benefit of having a task map. The graph cannot be fully resolved before certain steps have run, but the only unknown is the width of the parallel part, which does not change the structure or depth of the graph.
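For reference, a minimal sketch that reproduces the error without the original objects. `get_things` is a hypothetical stand-in for `item.get_list_of_things()`. Creating the delayed call is fine; the `TypeError` appears only when the code tries to iterate over a `Delayed` whose length is unknown while the graph is being built.

```python
from dask import delayed

def get_things(n):
    # Hypothetical stand-in for item.get_list_of_things(): the length of the
    # returned list is only known once the function has actually run.
    return list(range(n))

# Building the delayed call is fine: it is a single node in the graph.
things = delayed(get_things)(5)

try:
    # Iterating needs the number of elements at graph-construction time,
    # which dask.delayed does not know here, so this raises TypeError.
    for t in things:
        pass
except TypeError as err:
    print("TypeError:", err)

# Computing the whole list as one value works as usual.
print(things.compute())  # [0, 1, 2, 3, 4]
```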
Solution
Unfortunately, if you want to call a separate function on each element of the list, then that is part of the structure of your graph, and it must be known at graph construction time if you want to use `dask.delayed`.
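The same constraint seen from the other side: when the number of outputs is known up front, `dask.delayed` accepts it through its `nout` argument, and the result can then be unpacked into separate graph nodes. A small sketch, where `split_pair` is a hypothetical function that always returns exactly two values; this does not help when the length is genuinely unknown.

```python
from dask import delayed

def split_pair(x):
    # Hypothetical function that always returns exactly two values.
    return x, x * 2

# nout=2 declares the output length while the graph is being built,
# so the Delayed result can be unpacked into two separate nodes.
a, b = delayed(split_pair, nout=2)(10)

# Each unpacked element can feed its own downstream task.
total = delayed(sum)([a, b])
print(total.compute())  # 30
```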
In general, I see two options:
1. Instead of creating a separate task for each element in the list, create a task for the first 10%, another for the second 10%, and so on. This is the same approach `dask.bag` takes to handle parallelism over an unknown number of elements, so `dask.bag` itself might be worth considering.
2. Switch to the real-time, `concurrent.futures`-style interface in `dask.distributed` and wait for the list result before submitting more work:
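```python
import operator

from dask.distributed import Client

client = Client()

# Compute the list as a single task (args stands for whatever do_work needs)
list_future = client.submit(do_work, *args)
# Compute its length on the cluster, without pulling the list back
len_future = client.submit(len, list_future)
n = len_future.result()  # wait until the length is computed

# Now that n is known, submit one task per element
futures = [client.submit(operator.getitem, list_future, i) for i in range(n)]
# ... do more stuff with futures
```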
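The first option can also be sketched with `dask.delayed` alone, assuming a fixed number of chunks. The graph always has `N_CHUNKS` tasks, and each task slices out its own share of the list at run time, so nobody needs `len(things)` while the graph is being built. `get_list_of_things`, `do_stuff`, `process_chunk`, and `concat` are hypothetical names for illustration, not part of dask.

```python
from dask import delayed

N_CHUNKS = 10  # fixed graph width: one task per ~10% of the list

def get_list_of_things():
    # Hypothetical stand-in: the length is only discovered at run time.
    return list(range(37))

def do_stuff(x):
    # Hypothetical per-element work.
    return x * x

def process_chunk(things, k, n):
    # Runs inside the graph, so len(things) is known here. Chunk k covers
    # indices [k*L//n, (k+1)*L//n); together the chunks cover the whole list.
    start = k * len(things) // n
    stop = (k + 1) * len(things) // n
    return [do_stuff(x) for x in things[start:stop]]

def concat(chunks):
    # Flatten the per-chunk results back into one list, preserving order.
    return [y for chunk in chunks for y in chunk]

things = delayed(get_list_of_things)()
chunks = [delayed(process_chunk)(things, k, N_CHUNKS) for k in range(N_CHUNKS)]
result = delayed(concat)(chunks)

print(result.compute())
```

The number of tasks is fixed regardless of the list's length; if the list is shorter than `N_CHUNKS`, some chunks are simply empty. `dask.bag` applies the same idea through its partitions.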