'Python asyncio event loop add wait for server connection

I want to establish a connection with asyncio.open_connection but I can't get it to work.

first I create the loop and a queue

_loop = asyncio.new_event_loop()
_queue = asyncio.Queue(loop=_loop)

And then I want the server to listen.

class Server:

    def __init__(self, host: str, port: int) -> None:
        self.host = host
        self.port = port

        self.reader = None
        self.writer: Optional[asyncio.StreamWriter] = None
        self.reader: Optional[asyncio.StreamReader] = None

        self.connected = asyncio.Event(loop=_loop)

    def close(self) -> None:
        self.writer.close()

    async def start(self):
        self.reader, self.writer = await asyncio.open_connection(self.host, self.port, loop=_loop)
        self.connected.set()

    async def receive_data_async(self):
        await self.connected.wait()
        data = ""
        while True:
            buffer = await self.reader.read(128)
            if not buffer:
                break
            data += buffer.decode()

    async def send_data_async(self, event: Event):
        await self.connected.wait()
        self.writer.write(event.encode())
        await self.writer.drain()

theoretically receive and send should wait until connection is established. I run these methods here:

class Wrapper:

    def __init__(self, PORT):
        self.server: Server = Server(os.getenv("HOST"), PORT)
        self.events: Dict[str, Callable[[str, Event], None]] = dict()

        _queue.put_nowait(self.server.start)

    def register(self, name: str, callback):
        self.events[name] = callback

    def send(self, name: str, data=None):
        event = Event(name, data)
        task = _loop.create_task(self.server.send_data_async(event))
        _queue.put_nowait(task)
        print(f"send event {name} with {data} to C#")


async def worker():
    task = await _queue.get()
    await task


def flush():
    tasks = [_loop.create_task(worker()), _loop.create_task(worker())]
    _loop.run_until_complete(asyncio.gather(
        _queue.join(),
        *tasks
    ))

but when I want to run flush the whole thing crashes and I get a ValueError: loop argument must agree with Future



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source