Python: measuring websocket queue length waiting for ws.recv()
I am consuming low latency market data and I'm trying to measure how many streams I can consume without my code slowing down due to the websocket message queue building up. My understanding is that messages are received by the websocket and queued until ws.recv() is called, which processes them one at a time in the order they were received. Under normal circumstances, my code is definitely fast enough to handle the messages, but when a burst of messages comes all at once I would imagine that the queue fills up. I would expect that the queue would only be filled up for 5 or 10 milliseconds, but it is very important that I know this. Is there a way to measure how many messages are waiting in the queue?
I'm attaching a snippet of the code I'm using for context, but the relevant part is just looping over
data = self.ws.recv()
class WebsocketClient(object):
    """Threaded websocket feed client that subscribes and dispatches messages.

    ``start()`` launches a worker thread that connects, sends a subscribe
    message, and then loops on ``self.ws.recv()``; every payload is decoded
    with ``json.loads`` and passed to ``on_message``. A second thread sends
    periodic pings while the socket reports itself connected.

    Override ``on_open``, ``on_close``, ``on_message`` and ``on_error`` to
    customise behaviour.
    """

    # NOTE(review): the parameters after ``should_print`` were missing from
    # the posted (truncated) signature but are read by the body below; their
    # defaults are assumptions -- confirm against the original source.
    def __init__(
            self,
            url="",
            products=None,
            message_type="subscribe",
            should_print=True,
            channels=None,
            auth=False,
            api_key="",
            api_secret="",
            api_passphrase=""):
        """Store the configuration; no network activity happens here.

        Args:
            url: Websocket endpoint; one trailing "/" is stripped on connect.
            products: A product id or a list of them; ``None`` means none.
            message_type: Stored as ``self.type``.
            should_print: When true, ``on_open``/``on_close`` print a banner.
            channels: Channel specs for the subscribe message; ``None``
                selects a single "ticker" channel covering ``products``.
            auth: When true, signed credentials are added to the subscribe
                message.
            api_key: Stored on the instance.
            api_secret: Stored on the instance.
            api_passphrase: Stored on the instance.
        """
        self.url = url
        self.products = products
        self.channels = channels
        self.type = message_type
        self.stop = True
        self.error = None
        self.ws = None
        self.thread = None
        self.auth = auth
        self.api_key = api_key
        self.api_secret = api_secret
        self.api_passphrase = api_passphrase
        self.should_print = should_print

    def start(self):
        """Create the worker and keepalive threads and start the worker."""
        def _go():
            self._connect()
            self._listen()
            self._disconnect()

        self.stop = False
        self.on_open()
        self.thread = Thread(target=_go)
        # The keepalive thread is only created here; _listen() starts it.
        self.keepalive = Thread(target=self._keepalive)
        self.thread.start()

    def _build_subscription(self):
        """Normalise products/channels and return the subscribe message dict."""
        if self.products is None:
            self.products = []
        elif not isinstance(self.products, list):
            self.products = [self.products]

        if self.channels is None:
            self.channels = [{"name": "ticker", "product_ids": list(self.products)}]

        sub_params = {'type': 'subscribe', 'product_ids': self.products, 'channels': self.channels}

        if self.auth:
            # get_auth_headers is defined outside this snippet; presumably it
            # signs the request with the account credentials -- TODO confirm.
            auth_headers = get_auth_headers('/users/self/verify', 'GET', '')
            sub_params['signature'] = auth_headers['CB-ACCESS-SIGN']
            sub_params['key'] = auth_headers['CB-ACCESS-KEY']
            sub_params['passphrase'] = auth_headers['CB-ACCESS-PASSPHRASE']
            sub_params['timestamp'] = auth_headers['CB-ACCESS-TIMESTAMP']
        return sub_params

    def _connect(self):
        """Open the websocket and send the subscribe message.

        A connection or send failure is printed and sets ``self.stop`` so the
        listen loop does not run.
        """
        # endswith() is safe for an empty URL, unlike indexing with [-1].
        if self.url.endswith("/"):
            self.url = self.url[:-1]

        sub_params = self._build_subscription()

        try:
            self.ws = create_connection(self.url)
            self.ws.send(json.dumps(sub_params))
        except Exception:
            # Exception rather than a bare except, so KeyboardInterrupt and
            # SystemExit are not swallowed.
            traceback.print_exc()
            self.stop = True

    def _keepalive(self, interval=10):
        """Ping the server every ``interval`` seconds while connected."""
        # self.ws is still None when _connect() failed; without this guard
        # the keepalive thread would die with an AttributeError.
        while self.ws is not None and self.ws.connected:
            self.ws.ping("keepalive")
            time.sleep(interval)

    def _listen(self):
        """Receive, decode and dispatch messages until ``self.stop`` is set."""
        self.keepalive.start()
        while not self.stop:
            try:
                data = self.ws.recv()
                msg = json.loads(data)
            except Exception as e:
                # Covers malformed JSON (ValueError) as well as socket errors;
                # on_error() sets self.stop, which ends the loop.
                self.on_error(e)
            else:
                self.on_message(msg)

    def _disconnect(self):
        """Close the socket, wait for the keepalive thread, call on_close."""
        try:
            if self.ws:
                self.ws.close()
        except WebSocketConnectionClosedException:
            # The socket is already closed; nothing left to do.
            pass
        finally:
            self.keepalive.join()

        self.on_close()

    def close(self):
        """Stop listening, force the socket closed and join the worker."""
        # NOTE(review): the worker thread also calls _disconnect() when its
        # listen loop ends, so on_close() can run twice after close().
        self.stop = True   # will only disconnect after next msg recv
        self._disconnect()  # force disconnect so threads can join
        self.thread.join()

    def on_open(self):
        """Hook called by start() before the worker thread is launched."""
        if self.should_print:
            print("-- Subscribed! --\n")

    def on_close(self):
        """Hook called at the end of _disconnect()."""
        if self.should_print:
            print("\n-- Socket Closed --")

    def on_message(self, msg):
        """Hook called with each decoded message; put application logic here."""
        # *** my logic ***  (elided in the original post)

    def on_error(self, e, data=None):
        """Record the error, stop the listen loop and print a diagnostic."""
        self.error = e
        self.stop = True
        print('{} - data: {}'.format(e, data))
Solution 1:[1]
You can measure the length of the incoming message buffer by calling
len(self.ws.messages)
In the `websockets` library, a background asyncio task reads the StreamReader byte buffer and puts messages into the ws.messages deque.
The messages deque is bounded by the max_queue parameter of the client.connect method: https://websockets.readthedocs.io/en/stable/reference/client.html#websockets.client.connect
Here are the details: https://websockets.readthedocs.io/en/stable/topics/design.html#backpressure
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|---|
Solution 1 | Aleksandr Gavrilov |