How to stream a response from a Twisted server?
Issue
My problem is that I can't write a server that streams the response that my application sends back.
The response is not received chunk by chunk, but as a single block once the iterator has finished iterating.
Approach
When I write the response with the `write` method of `Request`, Twisted does treat each call as a separate chunk being sent.
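For context, this is roughly what "a chunk" means on the wire. Assuming the client speaks HTTP/1.1 (curl does by default) and no `Content-Length` header is set, `twisted.web.http.Request` sends the body with the chunked transfer coding: each non-empty `write()` becomes one length-prefixed chunk, and `finish()` sends the terminating zero-length chunk. The stdlib-only sketch below mirrors that framing; `encode_chunk` and `encode_body` are hypothetical helpers, not Twisted's code.

```python
def encode_chunk(data: bytes) -> bytes:
    """Frame one piece of a response body as an HTTP/1.1 chunk."""
    # Chunk size in hexadecimal, CRLF, the payload itself, CRLF.
    return b"%x\r\n" % len(data) + data + b"\r\n"


# A zero-length chunk followed by an empty trailer section ends the body.
LAST_CHUNK = b"0\r\n\r\n"


def encode_body(parts) -> bytes:
    """Encode every non-empty part as its own chunk, then terminate the body."""
    # Empty parts are skipped: a zero-length chunk would end the body early.
    return b"".join(encode_chunk(p) for p in parts if p) + LAST_CHUNK


if __name__ == "__main__":
    # Two writes of different sizes produce two separately framed chunks.
    print(encode_body([b"abc", b"a" * 16]))
```

The framing shows that each write is already a self-delimiting unit, so the question is not how chunks are delimited but when the bytes actually leave the process.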
I checked whether Twisted uses a buffer size, but the message size check seems to happen in `doWrite`.
After some debugging, it seems that the reactor only reads and writes at the end.
If I understand correctly how the Twisted reactor works, it reads and writes whenever the file descriptor is available.
What is a file descriptor in Twisted?
Why is it not available after writing the response?
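A file descriptor here is the operating system's handle for an open socket: each connection the server accepts is backed by one, and the reactor (in this setup, the asyncio event loop that `asyncioreactor` runs on) asks the OS which descriptors are ready for reading or writing. The sketch below uses only the standard library, no Twisted APIs, and a `socket.socketpair()` as a stand-in for the curl connection; `readiness_demo` is a hypothetical name. It suggests that a connected socket is writable almost immediately, so the descriptor's availability is unlikely to be what delays the output.

```python
import select
import socket


def readiness_demo():
    """Show what "ready" means for a socket's file descriptor (stdlib only)."""
    # A connected socket pair stands in for the TCP connection between curl
    # and the server; each end is backed by one OS file descriptor.
    server_end, client_end = socket.socketpair()
    try:
        print("server fd:", server_end.fileno())
        # The send buffer is empty, so the server end is immediately writable.
        _, writable, _ = select.select([], [server_end], [], 0)
        # Nothing has been sent yet, so the client end has nothing to read.
        readable_before, _, _ = select.select([client_end], [], [], 0)
        server_end.sendall(b"chunk")
        # Once data is in flight, the client end becomes readable.
        readable_after, _, _ = select.select([client_end], [], [], 1.0)
        return (
            server_end in writable,
            client_end in readable_before,
            client_end in readable_after,
        )
    finally:
        server_end.close()
        client_end.close()


if __name__ == "__main__":
    print(readiness_demo())
```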
Example
I have written a minimal script of what I would like my server to look like.
It's an "ASGI-like" server that runs an application, which iterates over a generator that produces a large body in 64 KiB chunks:
```python
# async_stream_server.py
import asyncio

from twisted.internet import asyncioreactor

twisted_loop = asyncio.new_event_loop()
asyncioreactor.install(twisted_loop)

import time
from sys import stdout

from twisted.web import http
from twisted.python.log import startLogging
from twisted.internet import reactor, endpoints

CHUNK_SIZE = 2**16


def async_partial(async_fn, *partial_args):
    async def wrapped(*args):
        return await async_fn(*partial_args, *args)
    return wrapped


def iterable_content():
    for _ in range(5):
        time.sleep(1)
        yield b"a" * CHUNK_SIZE


async def application(send):
    for part in iterable_content():
        await send(
            {
                "body": part,
                "more_body": True,
            }
        )
    await send({"more_body": False})


class Dummy(http.Request):
    def process(self):
        asyncio.ensure_future(
            application(send=async_partial(self.handle_reply)),
            loop=asyncio.get_event_loop()
        )

    async def handle_reply(self, message):
        http.Request.write(self, message.get("body", b""))
        if not message.get("more_body", False):
            http.Request.finish(self)
        print('HTTP response chunk')


class DummyFactory(http.HTTPFactory):
    def buildProtocol(self, addr):
        protocol = http.HTTPFactory.buildProtocol(self, addr)
        protocol.requestFactory = Dummy
        return protocol


startLogging(stdout)
endpoints.serverFromString(reactor, "tcp:1234").listen(DummyFactory())
asyncio.set_event_loop(reactor._asyncioEventloop)
reactor.run()
```
To run this example:
- in one terminal, run `python async_stream_server.py`
- in another terminal, run `curl http://localhost:1234/`

You have to wait about five seconds (five iterations of `time.sleep(1)`) before anything appears, and then the whole message arrives at once.
Details
```
$ python --version
Python 3.10.4
$ pip list
Package           Version Editable project location
----------------- ------- --------------------------------------------------
asgiref           3.5.0
Twisted           22.4.0
```
Solution 1:[1]
You just need to sprinkle some more `async` over it.
As written, the `iterable_content` generator blocks the reactor until it finishes generating content. This is why you see no results until it is done: the reactor does not get control of execution back until then.
That's only because you used `time.sleep` to insert a delay into it. `time.sleep` blocks. This, and everything else in the "asynchronous" application, is really synchronous and keeps control of execution until it is done.
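The effect can be reproduced with plain asyncio and no Twisted at all. The sketch assumes that, because `asyncioreactor` drives the reactor from the same asyncio event loop, a coroutine that never yields starves the reactor in the same way it starves any other task. A background "heartbeat" task counts how often the loop gets to run while a producer waits with `time.sleep` versus `await asyncio.sleep`; `heartbeat`, `measure` and the two producers are hypothetical names.

```python
import asyncio
import time


async def heartbeat(ticks):
    # Appends a timestamp every time the event loop gets to run this task.
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(0.05)


async def blocking_producer():
    for _ in range(3):
        time.sleep(0.1)  # blocks the thread, so no other task can run


async def non_blocking_producer():
    for _ in range(3):
        await asyncio.sleep(0.1)  # yields control back to the event loop


async def measure(producer):
    """Count heartbeat ticks recorded while `producer` runs."""
    ticks = []
    task = asyncio.create_task(heartbeat(ticks))
    await asyncio.sleep(0)  # let the heartbeat record its first tick
    ticks.clear()
    await producer()
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return len(ticks)


async def main():
    blocked = await measure(blocking_producer)
    free = await measure(non_blocking_producer)
    print("ticks during time.sleep:", blocked)
    print("ticks during asyncio.sleep:", free)
    return blocked, free


if __name__ == "__main__":
    asyncio.run(main())
```

With `time.sleep` the heartbeat never runs while the producer is active; in the server, the pending writes are likewise left sitting in the transport's buffer until the loop regains control.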
If you replace `iterable_content` with something that's really asynchronous, like an asynchronous generator:
```python
async def iterable_content():
    for _ in range(5):
        await asyncio.sleep(1)
        yield b"a" * CHUNK_SIZE
```
and then iterate over it asynchronously with `async for`:
```python
async def application(send):
    async for part in iterable_content():
        await send(
            {
                "body": part,
                "more_body": True,
            }
        )
    await send({"more_body": False})
```
then the reactor has a chance to run in between iterations and the server begins to produce output chunk by chunk.
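If the slow content source cannot be rewritten as an async generator (say, a library call that blocks), one possible variant is to advance the blocking iterator in a worker thread with `asyncio.to_thread`, available since Python 3.9 and so on the 3.10.4 used here. This is not part of the answer above, only a minimal sketch: `iterate_in_thread` is a hypothetical helper, and it assumes the iterator is safe to advance from a thread other than the reactor's, one item at a time.

```python
import asyncio
import time

CHUNK_SIZE = 2**16


def iterable_content():
    # The original blocking generator from the question, left unchanged.
    for _ in range(5):
        time.sleep(1)
        yield b"a" * CHUNK_SIZE


async def iterate_in_thread(sync_iterable):
    """Hypothetical helper: an async generator over a blocking iterator."""
    iterator = iter(sync_iterable)
    done = object()
    while True:
        # next() runs in asyncio's default thread pool, so the event loop (and
        # the reactor running on it) stays free while the item is produced.
        # A default value is passed because StopIteration cannot be raised
        # through an asyncio Future.
        item = await asyncio.to_thread(next, iterator, done)
        if item is done:
            return
        yield item


async def application(send):
    async for part in iterate_in_thread(iterable_content()):
        await send({"body": part, "more_body": True})
    await send({"more_body": False})
```

Used as the `application` in the question's server, this should let the reactor flush one chunk per second without touching `iterable_content`; `send` (and therefore `Request.write`) still runs on the reactor's thread. The cost is a thread hop per item, so when the source can be made natively asynchronous, the async generator above is simpler.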
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|---|
Solution 1 | Jean-Paul Calderone |