'Python Asyncio/Trio for Asynchronous Computing/Fetching

I am looking for a way to efficiently fetch a chunk of values from disk, and then perform computation/calculations on the chunk. My thought was a for loop that would run the disk fetching task first, then run the computation on the fetched data. I want to have my program fetch the next batch as it is running the computation so I don't have to wait for another data fetch every time a computation completes. I expect the computation will take longer than the fetching of the data from disk, and likely cannot be done truly in parallel due to a single computation task already pinning the cpu usage at near 100%.

I have provided some code below in python using trio (but could alternatively be used with asyncio to the same effect) to illustrate my best attempt at performing this operation with async programming:

import trio
import numpy as np
from datetime import datetime as dt
import time

testiters=10
dim = 6000


def generateMat(arrlen):
    for _ in range(30):
        retval= np.random.rand(arrlen, arrlen)
    # print("matrix generated")
    return retval

def computeOpertion(matrix):
    return np.linalg.inv(matrix)


def runSync():
    for _ in range(testiters):
        mat=generateMat(dim)
        result=computeOpertion(mat)
    return result

async def matGenerator_Async(count):
    for _ in range(count):
        yield generateMat(dim)

async def computeOpertion_Async(matrix):
    return computeOpertion(matrix)

async def runAsync():
    async with trio.open_nursery() as nursery:
        async for value in matGenerator_Async(testiters): 
            nursery.start_soon(computeOpertion_Async,value)
            #await computeOpertion_Async(value)

            

print("Sync:")
start=dt.now()
runSync()
print(dt.now()-start)

print("Async:")
start=dt.now()
trio.run(runAsync)
print(dt.now()-start)

This code will simulate getting data from disk by generating 30 random matrices, which uses a small amount of cpu. It will then perform matrix inversion on the generated matrix, which uses 100% cpu (with openblas/mkl configuration in numpy). I compare the time taken to run the tasks by timing the synchronous and asynchronous operations.

From what I can tell, both jobs take exactly the same amount of time to finish, meaning the async operation did not speed up the execution. Observing the behavior of each computation, the sequential operation runs the fetch and computation in order and the async operation runs all the fetches first, then all the computations afterwards.

Is there a way to use asynchronously fetch and compute? Perhaps with futures or something like gather()? Asyncio has these functions, and trio has them in a seperate package trio_future. I am also open to solutions via other methods (threads and multiprocessing).

I believe that there likely exists a solution with multiprocessing that can make the disk reading operation run in a separate process. However, inter-process communication and blocking then becomes a hassle, as I would need some sort of semaphore to control how many blocks could be generated at a time due to memory constraints, and multiprocessing tends to be quite heavy and slow.

EDIT

Thank you VPfB for your answer. I am not able to sleep(0) in the operation, but I think even if I did, it would necessarily block the computation in favor of performing disk operations. I think this may be a hard limitation of python threading and asyncio, that it can only execute 1 thread at a time. Running two different processes simultaneously is impossible if both require anything but waiting for some external resource to respond from your CPU.

Perhaps there is a way with an executor for a multiprocessing pool. I have added the following code below:

import asyncio
import concurrent.futures

async def asynciorunAsync():
    loop = asyncio.get_running_loop()
    with concurrent.futures.ProcessPoolExecutor() as pool:    
         async for value in matGenerator_Async(testiters):              
            result = await loop.run_in_executor(pool, computeOpertion,value)


print("Async with PoolExecutor:")
start=dt.now()
asyncio.run(asynciorunAsync())
print(dt.now()-start)

Although timing this, it still takes the same amount of time as the synchronous example. I think I will have to go with a more involved solution as it seems that async and await are too crude of a tool to properly do this type of task switching.



Solution 1:[1]

I don't work with trio, my answer it asyncio based.

Under these circumstances the only way to improve the asyncio performance I see is to break the computation into smaller pieces and insert await sleep(0) between them. This would allow the data fetching task to run.

Asyncio uses cooperative scheduling. A synchronous CPU bound routine does not cooperate, it blocks everything else while it is running.

sleep() always suspends the current task, allowing other tasks to run.

