'Locust Python: Kafka Consumer in a separate thread

I'm using Locust for load testing. I want to register a Kafka consumer in separate thread to measure the time of message processing. Here is what I got now:

def register_kafka_consumer(topic, environment):
    def poll():
        request_type = 'KAFKA_MESSAGE_CONSUMED'
        consumer = Consumer({
            'bootstrap.servers': bootstrap_servers,
            'group.id': group_id
        })
        consumer.subscribe([topic])
        logging.getLogger().info("Consumer subscribed to topic '%s': %s", topic, consumer)
        try:
            while True:
                msg = consumer.poll()
                if msg is not None and not msg.error():
                    # diff_millis calculation
                    environment.events.request.fire(
                        request_type=request_type,
                        name=topic,
                        response=msg.value(),
                        response_time=diff_millis,
                    )
        except Exception:
            logging.getLogger().error("Error during polling message for consumer: %s",
                                      str(consumer), exc_info=True)
        finally:
            logging.getLogger().info("Kafka consumer closed: %s", str(consumer))

    # here. I'm starting a separate thread
    t = threading.Thread(target=poll)
    t.start()
    logging.getLogger().info("Thread started")


@events.init.add_listener
def on_locust_init(environment, **kwargs):
    register_kafka_consumer(raw_topic, environment)

But according to logs the execution is stuck in while True loop.

[2022-05-11 17:20:53,309] INFO/locust.main: Starting web interface at http://localhost:8089
[2022-05-11 17:20:53,340] INFO/root: Consumer subscribed to topic 'some_topic': <cimpl.Consumer object at 0x10c28c510>

Because there is no Thread started log entry.

What am I doing wrong? Is there any approach to overcome this issue?

EDIT 1

I found this page in Locust documentation with an example of greenlets usage. Here is what I got now.

def register_kafka_consumer(topic, environment):
    def poll():
        request_type = 'KAFKA_MESSAGE_CONSUMED'
        consumer = Consumer({
            'bootstrap.servers': bootstrap_servers,
            'group.id': group_id
        })
        consumer.subscribe([topic])
        logging.getLogger().info("Consumer subscribed to topic '%s': %s", topic, consumer)
        try:
            while environment.runner.state not in [STATE_STOPPING, STATE_STOPPED, STATE_CLEANUP]:
                time.sleep(1)
                msg = consumer.poll()
                if msg is not None and not msg.error():
                    # diff_millis calculation
                    environment.events.request.fire(
                        request_type=request_type,
                        name=topic,
                        response=msg.value(),
                        response_time=diff_millis,
                    )
        except Exception:
            logging.getLogger().error("Error during polling message for consumer: %s",
                                      str(consumer), exc_info=True)
        finally:
            logging.getLogger().info("Kafka consumer closed: %s", str(consumer))

    if not isinstance(environment.runner, WorkerRunner):
        gevent.spawn(poll)
        logging.getLogger().info("Greenlet started")


@events.init.add_listener
def on_locust_init(environment, **kwargs):
    register_kafka_consumer(raw_topic, environment)

According to logs, the Kafka consumer is registered now.

[2022-05-12 12:39:03,310] INFO/locust.main: Starting web interface at http://localhost:8089
[2022-05-12 12:39:03,323] NFO/root: Greenlet started
[2022-05-12 12:39:03,323] INFO/locust.main: Starting Locust 2.8.2
[2022-05-12 12:39:03,379] INFO/root: Consumer subscribed to topic 'kafka_topic': <cimpl.Consumer object at 0x111ef4880>

Though I cannot open the Web UI now. There is only white screen and infinite loading. Perhaps this grenleet task somehow overoccupied the process. Any ideas how to solve this problem?



Solution 1:[1]

Ok, I got it. The problem was caused by this statement:

msg = consumer.poll()

There is no timeout. So, Python interpreter waits forever and never gives a chance another thread for execution. But if I set the timeout directly, everything works like a charm. Here is the final version.

def register_kafka_consumer(topic, environment):
    def poll():
        request_type = 'KAFKA_MESSAGE_CONSUMED'
        consumer = Consumer({
            'bootstrap.servers': bootstrap_servers,
            'group.id': group_id
        })
        consumer.subscribe([topic])
        logging.getLogger().info("Consumer subscribed to topic '%s': %s", topic, consumer)
        try:
            while environment.runner.state not in [STATE_STOPPING, STATE_STOPPED, STATE_CLEANUP]:
                time.sleep(1)
                # timeout in seconds
                msg = consumer.poll(timeout=0)
                if msg is not None and not msg.error():
                    # diff_millis calculation
                    environment.events.request.fire(
                        request_type=request_type,
                        name=topic,
                        response=msg.value(),
                        response_time=diff_millis,
                    )
        except Exception:
            logging.getLogger().error("Error during polling message for consumer: %s",
                                      str(consumer), exc_info=True)
        finally:
            logging.getLogger().info("Kafka consumer closed: %s", str(consumer))

    if not isinstance(environment.runner, WorkerRunner):
        gevent.spawn(poll)
        logging.getLogger().info("Greenlet started")


@events.init.add_listener
def on_locust_init(environment, **kwargs):
    register_kafka_consumer(raw_topic, environment)

When consumer.poll() reaches timeout, it just returns None and the loop starts again. Anyway, I still don't understand why Python cannot switch context on poll with no timeout. However, the problem is solved now.

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1