Unmanaged memory jamming the cluster during dask's merge_asof method

I am trying to merge large dataframes using dask.dataframe.multi.merge_asof, but I am running into issues with accumulating unmanaged memory on the cluster.

I have boiled the problem down to the following example. It creates some sample data with timestamps using dask, converts it to pandas dataframes, and then runs dask's merge_asof implementation. The workers quickly exceed their memory with unmanaged memory, and eventually the cluster just gets stuck (it does not crash, it simply stops doing anything).

It looks like this:

```python
from distributed import Client, LocalCluster
import pandas as pd
import dask
import dask.datasets
import dask.dataframe

client = Client(LocalCluster(n_workers=40))

# make sample datasets, and convert them to pandas dataframes
left = dask.datasets.timeseries(start='2021-01-01', end='2022-12-31', partition_freq="3d").reset_index().compute()
right = dask.datasets.timeseries(start='2021-01-02', end='2023-01-01', partition_freq="3d").reset_index().compute()
left = dask.dataframe.from_pandas(left, npartitions=250)
right = dask.dataframe.from_pandas(right, npartitions=250)

# Alternative to above block (no detour via pandas)
#left = dask.datasets.timeseries(start='2021-01-01', end='2022-12-31', partition_freq="3d").reset_index()
#right = dask.datasets.timeseries(start='2021-01-02', end='2023-01-01', partition_freq="3d").reset_index()

# (dask crashes on datetime, so convert to int64 nanoseconds since the epoch first)
left['timestamp'] = left['timestamp'].astype('int64')
right['timestamp'] = right['timestamp'].astype('int64')

dask.dataframe.multi.merge_asof(
    left=left.sort_values(by='timestamp'),
    right=right.sort_values(by='timestamp'),
    # the keys are int64 nanoseconds, so the tolerance must be an int in nanoseconds too;
    # Timedelta.delta is deprecated in recent pandas, .value returns the same integer
    tolerance=pd.to_timedelta(10000000, unit='seconds').value,
    direction='forward',
    left_on='timestamp',
    right_on='timestamp',
).compute()
```
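For reference, here is a pandas-only sketch of the same key handling on a toy frame (the frames and the 10-second tolerance are made up for illustration and are not from the question). Once the timestamps are int64 nanoseconds, `tolerance` must also be an integer number of nanoseconds, which is what `Timedelta.value` returns. It also shows what `direction='forward'` with a tolerance means: each left row is matched to the first right row at or after it, but only if that row lies within the tolerance.

```python
import pandas as pd

# Toy frames with datetime keys, mimicking the 'timestamp' column above
left = pd.DataFrame({
    "timestamp": pd.to_datetime(["2021-01-01 00:00:00", "2021-01-01 00:00:05", "2021-01-01 00:01:00"]),
    "x": [1, 2, 3],
})
right = pd.DataFrame({
    "timestamp": pd.to_datetime(["2021-01-01 00:00:03", "2021-01-01 00:00:30"]),
    "y": [10.0, 20.0],
})

# Pin nanosecond resolution, then convert to int64 nanoseconds since the epoch
left["timestamp"] = left["timestamp"].astype("datetime64[ns]").astype("int64")
right["timestamp"] = right["timestamp"].astype("datetime64[ns]").astype("int64")

# With integer keys the tolerance must be an integer in the same unit (ns)
tolerance_ns = pd.Timedelta(seconds=10).value

merged = pd.merge_asof(
    left.sort_values("timestamp"),
    right.sort_values("timestamp"),
    on="timestamp",
    direction="forward",    # look for the next right key at or after the left key
    tolerance=tolerance_ns, # ...but no further away than 10 s
)
print(merged)
```

Only the first left row (00:00:00) finds a right row within 10 seconds (00:00:03); the row at 00:00:05 is 25 seconds from the next right key, and the row at 00:01:00 has no later right key, so both get `NaN` for `y`.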

Note that I deliberately call compute() on the sample data to get pandas dataframes, since in my real use case I also have to start from pandas dataframes, so I need to "model" that step as well.

Interestingly, skipping this detour works much better: if I comment out the first data-creation block and use the one labelled as an alternative instead, the merge succeeds (there is still some unmanaged memory, but not enough to stall the cluster).

I suspect that the local pandas dataframes occupying machine memory may increase the unmanaged memory (?), but the data is far from filling the machine's RAM (200 GiB). So my questions are:

  1. Why does the cluster hang even though the machine still has memory available?
  2. Why does going through pandas dataframes as an intermediate step affect memory management during merge_asof?
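To put a number on "far from taking all of the machine's RAM", the footprint of the client-side pandas frames can be measured directly. This is a minimal sketch; `frame_size_gib` is a hypothetical helper name. `deep=True` matters because string columns such as `name` are otherwise undercounted when they are stored as Python objects.

```python
import pandas as pd


def frame_size_gib(df: pd.DataFrame) -> float:
    """Approximate in-memory size of a pandas DataFrame in GiB.

    deep=True inspects the actual Python objects in object/string columns
    instead of counting only the 8-byte pointers to them.
    """
    return df.memory_usage(deep=True).sum() / 2**30
```

Calling it on the computed `left` and `right` frames (before `from_pandas`) shows how much of the 200 GiB the client process itself holds.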

I have tried various things regarding garbage collection, thinking that some pandas data might be lingering in memory while merge_asof runs, but to no avail. I have also tried garbage collection on the workers, as well as automatic and manual memory trimming as described in this blog post and this video; none of these had any noticeable effect.
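The worker-side garbage collection and manual trimming mentioned above are usually done along the following lines. This is a sketch assuming Linux workers with glibc (`libc.so.6` and `malloc_trim` are glibc-specific), not necessarily the exact code from the blog post or video; `trim_memory` and `collect_and_trim` are hypothetical helper names.

```python
import ctypes
import gc


def trim_memory() -> int:
    """Ask glibc to hand freed heap pages back to the OS.

    Linux/glibc only: returns 1 if some memory was released, 0 otherwise.
    """
    libc = ctypes.CDLL("libc.so.6")
    return libc.malloc_trim(0)


def collect_and_trim() -> tuple:
    """Run Python's garbage collector, then trim the malloc heap."""
    collected = gc.collect()  # number of unreachable objects found
    released = trim_memory()  # 1 if glibc released memory, else 0
    return collected, released
```

With a running client, `client.run(collect_and_trim)` executes the function on every worker and returns a dict keyed by worker address.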



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
