Dask bag: embarrassingly parallel, but with a generator

Example code:

import dask.bag as db
from dask import delayed
from dask.distributed import Client, LocalCluster

N = 10**6


def load():
    # Each delayed object passed to from_delayed becomes one bag partition.
    return delayed(range(N))


if __name__ == '__main__':
    client = Client(LocalCluster())
    bag = db.from_delayed([load(), load()]).map(lambda x: 2 * x)
    # bag = db.from_delayed([load(), load()]).repartition(npartitions=4).map(lambda x: 2 * x)
    out = bag.count().compute()
    print(out)

If you run this code, you'll quickly find that it uses only 2 workers, because db.from_delayed creates one partition per delayed object and therefore splits this bag into 2 partitions. Clearly this code is embarrassingly parallel, but I cannot figure out how to parallelize it with dask. repartition seems to either exhaust the entire generator or run it multiple times, I am not sure which; for N = 10**8 it easily eats all my RAM and crashes, where the job might otherwise just take a while. I expected it to split the data roughly equally across workers without needing to know how much data there is.
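For illustration, here is a minimal sketch of getting more than 2 partitions by creating more delayed objects, one per slice of the data. It assumes the source can be addressed by index range, which is true for range(N) but not for a pure generator of unknown length. The helpers partition_bounds, load_slice and build_bag are hypothetical names, not part of dask.

```python
def partition_bounds(n, npartitions):
    """Split the index range [0, n) into at most `npartitions` contiguous slices."""
    step = max(1, -(-n // npartitions))  # ceiling division, never zero
    return [(start, min(start + step, n)) for start in range(0, n, step)]


def load_slice(start, stop):
    """Hypothetical loader: produce only the items of one slice."""
    return range(start, stop)


def build_bag(n, npartitions):
    """Build a bag with one partition per slice (requires dask)."""
    # Imported here so the helpers above can be used without dask installed.
    import dask.bag as db
    from dask import delayed

    parts = [delayed(load_slice)(a, b) for a, b in partition_bounds(n, npartitions)]
    # from_delayed creates one partition per delayed object.
    return db.from_delayed(parts).map(lambda x: 2 * x)
```

With a distributed Client already running, build_bag(10**6, 8).count().compute() would then spread the work over up to 8 partitions instead of 2.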

I have searched around and I'm seeing a lot of talk about futures, joblib and client.scatter/submit/map (https://stackoverflow.com/a/52678997/12603110) but then how does one chain the processed futures into a dask bag/dataframe?

I just want the generator to run non-stop on 1 thread and "scatter" its items to a bag for parallel processing, without storing unnecessary intermediates.
Is it that complicated?

bag = db.from_generator(load()).map(lambda x: 2*x)  # wished-for API, not an existing dask.bag function; would compute in parallel
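A rough approximation of this behaviour can be sketched with futures rather than a bag: one thread consumes the generator in fixed-size batches and submits each batch as its own task. This is only a sketch under stated assumptions. batched, process_batch, count_doubled and main are hypothetical helpers, the batch size is arbitrary, and nothing here applies backpressure, so a fast generator can still run ahead of the workers and fill memory.

```python
from itertools import islice


def batched(iterable, size):
    """Lazily yield lists of up to `size` items from any iterable."""
    it = iter(iterable)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch


def process_batch(batch):
    """Double every item and count the results (this part runs on a worker)."""
    return sum(1 for _ in (2 * x for x in batch))


def count_doubled(client, items, batch_size=100_000):
    """Submit each batch as a separate task and add up the per-batch counts.

    `client` is expected to be a dask.distributed.Client.
    """
    futures = [
        client.submit(process_batch, batch)
        for batch in batched(items, batch_size)
    ]
    return sum(client.gather(futures))


def main():
    # Imported here so the helpers above can be used without dask installed.
    from dask.distributed import Client, LocalCluster

    with LocalCluster() as cluster, Client(cluster) as client:
        print(count_doubled(client, (i for i in range(10**6))))
```

Calling main() under an if __name__ == '__main__': guard runs this on a local cluster. Only the small per-batch counts travel back to the client, but each batch is still serialized and shipped from the client process, so this design suits cases where processing an item costs far more than sending it.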


Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
