'How stream a response from a Twisted server?

Issue

My problem is that I can't write a server that streams the response that my application sends back.
The response are not retrieved chunk by chunk, but from a single block when the iterator has finished iterating.

Approach

When I write the response with the write method of Request, it understands well that it is a chunk that we send.
I checked if there was a buffer size used by Twisted, but the message size check seems to be done in the doWrite.

After spending some time debugging, it seems that the reactor only reads and writes at the end.
If I understood correctly how a reactor works with Twisted, it writes and reads when the file descriptor is available.

What is a file descriptor in Twisted ?
Why is it not available after writing the response ?

Example

I have written a minimal script of what I would like my server to look like.
It's a "ASGI-like" server that runs an application, iterates over a function that returns a very large string:

# async_stream_server.py
import asyncio
from twisted.internet import asyncioreactor

twisted_loop = asyncio.new_event_loop()
asyncioreactor.install(twisted_loop)

import time
from sys import stdout

from twisted.web import http
from twisted.python.log import startLogging
from twisted.internet import reactor, endpoints

CHUNK_SIZE = 2**16


def async_partial(async_fn, *partial_args):
    async def wrapped(*args):
        return await async_fn(*partial_args, *args)
    return wrapped


def iterable_content():
    for _ in range(5):
        time.sleep(1)
        yield b"a" * CHUNK_SIZE


async def application(send):
    for part in iterable_content():
        await send(
            {
                "body": part,
                "more_body": True,
            }
        )
    await send({"more_body": False})


class Dummy(http.Request):
    def process(self):
        asyncio.ensure_future(
            application(send=async_partial(self.handle_reply)),
            loop=asyncio.get_event_loop()
        )

    async def handle_reply(self, message):
        http.Request.write(self, message.get("body", b""))
        if not message.get("more_body", False):
            http.Request.finish(self)
        print('HTTP response chunk')


class DummyFactory(http.HTTPFactory):
    def buildProtocol(self, addr):
        protocol = http.HTTPFactory.buildProtocol(self, addr)
        protocol.requestFactory = Dummy
        return protocol


startLogging(stdout)
endpoints.serverFromString(reactor, "tcp:1234").listen(DummyFactory())
asyncio.set_event_loop(reactor._asyncioEventloop)
reactor.run()

To execute this example:

  • in a terminal, run:
python async_stream_server.py
  • in another terminal, run:
curl http://localhost:1234/

You will have to wait a while before you see the whole message.

Details

$ python --version
Python 3.10.4
$ pip list
Package           Version Editable project location
----------------- ------- --------------------------------------------------
asgiref           3.5.0
Twisted           22.4.0


Solution 1:[1]

You just need to sprinkle some more async over it.

As written, the iterable_content generator blocks the reactor until it finishes generating content. This is why you see no results until it is done. The reactor does not get control of execution back until it finishes.

That's only because you used time.sleep to insert a delay into it. time.sleep blocks. This -- and everything else in the "asynchronous" application -- is really synchronous and keeps control of execution until it is done.

If you replace iterable_content with something that's really asynchronous, like an asynchronous generator:

async def iterable_content():
    for _ in range(5):
        await asyncio.sleep(1)
        yield b"a" * CHUNK_SIZE

and then iterate over it asynchronously with async for:

async def application(send):
    async for part in iterable_content():
        await send(
            {
                "body": part,
                "more_body": True,
            }
        )
    await send({"more_body": False})

then the reactor has a chance to run in between iterations and the server begins to produce output chunk by chunk.

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Jean-Paul Calderone