'Python asyncio event loop add wait for server connection
I want to establish a connection with asyncio.open_connection but I can't get it to work.
first I create the loop and a queue
_loop = asyncio.new_event_loop()
_queue = asyncio.Queue(loop=_loop)
And then I want the server to listen.
class Server:
    def __init__(self, host: str, port: int) -> None:
        self.host = host
        self.port = port
        self.reader = None
        self.writer: Optional[asyncio.StreamWriter] = None
        self.reader: Optional[asyncio.StreamReader] = None
        self.connected = asyncio.Event(loop=_loop)
    def close(self) -> None:
        self.writer.close()
    async def start(self):
        self.reader, self.writer = await asyncio.open_connection(self.host, self.port, loop=_loop)
        self.connected.set()
    async def receive_data_async(self):
        await self.connected.wait()
        data = ""
        while True:
            buffer = await self.reader.read(128)
            if not buffer:
                break
            data += buffer.decode()
    async def send_data_async(self, event: Event):
        await self.connected.wait()
        self.writer.write(event.encode())
        await self.writer.drain()
theoretically receive and send should wait until connection is established. I run these methods here:
class Wrapper:
    def __init__(self, PORT):
        self.server: Server = Server(os.getenv("HOST"), PORT)
        self.events: Dict[str, Callable[[str, Event], None]] = dict()
        _queue.put_nowait(self.server.start)
    def register(self, name: str, callback):
        self.events[name] = callback
    def send(self, name: str, data=None):
        event = Event(name, data)
        task = _loop.create_task(self.server.send_data_async(event))
        _queue.put_nowait(task)
        print(f"send event {name} with {data} to C#")
async def worker():
    task = await _queue.get()
    await task
def flush():
    tasks = [_loop.create_task(worker()), _loop.create_task(worker())]
    _loop.run_until_complete(asyncio.gather(
        _queue.join(),
        *tasks
    ))
but when I want to run flush the whole thing crashes and I get a ValueError: loop argument must agree with Future
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source | 
|---|
