Semaphores in dask.distributed?

I have a dask cluster with n workers and want the workers to run queries against a database. But the database can only handle m queries in parallel, where m < n. How can I model that in dask.distributed? Only m workers should work on such a task at the same time.

I have seen that distributed supports locks (http://distributed.readthedocs.io/en/latest/api.html#distributed.Lock). But with a lock I could only run one query at a time, not m.

I have also seen that I could define resources per worker (https://distributed.readthedocs.io/en/latest/resources.html). But that does not fit either, as the database is independent of the workers. I would either have to define 1 database resource per worker (which leads to too many parallel queries), or I would have to distribute m database resources across n workers, which is difficult when setting up the cluster and suboptimal in execution.

Is it possible to define something like semaphores in dask to solve that?



Solution 1:[1]

You could probably hack something together with Locks and Variables.

A cleaner solution would be to implement Semaphores in much the same way Locks are implemented. Depending on your experience this may not be that hard (the lock implementation is 150 lines), and it would be a welcome pull request.

https://github.com/dask/distributed/blob/master/distributed/lock.py

Solution 2:[2]

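You can use a dask.distributed.Queue, treating the queued items as tokens:

from dask.distributed import Queue

class DDSemaphore(object):
    """Dask Distributed Semaphore"""

    def __init__(self, value=1):
        # Requires a running Client; pre-fill the queue with `value` tokens.
        self._q = Queue()
        for _ in range(value):
            self._q.put(42)

    def acquire(self):
        # Blocks until a token is available.
        self._q.get()

    def release(self):
        # Hand the token back so another task can proceed.
        self._q.put(42)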

Solution 3:[3]

dask.distributed now contains a Semaphore class that can be used via the distributed Futures API:

https://docs.dask.org/en/stable/futures.html#id1
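
As a rough illustration of how this might look for the database case in the question, here is a minimal sketch. The function names query_database and run_queries, the semaphore name "database", and the in-process cluster settings are assumptions made for the example, and the sleep only stands in for a real query.

```python
import time

from dask.distributed import Client, Semaphore


def query_database(x, sem):
    """Hypothetical stand-in for a real database query."""
    with sem:  # blocks until one of the leases is free
        time.sleep(0.05)  # pretend to wait for the database
        return x * 2


def run_queries(n_tasks, max_parallel):
    # In-process cluster so the sketch runs without extra setup (assumption).
    client = Client(processes=False, n_workers=1, threads_per_worker=4)
    try:
        # At most `max_parallel` tasks can hold a lease at the same time.
        sem = Semaphore(max_leases=max_parallel, name="database")
        futures = client.map(query_database, range(n_tasks), sem=sem)
        return client.gather(futures)
    finally:
        client.close()
```

Calling run_queries(n, m) submits n tasks, but only m of them should be inside the with block at any moment, regardless of how many workers or threads the cluster has.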

If you're using Dask collections such as Bag, DataFrame, or Array, then you may need to obtain their enclosed Future objects to use them with the Semaphore. You can do that with futures_of().

https://docs.dask.org/en/stable/user-interfaces.html?highlight=futures_of#combining-interfaces
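
A small sketch of what obtaining those futures could look like for a Bag, assuming the collection is first persisted on the cluster. The helper name bag_futures and the cluster settings are made up for the example.

```python
import dask.bag as db
from dask.distributed import Client, Future, futures_of


def bag_futures(n_items, npartitions):
    # In-process cluster so the sketch runs without extra setup (assumption).
    client = Client(processes=False, n_workers=1, threads_per_worker=2)
    try:
        bag = db.from_sequence(range(n_items), npartitions=npartitions)
        # Persisting replaces the bag's tasks with futures held by the cluster.
        persisted = client.persist(bag)
        # Collect the Future objects that back the persisted collection.
        return futures_of(persisted)
    finally:
        client.close()
```

The returned futures can then be passed to client.map or client.submit together with a Semaphore, in the same way as any other future.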

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 MRocklin
Solution 2 Ngo
Solution 3