'Python: measuring websocket queue length waiting for ws.recv()

I am consuming low latency market data and I'm trying to measure how many streams I can consume without my code slowing down due to the websocket message queue building up. My understanding is that messages are received by the websocket and queued until ws.recv() is called, which processes them one at a time in the order they were received. Under normal circumstances, my code is definitely fast enough to handle the messages, but when a burst of messages comes all at once I would imagine that the queue fills up. I would expect that the queue would only be filled up for 5 or 10 milliseconds, but it is very important that I know this. Is there a way to measure how many messages are waiting in the queue?

I'm attaching a snippet of the code I'm using for context, but the relevant part is just looping over data = self.ws.recv()

class WebsocketClient(object):
    def __init__(
            self,
            url=""
            products=None,
            message_type="subscribe",
            should_print=True,

        self.url = url
        self.products = products
        self.channels = channels
        self.type = message_type
        self.stop = True
        self.error = None
        self.ws = None
        self.thread = None
        self.auth = auth
        self.api_key = api_key
        self.api_secret = api_secret
        self.api_passphrase = api_passphrase
        self.should_print = should_print
        

    def start(self):
        def _go():
            self._connect()
            self._listen()
            self._disconnect()

        self.stop = False
        self.on_open()
        self.thread = Thread(target=_go)
        self.keepalive = Thread(target=self._keepalive)
        self.thread.start()

    def _connect(self):
        if self.products is None:
            self.products = []
        elif not isinstance(self.products, list):
            self.products = [self.products]

        if self.url[-1] == "/":
            self.url = self.url[:-1]

        if self.channels is None:
            self.channels = [{"name": "ticker", "product_ids": [product_id for product_id in self.products]}]
            sub_params = {'type': 'subscribe', 'product_ids': self.products, 'channels': self.channels}
        else:
            sub_params = {'type': 'subscribe', 'product_ids': self.products, 'channels': self.channels}

        if self.auth:
            #timestamp = int(time.time())
            #message = timestamp + 'GET' + '/users/self/verify'
            auth_headers = get_auth_headers('/users/self/verify','GET','')
            #print(auth_headers)
            sub_params['signature'] = auth_headers['CB-ACCESS-SIGN']
            sub_params['key'] = auth_headers['CB-ACCESS-KEY']
            sub_params['passphrase'] = auth_headers['CB-ACCESS-PASSPHRASE']
            sub_params['timestamp'] = auth_headers['CB-ACCESS-TIMESTAMP']

        try:
            self.ws = create_connection(self.url)
            self.ws.send(json.dumps(sub_params))
            
        except:
            traceback.print_exc()
            self.stop = True

    def _keepalive(self, interval=10):
        while self.ws.connected:
            self.ws.ping("keepalive")
            time.sleep(interval)

    def _listen(self):
        self.keepalive.start()
        while not self.stop:
            try:
                data = self.ws.recv()
                msg = json.loads(data)
            except ValueError as e:
                self.on_error(e)
            except Exception as e:
                self.on_error(e)
            else:
                self.on_message(msg)

    def _disconnect(self):
        try:
            if self.ws:
                self.ws.close()
        except WebSocketConnectionClosedException as e:
            pass
        finally:
            self.keepalive.join()

        self.on_close()

    def close(self):
        self.stop = True   # will only disconnect after next msg recv
        self._disconnect() # force disconnect so threads can join
        self.thread.join()

    def on_open(self):
        if self.should_print:
            print("-- Subscribed! --\n")

    def on_close(self):
        if self.should_print:
            print("\n-- Socket Closed --")

    def on_message(self, msg):
        *** my logic ***
        
    def on_error(self, e, data=None):
        self.error = e
        self.stop = True
        print('{} - data: {}'.format(e, data))


Solution 1:[1]

You can measure length of incomming messages buffer by calling

len(self.ws.messages)

There is a background asyncio task, that reads StreamReader bytes buffer and put messages to ws.messages deque.

Messages deque is limited by max_queue parameter of client.connect method: https://websockets.readthedocs.io/en/stable/reference/client.html#websockets.client.connect

Here are the details: https://websockets.readthedocs.io/en/stable/topics/design.html#backpressure

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Aleksandr Gavrilov