Dask bag: embarrassingly parallel, but with a generator
Example code:
import dask.bag as db
from dask import delayed
from dask.distributed import Client, LocalCluster

N = 10**6

def load():
    # a single delayed object wrapping the whole range
    return delayed(range(N))

if __name__ == '__main__':
    client = Client(LocalCluster())
    bag = db.from_delayed([load(), load()]).map(lambda x: 2 * x)
    # bag = db.from_delayed([load(), load()]).repartition(npartitions=4).map(lambda x: 2 * x)
    out = bag.count().compute()
    print(out)
If you run this code, you will quickly find that it uses only 2 workers, because db.from_delayed partitioned this bag into just 2 partitions (one per delayed object).
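(As a quick check of that observation, the partition count can be inspected directly; this is just the statement above written out in code, reusing load() from the example:)

bag = db.from_delayed([load(), load()])
print(bag.npartitions)  # 2 -- one partition per delayed object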
The workload is clearly embarrassingly parallel, but I cannot figure out how to parallelize it with dask.
repartition seems to either exhaust the entire generator or run it multiple times (I am not sure which); it easily eats all of my RAM and crashes for N = 10**8, a size that would otherwise just take a while to process. I expected dask to split the data across the workers roughly equally, without needing to know in advance how much data there is.
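One workaround, sketched here rather than claimed as the canonical dask idiom: if the total size N is known up front (which, admittedly, the question would rather avoid), the load step can be split into several independent delayed chunks, so that db.from_delayed gets one partition per chunk and every worker has something to do. NCHUNKS and load_chunk are names made up for this sketch.

import dask.bag as db
from dask import delayed

N = 10**6
NCHUNKS = 8  # pick roughly the number of workers, or more

def load_chunk(i):
    # each delayed object becomes one bag partition
    start, stop = i * N // NCHUNKS, (i + 1) * N // NCHUNKS
    return delayed(range(start, stop))

bag = db.from_delayed([load_chunk(i) for i in range(NCHUNKS)]).map(lambda x: 2 * x)
# bag.count().compute() now spreads NCHUNKS partitions over the workers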
I have searched around and see a lot of talk about futures, joblib and client.scatter/submit/map (https://stackoverflow.com/a/52678997/12603110), but then how does one chain the processed futures back into a dask bag/dataframe?
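(For context, the futures-only style those answers describe looks roughly like the sketch below; load_chunk and process are hypothetical helpers, and note that it sidesteps the bag entirely instead of chaining back into one.)

from dask.distributed import Client, LocalCluster

N = 10**6
NCHUNKS = 8  # hypothetical chunk count

def load_chunk(i):
    return list(range(i * N // NCHUNKS, (i + 1) * N // NCHUNKS))

def process(chunk):
    return [2 * x for x in chunk]

if __name__ == '__main__':
    client = Client(LocalCluster())
    chunks = client.map(load_chunk, range(NCHUNKS))  # one future per chunk, built on the workers
    processed = client.map(process, chunks)          # processed in parallel, results stay on the cluster
    total = client.submit(sum, client.map(len, processed)).result()
    print(total)                                     # the equivalent of bag.count()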
I just want the generator to keep producing items on one thread and to "scatter" those items into a bag for parallel processing, without storing unnecessary intermediates.
Is it really that complicated? All I want is something like:
bag = db.from_generator(load()).map(lambda x: 2*x) #computes in parallel
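Nothing like db.from_generator exists in dask.bag, but a rough approximation (a sketch only; BATCH, generate and process are assumptions, and the result is plain futures rather than a bag) is to read the generator in bounded batches on one thread, scatter each batch to the cluster and submit work on it, so intermediates never pile up on the client:

from itertools import islice
from dask.distributed import Client, LocalCluster

N = 10**6
BATCH = 100_000  # assumed batch size

def generate():
    # stand-in for the real generator
    return iter(range(N))

def process(batch):
    return [2 * x for x in batch]

if __name__ == '__main__':
    client = Client(LocalCluster())
    it = generate()
    futures = []
    while True:
        batch = list(islice(it, BATCH))     # pull one bounded chunk on a single thread
        if not batch:
            break
        [remote] = client.scatter([batch])  # ship the chunk to a worker
        futures.append(client.submit(process, remote))
    total = client.submit(sum, client.map(len, futures)).result()
    print(total)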
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow