'"CANCEL read_loop, Task exception was never retrieved"{‘e’: ‘error’, ‘m’: ‘Queue overflow. Message not filled’}
I've finally got my code working. It does the following: first it fetches the historical data for currently 297 symbols (only the last 20 data intervals), and once it has that data it starts fetching real-time data for all of these symbols. The code works well for a few seconds, but then I get the error: "CANCEL read_loop, Task exception was never retrieved" {'e': 'error', 'm': 'Queue overflow. Message not filled'}. I've searched the web, but there is not much information about it. Does anyone know how to fix this error?
Code to get the live data:
class GetNewData:
    """Merge live kline updates from the websocket into the historical records.

    The methods rely on module-level names defined elsewhere in the script:
    ``results``, ``stream_list``, ``api_key``, ``api_secret`` and
    ``old_data_refactored``.
    """

    @staticmethod
    def refactor_old_result():
        """Turn every column of each record in ``results`` into a plain list.

        Each record maps a column name to an ``{index: value}`` dict. The
        records are converted in place, so a record must only be passed
        through this method once (a list has no ``.values()``).

        Returns:
            A list holding the same, now list-based, record dicts.
        """
        columns = ('high', 'low', 'close', 'interval', 'symbol', 'time', 'volume')
        historical_list_refactored = []
        for single_key_data in results:
            for column in columns:
                single_key_data[column] = list(single_key_data[column].values())
            historical_list_refactored.append(single_key_data)
        return historical_list_refactored

    @staticmethod
    def start_websocket():
        """Start one multiplexed socket for ``stream_list`` and wait on it.

        Every message is handed to ``add_new_data_values``; the call then
        waits in ``twm.join()``.
        """
        twm = ThreadedWebsocketManager(api_key=api_key, api_secret=api_secret)
        twm.start()
        twm.start_multiplex_socket(callback=GetNewData.add_new_data_values, streams=stream_list)
        twm.join()

    @staticmethod
    def add_new_data_values(msg):
        """Websocket callback: pull one kline update out of ``msg`` and store it.

        Messages that carry no ``'data'`` payload, for example an error
        notification such as
        ``{'e': 'error', 'm': 'Queue overflow. Message not filled'}``, are
        reported and skipped instead of raising ``KeyError`` in the callback.

        Args:
            msg: The decoded message passed in by the socket manager.
        """
        payload = msg.get('data') if isinstance(msg, dict) else None
        if payload is None:
            print(f'Skipping websocket message without kline data: {msg}')
            return
        candle = payload['k']
        status = candle['x']
        if not status:
            # refactor_msg ignores updates whose status flag is false, so the
            # timestamp conversion and the call are skipped for them.
            return
        # The timestamp is divided by 1e3 (milliseconds -> seconds) first.
        # NOTE(review): fromtimestamp() yields local time, while the historical
        # rows are built with pd.to_datetime(..., unit='s') - confirm both are
        # meant to be in the same time zone.
        daytime = datetime.fromtimestamp(candle['t'] / 1e3)
        GetNewData.refactor_msg(
            old_data_refactored, candle['s'], candle['i'], candle['c'],
            candle['h'], candle['l'], status, daytime, candle['v'])

    @staticmethod
    def refactor_msg(historical_data_list, symbol, interval, close_price, highest_price, lowest_price, status, daytime, volume):
        """Append one candle to every record matching ``symbol`` and ``interval``.

        Args:
            historical_data_list: Records as returned by ``refactor_old_result``
                (dicts of column name -> list).
            symbol: Symbol of the candle; compared with the first entry of a
                record's ``'symbol'`` list.
            interval: Interval of the candle; compared with the first entry of
                a record's ``'interval'`` list.
            close_price: Close price; stored as ``float``.
            highest_price: High price; stored as ``float``.
            lowest_price: Low price; stored as ``float``.
            status: Nothing is stored when this is falsy (presumably the
                "candle closed" flag - verify against the stream format).
            daytime: Timestamp stored in the ``'time'`` column.
            volume: Volume, stored unchanged.
        """
        if not status:
            return
        for dic in historical_data_list:
            # A record without rows has no first element to compare with;
            # indexing it would raise IndexError, so it is skipped.
            if not dic['interval'] or not dic['symbol']:
                continue
            if interval == dic['interval'][0] and symbol == dic['symbol'][0]:
                dic['high'].append(float(highest_price))
                dic['low'].append(float(lowest_price))
                dic['close'].append(float(close_price))
                dic['interval'].append(interval)
                dic['symbol'].append(symbol)
                dic['time'].append(daytime)
                dic['volume'].append(volume)
                # NOTE(review): this prints the record's whole history for
                # every stored candle; if the callback falls behind the
                # socket, a shorter message may be worth considering.
                print(dic)
Entire code:
import asyncio
import os
import time
from datetime import datetime

import pandas as pd
from binance import AsyncClient, Client, ThreadedWebsocketManager
# Silence pandas' chained-assignment warning (the pandas default is 'warn').
pd.options.mode.chained_assignment = None
# Show every column and row when a frame is printed.
pd.set_option('display.max_columns', None, 'display.max_rows', None)

# Credentials come from the environment so that no secret lives in the source
# file. Set BINANCE_API_KEY and BINANCE_API_SECRET before running.
# NOTE(review): an unset variable yields None - confirm the Binance client
# accepts that for the market-data calls used in this script.
api_key = os.environ.get('BINANCE_API_KEY')
api_secret = os.environ.get('BINANCE_API_SECRET')
client = Client(api_key, api_secret)

# Filled with one record per symbol by the historical download.
results = []

# One entry per line of the file; 'BTC' is appended to each. Surrounding
# whitespace is removed and blank lines are skipped so that no bare 'BTC'
# symbol is produced.
with open('DayTradeSymbols.txt') as f:
    symbols = [line.strip() + 'BTC' for line in f if line.strip()]

# One stream name per (symbol, interval) pair, e.g. 'ethbtc@kline_1m'.
intervals = ['1m', '3m']
stream_list = [
    f'{symbol.lower()}@kline_{interval}'
    for symbol in symbols
    for interval in intervals
]
print(stream_list)
class GetHistoricalData:
    """Download recent 1-minute klines for every entry of ``symbols``.

    A bounded queue feeds symbol names to ``num_workers`` worker coroutines;
    each worker appends one record per symbol to the module-level ``results``
    list.
    """

    def __init__(self, num_workers: int = 10):
        """Create the task queue.

        Args:
            num_workers: Number of worker coroutines started by ``amain``.
        """
        self.num_workers: int = num_workers
        # Bounded: the producer waits once ten items are pending.
        self.task_q: asyncio.Queue = asyncio.Queue(maxsize=10)

    async def get_symbols(self):
        """Queue every symbol, followed by one ``None`` sentinel per worker."""
        for symbol in symbols:
            await self.task_q.put(symbol)
        for _ in range(self.num_workers):
            await self.task_q.put(None)

    async def get_historical_klines(self, client: "AsyncClient"):
        """Worker loop: fetch klines for queued symbols until a sentinel arrives.

        For each symbol the newest row is dropped, the last 22 remaining rows
        are kept, and the result is appended to ``results`` as
        ``DataFrame.to_dict()`` output. Symbols that yield no rows are skipped.

        Args:
            client: Object whose ``get_historical_klines`` coroutine returns
                the rows (twelve fields per row).
        """
        raw_columns = ['time', '1', 'high', 'low', 'close', 'volume', '6', '7', '8', '9', '10', '11']
        while True:
            symbol = await self.task_q.get()
            if symbol is None:
                break
            data = await client.get_historical_klines(symbol, '1m', '11/23/2021')
            # Naming the columns at construction also works for an empty
            # response, which would otherwise fail on the column assignment.
            data_df = pd.DataFrame(data, columns=raw_columns)
            data_df = data_df[['time', 'high', 'low', 'close', 'volume']]
            # copy() gives an independent frame, so the column assignments
            # below do not write into a slice of data_df.
            new_df = data_df.head(-1).tail(22).copy()
            if new_df.empty:
                # Nothing usable for this symbol; an empty record could never
                # be matched by later updates.
                continue
            new_df['interval'] = '1m'
            new_df['symbol'] = symbol
            new_df['time'] = pd.to_datetime(new_df['time'] / 1000, unit='s')
            results.append(new_df.to_dict())

    async def amain(self) -> None:
        """Run the producer and all workers, then close the client.

        The client is closed even when a worker raises.
        """
        client = await AsyncClient.create()
        try:
            await asyncio.gather(
                self.get_symbols(),
                *(self.get_historical_klines(client) for _ in range(self.num_workers)))
        finally:
            await client.close_connection()
