ThreadPoolExecutor communication with separate asyncio loop

I have an IO-bound task running in a loop. This task does a lot of work and often hogs the event loop (is that the right word for it?). My plan is to run it in a separate process or thread via run_in_executor with a ProcessPoolExecutor or ThreadPoolExecutor, so that the main loop can keep doing its work. Currently I use asyncio.PriorityQueue() and asyncio.Event() for communication between tasks, and I would like to reuse these, or something with the same interface, if possible.

Current code:

# Getter for events and queues so communication can happen
send, receive, send_event, receive_event = await process_obj.get_queues()

# Creates task based off the process object
future = asyncio.create_task(process_obj.main())

Current process code:

async def main():

    while True:
        # does things that hog the loop

What I want to do:

# Getter for events and queues so communication can happen
send, receive, send_event, receive_event = await process_obj.get_queues()

# I assume I could use Thread or Process executors
pool = concurrent.futures.ThreadPoolExecutor()
result = await loop.run_in_executor(pool, process_obj.run)  # pass the callable, do not call it

New process code:

def run():
    asyncio.create_task(main())

async def main():

    while True:
        # does things that hog the loop

How do I communicate between this new thread and the original loop like I could originally?



Solution 1:[1]

I couldn't reproduce much from your code, so please consider this code from a YouTube downloader app as an example, and I hope it will help you understand how to get a result back from a thread function.

Example code:

# (methods of a Kivy widget class; imports shown for context)
from multiprocessing.pool import ThreadPool
from kivy.clock import Clock

def on_download(self, is_mp3: bool, is_mp4: bool, url: str) -> None:
    if not is_mp3 and not is_mp4:
        self.ids.info_lbl.text = 'Please select a type of file to download.'
    else:
        self.ids.info_lbl.text = 'Downloading...'
        
        self.is_mp3 = is_mp3
        self.is_mp4 = is_mp4
        self.url = url
        
        Clock.schedule_once(self.schedule_download, 2)
        Clock.schedule_interval(self.start_progress_bar, 0.1)
        
def schedule_download(self, dt: float) -> None:
    '''
    Callback method for the download.
    '''
    
    pool = ThreadPool(processes=1)
    _downloader = Downloader(self.d_path)
    self.async_result = pool.apply_async(_downloader.download,
                                         (self.is_mp3, self.is_mp4, self.url))
    Clock.schedule_interval(self.check_process, 0.1)
    
def check_process(self, dt: float) -> None:
    '''
    Check if download is complete.
    '''
    if self.async_result.ready():
        resp = self.async_result.get()

        if resp[0] == 'Error. Download failed.':
            self.ids.info_lbl.text = resp[0]
            # progress bar gray if error
            self.stop_progress_bar(value=0)
        else:
            # progress bar blue if success
            self.stop_progress_bar(value=100)
            self.ids.file_name.text = resp[0]
            self.ids.info_lbl.text = 'Finished downloading.'
            self.ids.url_input.text = ''
        
        Clock.unschedule(self.check_process)

Personally I prefer from multiprocessing.pool import ThreadPool. It looks like your code "hogs up" because you are awaiting the result, so obviously the program will wait until there is a result (and that may take long). If you look at my example code:

on_download schedules the schedule_download event, and that one schedules another event, check_process. I can't tell whether your app is a GUI app or a terminal app, since there is very little code in your question, but what you have to do is schedule a check-process event in your loop. If you look at my check_process, the test if self.async_result.ready(): only succeeds once the result is ready. Right now you are blocking on the result; here everything happens in the background, and every now and then the main loop checks for the result. It won't hog up, because if there is no result yet the main loop carries on with what it has to do rather than waiting.

So basically you have to schedule some events in your loop (especially the one that checks for the result) rather than going line by line and waiting for each one. Does that make sense, and is my example code helpful? Sorry, I am really bad at explaining what is in my head ;)

-> mainloop
  -> start a new thread if needed
  -> check for results if any threads are running
    -> if there is a result
      -> do something
  -> mainloop keeps running
  -> back to top
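Stripped of the Kivy specifics, the same apply_async-plus-polling pattern can be sketched like this (the download function and URL are placeholders, not from the original code):

```python
from multiprocessing.pool import ThreadPool
import time

def download(url):
    # stand-in for a real blocking download
    time.sleep(0.2)
    return "done: " + url

pool = ThreadPool(processes=1)
async_result = pool.apply_async(download, ("http://example.com",))

# the main loop keeps running and only peeks at the result
while not async_result.ready():
    # ... other main-loop work would go here ...
    time.sleep(0.05)

print(async_result.get())  # prints: done: http://example.com
```

The key point is that `ready()` never blocks, so the loop stays responsive between checks.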

Solution 2:[2]

When you execute the while True in your main coroutine, it doesn't just hog the loop, it blocks the loop, preventing the other tasks from doing their jobs. Running a separate process in your event-based application is not the best solution, as processes are not very friendly for data sharing.

It is possible to do everything concurrently without using parallelism. All you need is to execute await asyncio.sleep(0) at the end of each while True iteration. It yields control back to the loop and allows the other tasks to be executed, without exiting the coroutine.

In the following example, I have a listener that uses while True and handles the data that an emitter adds to the queue.

import asyncio
from queue import Empty
from queue import Queue
from random import choice

queue = Queue()


async def listener():
    while True:
        try:
            # data polling from the queue
            data = queue.get_nowait()
            print(data)  # {"type": "event", "data": {...}}
        except Empty:
            # nothing in the queue yet
            pass
        finally:
            # the magic action
            await asyncio.sleep(0)


async def emitter():
    # add a data to the queue
    queue.put({"type": "event", "data": {...}})


async def main():
    # first create a task for listener
    running_loop = asyncio.get_running_loop()
    running_loop.create_task(listener())
    for _ in range(5):
        # create tasks for emitter with random intervals to
        # demonstrate that the listener is still running in
        # the loop and handling the data put into the queue
        running_loop.create_task(emitter())
        await asyncio.sleep(choice(range(2)))


if __name__ == "__main__":
    # asyncio.run replaces the deprecated get_event_loop/run_until_complete pattern
    asyncio.run(main())
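This example keeps everything on one thread. To combine it with the run_in_executor plan from the question, note that asyncio objects are not thread-safe; the usual bridge from a worker thread back into the loop is loop.call_soon_threadsafe (or asyncio.run_coroutine_threadsafe for coroutines). A minimal sketch, where blocking_work is a placeholder for the IO-bound task:

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def blocking_work(loop, queue):
    # runs in the executor thread; asyncio objects must not be
    # touched directly from here
    for i in range(3):
        time.sleep(0.1)  # stands in for real blocking IO
        # hand each item back to the loop's thread safely
        loop.call_soon_threadsafe(queue.put_nowait, i)

async def main():
    loop = asyncio.get_running_loop()
    queue = asyncio.Queue()
    pool = ThreadPoolExecutor()
    # pass the function itself (no parentheses) plus its arguments
    fut = loop.run_in_executor(pool, blocking_work, loop, queue)
    items = [await queue.get() for _ in range(3)]
    await fut  # wait for the worker thread to finish
    return items

print(asyncio.run(main()))  # prints: [0, 1, 2]
```

Because every queue operation is marshalled onto the loop's thread, the main loop can await the queue as usual while the worker stays fully blocking.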

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1:
Solution 2: Artyom Vancyan