FastAPI websockets not working when using Redis pubsub functionality
Currently I'm using websockets to pass through data that I receive from a Redis queue (pub/sub). But for some reason the websocket doesn't send messages when using this Redis queue.
What my code looks like
My code works as follows:
- I accept the socket connection
- I connect to the Redis queue
- For each message that I receive from the subscription, I send a message through the socket (at the moment only text, for testing).
@check_route.websocket_route("/check")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    redis = Redis(host='::1', port=6379, db=1)
    subscribe = redis.pubsub()
    subscribe.subscribe('websocket_queue')
    try:
        for result in subscribe.listen():
            await websocket.send_text('test')
            print('test send')
    except Exception as e:
        await websocket.close()
        raise e
The issue with the code
When I use this code, the message is simply not sent through the socket. When I accept the websocket inside the subscribe.listen() loop instead, it does work, but it reconnects every time (see code below).
@check_route.websocket_route("/check")
async def websocket_endpoint(websocket: WebSocket):
    redis = Redis(host='::1', port=6379, db=1)
    subscribe = redis.pubsub()
    subscribe.subscribe('websocket_queue')
    try:
        for result in subscribe.listen():
            await websocket.accept()
            await websocket.send_text('test')
            print('test send')
    except Exception as e:
        await websocket.close()
        raise e
I think that subscribe.listen() causes some problem that makes the websocket do nothing when websocket.accept() is outside the for loop.
I hope someone knows what's wrong with this.
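
A likely explanation, assuming `Redis` here is the synchronous redis-py client: `subscribe.listen()` is a blocking generator, so iterating it inside a coroutine never hands control back to the event loop, and the loop cannot do the I/O the websocket depends on. The sketch below reproduces the effect with the standard library only. `blocking_listen`, `heartbeat` and the other names are hypothetical stand-ins, not Redis or FastAPI APIs, and `asyncio.to_thread` needs Python 3.9 or later.

```python
import asyncio
import time


def blocking_listen(n=3, delay=0.05):
    """Stand-in for a synchronous pub/sub listener: blocks between messages."""
    for i in range(n):
        time.sleep(delay)  # blocks the calling thread, like a blocking socket read
        yield f"message {i}"


async def heartbeat(ticks):
    """Background task representing other event-loop work (e.g. socket I/O)."""
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(0.01)


async def consume_blocking():
    # Iterating the sync generator directly never yields to the event loop.
    return [msg for msg in blocking_listen()]


async def consume_in_thread():
    # Fetch each message in a worker thread so the loop stays free meanwhile.
    it = blocking_listen()
    out = []
    while True:
        msg = await asyncio.to_thread(next, it, None)
        if msg is None:  # generator exhausted
            break
        out.append(msg)
    return out


async def count_ticks(consumer):
    """Run `consumer` and count heartbeat ticks that happen while it runs."""
    ticks = []
    task = asyncio.create_task(heartbeat(ticks))
    await asyncio.sleep(0)  # let the heartbeat record its first tick
    start = len(ticks)
    messages = await consumer()
    extra = len(ticks) - start
    task.cancel()
    return messages, extra


if __name__ == "__main__":
    print("blocking:", asyncio.run(count_ticks(consume_blocking))[1], "ticks")
    print("threaded:", asyncio.run(count_ticks(consume_in_thread))[1], "ticks")
```

While the blocking consumer runs, the heartbeat task gets no chance to tick; with the threaded consumer it keeps running. In the endpoint, the websocket's own I/O would be in the position of that starved heartbeat.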
Solution 1:[1]
After a few more days of research I found a solution for this issue. I solved it by using aioredis. This solution is based on the following GitHub Gist.
import json

import aioredis
from fastapi import APIRouter, WebSocket

from app.service.config_service import load_config

check_route = APIRouter()


@check_route.websocket("/check")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    # ---------------------------- REDIS REQUIREMENTS ---------------------------- #
    config = load_config()
    redis_uri: str = f"redis://{config.redis.host}:{config.redis.port}"
    redis_channel = config.redis.redis_socket_queue.channel
    redis = await aioredis.create_redis_pool(redis_uri)
    # ------------------ SEND SUBSCRIBE RESULT THROUGH WEBSOCKET ----------------- #
    (channel,) = await redis.subscribe(redis_channel)
    assert isinstance(channel, aioredis.Channel)
    try:
        while True:
            response_raw = await channel.get()
            response_str = response_raw.decode("utf-8")
            response = json.loads(response_str)
            if response:
                await websocket.send_json({
                    "event": 'NEW_CHECK_RESULT',
                    "data": response
                })
    except Exception as e:
        raise e
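
Note that `create_redis_pool` belongs to the aioredis 1.x API. For newer setups, the same forwarding loop can be sketched against an async pub/sub object such as the one in redis-py's `redis.asyncio` module, whose `listen()` is an async generator. The sketch below is written against that shape but uses hypothetical stand-ins (`FakePubSub`, `FakeWebSocket`) so it runs without Redis or FastAPI. `forward_messages` is an illustrative helper name, not part of either library.

```python
import asyncio
import json


async def forward_messages(pubsub, websocket, event="NEW_CHECK_RESULT"):
    """Relay JSON payloads from an async pub/sub listener to a websocket.

    Assumes `pubsub.listen()` is an async iterator of dicts shaped like
    redis-py messages ({"type": ..., "data": ...}) and that `websocket`
    has an async `send_json` method, as Starlette's WebSocket does.
    """
    async for message in pubsub.listen():
        if message["type"] != "message":
            continue  # skip subscribe/unsubscribe confirmations
        data = message["data"]
        if isinstance(data, bytes):
            data = data.decode("utf-8")
        payload = json.loads(data)
        if payload:  # same truthiness check as the answer above
            await websocket.send_json({"event": event, "data": payload})


# Hypothetical stand-ins so the sketch runs without Redis or FastAPI.
class FakePubSub:
    def __init__(self, messages):
        self._messages = messages

    async def listen(self):
        for message in self._messages:
            await asyncio.sleep(0)  # yield to the event loop, like real I/O
            yield message


class FakeWebSocket:
    def __init__(self):
        self.sent = []

    async def send_json(self, data):
        self.sent.append(data)


async def demo():
    pubsub = FakePubSub([
        {"type": "subscribe", "data": 1},
        {"type": "message", "data": b'{"status": "ok"}'},
        {"type": "message", "data": b"{}"},  # empty payload, not forwarded
    ])
    ws = FakeWebSocket()
    await forward_messages(pubsub, ws)
    return ws.sent
```

In a real endpoint, `pubsub` would presumably come from a `redis.asyncio.Redis` client (`client.pubsub()` followed by `await pubsub.subscribe(channel)`), and `websocket` would be the accepted FastAPI `WebSocket`.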
Solution 2:[2]
I'm not sure if this will work, but you could try this:
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    redis = Redis(host='::1', port=6379, db=1)
    subscribe = redis.pubsub()
    subscribe.subscribe('websocket_queue')
    try:
        results = await subscribe.listen()
        for result in results:
            await websocket.send_text('test')
            print('test send')
    except Exception as e:
        await websocket.close()
        raise e
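
One caveat: if `Redis` is the synchronous redis-py client, `subscribe.listen()` returns a plain generator, and awaiting a generator raises a `TypeError`. A possible alternative that keeps the synchronous client is to poll with `get_message()`, which returns `None` right away when nothing is queued, and to sleep briefly between polls so the event loop can run. The following is a minimal sketch of that idea, not a tested drop-in: `poll_and_forward`, `max_idle_polls` and the fake classes are hypothetical names introduced here for illustration.

```python
import asyncio
from collections import deque


async def poll_and_forward(pubsub, websocket, interval=0.01, max_idle_polls=None):
    """Poll a synchronous pub/sub object without blocking the event loop.

    Assumes `pubsub.get_message()` returns a message dict or None right away.
    `max_idle_polls` is a hypothetical knob so the demo can stop; a real
    endpoint would typically loop until the client disconnects.
    """
    idle = 0
    while max_idle_polls is None or idle < max_idle_polls:
        message = pubsub.get_message(ignore_subscribe_messages=True)
        if message is None:
            idle += 1
            await asyncio.sleep(interval)  # hand control back to the loop
            continue
        idle = 0
        await websocket.send_text("test")


class FakePubSub:
    """Hypothetical stand-in for a sync PubSub with pre-queued messages."""

    def __init__(self, messages):
        self._queue = deque(messages)

    def get_message(self, ignore_subscribe_messages=False):
        if not self._queue:
            return None
        message = self._queue.popleft()
        if ignore_subscribe_messages and message["type"] == "subscribe":
            return None  # confirmation consumed, nothing to report
        return message


class FakeWebSocket:
    def __init__(self):
        self.sent = []

    async def send_text(self, text):
        self.sent.append(text)


async def demo():
    pubsub = FakePubSub([
        {"type": "subscribe", "data": 1},
        {"type": "message", "data": b"a"},
        {"type": "message", "data": b"b"},
    ])
    ws = FakeWebSocket()
    await poll_and_forward(pubsub, ws, max_idle_polls=2)
    return ws.sent
```

Polling trades a little latency (up to `interval` per message) for simplicity. An async Redis client, as in Solution 1, avoids that trade-off.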
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | michael holstein |
| Solution 2 | Abhinav Mathur |