class GetNewData:
    """Merge live kline updates from the websocket into the historical records.

    The methods rely on module-level names defined elsewhere in the script:
    ``results``, ``stream_list``, ``api_key``, ``api_secret`` and
    ``old_data_refactored``.
    """

    @staticmethod
    def refactor_old_result():
        """Turn every column of each record in ``results`` into a plain list.

        Each record maps a column name to an ``{index: value}`` dict. The
        records are converted in place, so a record must only be passed
        through this method once (a list has no ``.values()``).

        Returns:
            A list holding the same, now list-based, record dicts.
        """
        columns = ('high', 'low', 'close', 'interval', 'symbol', 'time', 'volume')
        historical_list_refactored = []
        for single_key_data in results:
            for column in columns:
                single_key_data[column] = list(single_key_data[column].values())
            historical_list_refactored.append(single_key_data)
        return historical_list_refactored

    @staticmethod
    def start_websocket():
        """Start one multiplexed socket for ``stream_list`` and wait on it.

        Every message is handed to ``add_new_data_values``; the call then
        waits in ``twm.join()``.
        """
        twm = ThreadedWebsocketManager(api_key=api_key, api_secret=api_secret)
        twm.start()
        twm.start_multiplex_socket(callback=GetNewData.add_new_data_values, streams=stream_list)
        twm.join()

    @staticmethod
    def add_new_data_values(msg):
        """Websocket callback: pull one kline update out of ``msg`` and store it.

        Messages that carry no ``'data'`` payload, for example an error
        notification such as
        ``{'e': 'error', 'm': 'Queue overflow. Message not filled'}``, are
        reported and skipped instead of raising ``KeyError`` in the callback.

        Args:
            msg: The decoded message passed in by the socket manager.
        """
        payload = msg.get('data') if isinstance(msg, dict) else None
        if payload is None:
            print(f'Skipping websocket message without kline data: {msg}')
            return
        candle = payload['k']
        status = candle['x']
        if not status:
            # refactor_msg ignores updates whose status flag is false, so the
            # timestamp conversion and the call are skipped for them.
            return
        # The timestamp is divided by 1e3 (milliseconds -> seconds) first.
        # NOTE(review): fromtimestamp() yields local time, while the historical
        # rows are built with pd.to_datetime(..., unit='s') - confirm both are
        # meant to be in the same time zone.
        daytime = datetime.fromtimestamp(candle['t'] / 1e3)
        GetNewData.refactor_msg(
            old_data_refactored, candle['s'], candle['i'], candle['c'],
            candle['h'], candle['l'], status, daytime, candle['v'])

    @staticmethod
    def refactor_msg(historical_data_list, symbol, interval, close_price, highest_price, lowest_price, status, daytime, volume):
        """Append one candle to every record matching ``symbol`` and ``interval``.

        Args:
            historical_data_list: Records as returned by ``refactor_old_result``
                (dicts of column name -> list).
            symbol: Symbol of the candle; compared with the first entry of a
                record's ``'symbol'`` list.
            interval: Interval of the candle; compared with the first entry of
                a record's ``'interval'`` list.
            close_price: Close price; stored as ``float``.
            highest_price: High price; stored as ``float``.
            lowest_price: Low price; stored as ``float``.
            status: Nothing is stored when this is falsy (presumably the
                "candle closed" flag - verify against the stream format).
            daytime: Timestamp stored in the ``'time'`` column.
            volume: Volume, stored unchanged.
        """
        if not status:
            return
        for dic in historical_data_list:
            # A record without rows has no first element to compare with;
            # indexing it would raise IndexError, so it is skipped.
            if not dic['interval'] or not dic['symbol']:
                continue
            if interval == dic['interval'][0] and symbol == dic['symbol'][0]:
                dic['high'].append(float(highest_price))
                dic['low'].append(float(lowest_price))
                dic['close'].append(float(close_price))
                dic['interval'].append(interval)
                dic['symbol'].append(symbol)
                dic['time'].append(daytime)
                dic['volume'].append(volume)
                # NOTE(review): this prints the record's whole history for
                # every stored candle; if the callback falls behind the
                # socket, a shorter message may be worth considering.
                print(dic)
if __name__ == "__main__":
    # Phase 1: download the historical klines and report how long it took.
    start_time = time.time()
    event_loop = asyncio.get_event_loop()
    event_loop.run_until_complete(GetHistoricalData().amain())
    print(f'time = {time.time() - start_time}')

    # Phase 2: convert the downloaded records to lists, then stream live
    # updates. old_data_refactored is a module-level name that the websocket
    # callback in GetNewData reads.
    old_data_refactored = GetNewData.refactor_old_result()
    GetNewData.start_websocket()
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|