Setting the delay to 0 provides an optimized path to allow other tasks to run. This can be used by long-running functions to avoid blocking the event loop for the full duration of the function call.

(quoted from: asyncio.sleep)


If that is not possible, try to run the computation in an executor. This adds some multi-threading capabilities to otherwise pure asyncio code.

Solution 2:[2]

The point of async I/O is to make it easy to write programs where there is lots of network I/O but very little actual computation (or disk I/O). That applies to any async library (Trio or asyncio) or even different languages (e.g. ASIO in C++). So your program is ideally unsuited to async I/O! You will need to use multiple threads (or processes). Although, in fairness, async I/O including Trio can be useful for coordinating work on threads, and that might work well in your case.

As VPfB's answer says, if you're using asyncio then you can use executors, specifically a ThreadPoolExecutor passed to loop.run_in_executor(). For Trio, the equivalent would be trio.to_thread.run_sync() (see also Threads (if you must) in the Trio docs), which is even easier to use. In both cases, you can await the result, so the function is running in a separate thread while the main Trio thread can continue running your async code. Your code would end up looking something like this:

async def matGenerator_Async(count):
    for _ in range(count):
        yield await trio.to_thread.run_sync(generateMat, dim)

async def my_trio_main()
    async with trio.open_nursery() as nursery:
        async for matrix in matGenerator_Async(testiters):
             nursery.start_soon(trio.to_thread.run_sync, computeOperation, matrix)

trio.run(my_trio_main)

There's no need for the computation functions (generateMat and computeOperation) to be async. In fact, it's problematic if they are because you could no longer run them in a separate thread. In general, only make a function async if it needs to await something or use async with or async for.

You can see from the above example how to pass data to the functions running in the other thread: just pass them as parameters to trio.to_thread.run_sync(), and they will be passed along as parameters to the function. Getting the result back from generateMat() is also straightforward - the return value of the function called in the other thread is returned from await trio.to_thread.run_sync(). Getting the result of computeOperation() is trickier, because it's called in the nursery, so its return value is thrown away. You'll need to pass a mutable parameter to it (like a dict) and stash the result in there. But be careful about thread safety; the easiest way to do that is to pass a new object to each coroutine, and only inspect them all after the nursery has finished.

A few final footnotes that you can probably ignore:

  • Just to be clear, yield await in the code above isn't some sort of special syntax. It's just await foo(), which returns a value once foo() has finished, followed by yield of that value.
  • You can change the number of threads Trio uses for calls to to_thread.run_sync() by passing a CapacityLimiter object, or by finding the default one and setting the count on that. It looks like the default is currently 40, so you might want to turn that down a bit, but it's probably not too important.
  • There is a common myth that Python doesn't support threads, or at least can't do computation in multiple threads simultaneously, because it has a single global lock (the global interpreter lock, or GIL). That would mean that you need to use multiple processes, rather than threads, for your program to really compute thing in parallel. It's true there is a GIL in Python, but so long as you're doing your computation using something like numpy, which you are, then it doesn't stop multithreading from working effectively.
  • Trio actually has great support for async file I/O. But I don't think it would be helpful in your case.

Solution 3:[3]

To supplement my other answer (which uses Trio like you asked), here's how to do it use it just using threads without any async library. The easiest way to do this with Future objects and a ThreadPoolExecutor.

futures = []
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    for matrix in matGenerator(testiters):
        futures.append(executor.submit(computeOperation, matrix))
results = [f.result() for f in futures]

The code is actually pretty similar to the async code, but if anything it's simpler. If you don't need to do network I/O, you're better off with this method.

Solution 4:[4]

I think the main issue with using multiprocessing and not seeing any improvement is the 100% utilization of the CPU. It essentially leaves you with an async-like behavior where resources are occasionally being freed up and used for the I/O process. You could set a limit to the number of workers for your ProcessPoolExecutor and that might allow the I/O the room it needs to be more at the ready.

Disclaimer: I'm still new to multiprocessing and threading.

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1
Solution 2 Arthur Tacca
Solution 3
Solution 4 Lord Byronary the 0110th