'FastAPI websockets not working when using Redis pubsub functionality

currently I'm using websockets to pass through data that I receive from a Redis queue (pub/sub). But for some reason the websocket doesn't send messages when using this redis queue.

What my code looks like

My code works as folllow:

  1. I accept the socket connection
  2. I connect to the redis queue
  3. For each message that I receive from the subscription, i sent a message through the socket. (at the moment only text for testing)
@check_route.websocket_route("/check")
async def websocket_endpoint(websocket: WebSocket):

    await websocket.accept()

    redis = Redis(host='::1', port=6379, db=1)
    subscribe = redis.pubsub()
    subscribe.subscribe('websocket_queue')

    try:
        for result in subscribe.listen():
            await websocket.send_text('test')
            print('test send')
    except Exception as e:
        await websocket.close()
        raise e

The issue with the code

When I'm using this code it's just not sending the message through the socket. But when I accept the websocket within the subscribe.listen() loop it does work but it reconnects every time (see code below).

@check_route.websocket_route("/check")
async def websocket_endpoint(websocket: WebSocket):

    redis = Redis(host='::1', port=6379, db=1)
    subscribe = redis.pubsub()
    subscribe.subscribe('websocket_queue')

    try:
        for result in subscribe.listen():
            await websocket.accept()
            await websocket.send_text('test')
            print('test send')
    except Exception as e:
        await websocket.close()
        raise e

I think that the subscribe.listen() causes some problems that make the websocket do nothing when websocket.accept() is outside the for loop.

I hope someone knows whats wrong with this.



Solution 1:[1]

After a few days more research I found a solution for this issue. I solved it by using aioredis. This solution is based on the following GitHub Gist.

import json
import aioredis

from fastapi import APIRouter, WebSocket

from app.service.config_service import load_config

check_route = APIRouter()


@check_route.websocket("/check")
async def websocket_endpoint(websocket: WebSocket):

    await websocket.accept()

    # ---------------------------- REDIS REQUIREMENTS ---------------------------- #
    config = load_config()
    redis_uri: str = f"redis://{config.redis.host}:{config.redis.port}"
    redis_channel = config.redis.redis_socket_queue.channel
    redis = await aioredis.create_redis_pool(redis_uri)

    # ------------------ SEND SUBSCRIBE RESULT THROUGH WEBSOCKET ----------------- #
    (channel,) = await redis.subscribe(redis_channel)
    assert isinstance(channel, aioredis.Channel)
    try:
        while True:
            response_raw = await channel.get()
            response_str = response_raw.decode("utf-8")
            response = json.loads(response_str)

            if response:
                await websocket.send_json({
                    "event": 'NEW_CHECK_RESULT',
                    "data": response
                })
    except Exception as e:
        raise e

Solution 2:[2]

I'm not sure if this will work, but you could try this:

async def websocket_endpoint(websocket: WebSocket):

    await websocket.accept()

    redis = Redis(host='::1', port=6379, db=1)
    subscribe = redis.pubsub()
    subscribe.subscribe('websocket_queue')

    try:
        results = await subscribe.listen()
        for result in results:
            await websocket.send_text('test')
            print('test send')
    except Exception as e:
        await websocket.close()
        raise e

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 michael holstein
Solution 2 Abhinav Mathur