How to read lines from a file asynchronously and work with them in gather?

I'm trying to replace the list of generated numbers with lines read asynchronously from a file. How can I do this correctly?

from asyncio import get_event_loop, gather, sleep
import aiofiles


async def read_file():
    """Asynchronously yield the lines of ``test.txt`` one at a time.

    The file is opened through ``aiofiles`` in binary mode (``'rb'``) and
    is closed automatically when iteration finishes.
    """
    async with aiofiles.open('test.txt', mode='rb') as source:
        async for raw_line in source:
            yield raw_line


async def main(k):
    """Print ``k``, then suspend this coroutine for one second."""
    pause_seconds = 1
    print(k)
    await sleep(pause_seconds)


if __name__ == '__main__':
    # Number of coroutines handed to a single gather() call.
    count_group = 3
    # Stand-in input: the integers 1 through 10.
    list_objects = list(range(1, 11))

    loop = get_event_loop()

    # TODO How to correctly replace list_objects with read_file()
    list_func = [main(item) for item in list_objects]

    # Run the coroutines batch by batch; each batch has to finish before
    # the next one is started.
    for offset in range(0, len(list_func), count_group):
        batch = list_func[offset:offset + count_group]
        loop.run_until_complete(gather(*batch))

I tried different options, but none of them worked. My goal is to asynchronously read lines from a file and work with them.



Solution 1:[1]

As VPfB pointed out:

  • Using asyncio does not seem useful in this case, since advancing to the next line is almost instantaneous, so there won't be enough time for other coroutines to get started.
  • Another possible implementation is to read the file from several different positions using coroutines/tasks, but since we don't know how many lines the file contains until we have read it completely, we cannot run multiple tasks that read from different points in the file.
import time

from asyncio import get_event_loop, gather, sleep
import aiofiles


# async generator
async def read_file():
    """Yield each line of ``test.txt`` as it is read.

    ``aiofiles`` opens the file in binary mode (``'rb'``); the handle is
    released when the ``async with`` block exits.
    """
    async with aiofiles.open('test.txt', mode='rb') as handle:
        async for chunk in handle:
            yield chunk

# async coroutines
async def read(g):
    """Print every item produced by the asynchronous iterable ``g``.

    Args:
        g: An asynchronous iterable, such as the async generator
            returned by ``read_file()``.
    """
    # An async generator does not support the regular iterator protocol,
    # so it has to be consumed with ``async for``; a plain ``for`` raises
    # TypeError.
    async for line in g:
        print(line)

# async def main(k):
#    print(k)
#    await sleep(1)


if __name__ == '__main__':
    # count_group = 3
    # list_objects = list()

    # for i in range(1, 11):
    #    list_objects.append(i)

    # TODO How to correctly replace list_objects with read_file()
    # list_func = [main(x) for x in list_objects]

    # run_groups = [list_func[i:i + count_group] for i in range(0, len(list_func), count_group)]
    # for rg in run_groups:
    #    loop.run_until_complete(gather(*rg))

    # Time one complete pass that prints every line via the async reader.
    loop = get_event_loop()
    began = time.perf_counter()
    loop.run_until_complete(read(read_file()))
    elapsed = time.perf_counter() - began
    print(elapsed) # 0.002 approx
# Synchronous baseline: print every line of the file using blocking reads.
start = time.perf_counter()
with open('test.txt') as f:
   # iter() with a sentinel keeps calling f.readline() until it returns ''.
   for line in iter(f.readline, ''):
       print(line)
print(time.perf_counter() - start) # 0.0002 approx

Note: The timings in the code comments are included for performance reference.

Solution 2:[2]

You could do the following:

from asyncio import get_event_loop, gather, sleep
import aiofiles

async def read_line(f):
    """Return the next non-empty line from ``f``, polling until one exists.

    Args:
        f: An open file-like object exposing a synchronous ``readline()``.

    Returns:
        The first truthy value returned by ``f.readline()``.
    """
    while True:
        # readline() is an ordinary blocking call; only the pause below
        # hands control back to the event loop.
        data = f.readline()
        if data:
            return data
        # Nothing available yet: wait a second before polling again.
        await sleep(1)

async def read_file():
    """Print each line of ``test.txt`` as it becomes available.

    The loop has no exit condition: after the last line has been printed
    the coroutine keeps waiting on ``read_line`` for further data, so it
    runs until it is cancelled.
    """
    with open('test.txt', mode="r") as f:
        while True:
            line = await read_line(f)
            print('Got: {!r}'.format(line))


if __name__ == '__main__':
    # Number of coroutines handed to a single gather() call.
    count_group = 3
    list_objects = list(range(1, 11))

    loop = get_event_loop()

    # read_file() takes no parameters, so it is called without one; passing
    # the list item raised TypeError before any coroutine was created.
    list_func = [read_file() for _ in list_objects]

    # NOTE(review): read_file() loops forever waiting for new lines, so the
    # first group never completes and later groups are never started.
    run_groups = [list_func[i:i + count_group] for i in range(0, len(list_func), count_group)]
    for rg in run_groups:
        loop.run_until_complete(gather(*rg))

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Chandan
Solution 2 Maxime Bonnesoeur