Consume Azure Event Hubs messages in PHP

Following the solution given in Azure/azure-event-hubs-for-kafka/issues/51, I tried to consume a message, but had no luck. Please have a look at the code and advise on what needs to be changed.

I installed rdkafka.so on my Ubuntu system as described in the PHP manual for rdkafka, and then used this code:

$conf = new RdKafka\Conf();
$conf->set('group.id', '$Default');
$conf->set('metadata.broker.list', 'NAMESPACE.servicebus.windows.net:9093');
$conf->set('security.protocol', 'SASL_SSL');
$conf->set('sasl.mechanisms', 'PLAIN');
$conf->set('sasl.username', '$ConnectionString');
$conf->set('sasl.password', 'Endpoint=sb://NAMESPACE.servicebus.windows.net/;SharedAccessKeyName=syn-wk;SharedAccessKey=7XXX/XXXE=;EntityPath=someStringHere');
$conf->set('ssl.ca.location', '/absolute_path_to_cert');
$conf->set('enable.partition.eof', 'true');
$conf->set('api.version.request', 'false');
$conf->set('log_level', (string) LOG_DEBUG);
$conf->set('debug', 'all');

$rk = new RdKafka\Consumer($conf);
$topicConf = new RdKafka\TopicConf();
$topicConf->set('auto.commit.interval.ms', 100);
$topicConf->set('offset.store.method', 'broker');
$topicConf->set('auto.offset.reset', 'earliest');
$topic = $rk->newTopic("test", $topicConf);
$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);

while (true) {
    $message = $topic->consume(0, 120*10000);
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            var_dump($message);
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            echo "No more messages; will wait for more\n";
            break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo "Timed out\n";
            break;
        default:
            throw new \Exception($message->errstr(), $message->err);
            break;
    }
}
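For reference, here is a minimal sketch of how the Kafka settings above relate to the connection string. It assumes the broker host is the host part of `Endpoint` on port 9093, and that `EntityPath` (the event hub name) is what the Kafka endpoint exposes as the topic name. `eventHubsKafkaSettings` is a hypothetical helper written for illustration, not part of php-rdkafka.

```php
<?php
// Hypothetical helper (not a php-rdkafka API): derive the Kafka client
// settings used above from an Event Hubs connection string.
function eventHubsKafkaSettings(string $connectionString): array
{
    $parts = [];
    foreach (explode(';', $connectionString) as $pair) {
        if ($pair === '') {
            continue; // tolerate a trailing ';'
        }
        // Split on the first '=' only: SharedAccessKey values may end in '='.
        [$key, $value] = array_pad(explode('=', $pair, 2), 2, '');
        $parts[$key] = $value;
    }

    $host = isset($parts['Endpoint'])
        ? parse_url($parts['Endpoint'], PHP_URL_HOST)
        : null;
    if (!is_string($host) || $host === '') {
        throw new InvalidArgumentException('Connection string has no usable Endpoint');
    }

    return [
        // Assumption: the event hub name doubles as the Kafka topic name.
        'topic' => $parts['EntityPath'] ?? null,
        'conf'  => [
            'metadata.broker.list' => $host . ':9093',
            'security.protocol'    => 'SASL_SSL',
            'sasl.mechanisms'      => 'PLAIN',
            'sasl.username'        => '$ConnectionString', // literal string, not a PHP variable
            'sasl.password'        => $connectionString,
        ],
    ];
}

// Placeholder values copied from the question, not real credentials.
$settings = eventHubsKafkaSettings(
    'Endpoint=sb://NAMESPACE.servicebus.windows.net/;SharedAccessKeyName=syn-wk;'
    . 'SharedAccessKey=7XXX/XXXE=;EntityPath=someStringHere'
);

echo $settings['conf']['metadata.broker.list'], "\n";
echo $settings['topic'], "\n";
```

Under that assumption, the name passed to `newTopic()` would be expected to match the `EntityPath` value; the snippet above uses "test", so it may be worth checking that an event hub with that name exists in the namespace.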

The event hub behind my connection string already contains events: using azure-sdk-for-js, I am able to both produce and consume them. However, when I try to read $message from $topic in PHP, it comes back as null.
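Since the result can be null, the `switch` on `$message->err` in the loop would dereference a null value. A minimal sketch of a guard follows, assuming a null result simply means that nothing arrived before the timeout. `classifyConsumeResult` is a hypothetical helper, and the numeric fallbacks are assumed to mirror librdkafka's error codes for the case where the extension's constants are not defined.

```php
<?php
// Hypothetical helper: classify the value returned by $topic->consume()
// without touching ->err on a null result.
function classifyConsumeResult(?object $message): string
{
    if ($message === null) {
        return 'empty'; // nothing was returned within the timeout
    }

    // Prefer the extension's constants; fall back to the assumed librdkafka values.
    $eof      = defined('RD_KAFKA_RESP_ERR__PARTITION_EOF') ? RD_KAFKA_RESP_ERR__PARTITION_EOF : -191;
    $timedOut = defined('RD_KAFKA_RESP_ERR__TIMED_OUT') ? RD_KAFKA_RESP_ERR__TIMED_OUT : -185;

    switch ($message->err) {
        case 0: // RD_KAFKA_RESP_ERR_NO_ERROR
            return 'message';
        case $eof:
            return 'partition_eof';
        case $timedOut:
            return 'timed_out';
        default:
            return 'error';
    }
}

// Stand-in objects that only carry the err field of RdKafka\Message.
echo classifyConsumeResult(null), "\n";
echo classifyConsumeResult((object) ['err' => 0]), "\n";
```

In the loop, the result of `$topic->consume(...)` could be passed through such a check first, so that an empty poll is logged and retried instead of raising an error.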

PS:

  1. I am currently on the 'Standard' pricing tier of Azure Event Hubs, and the Kafka surface is enabled.
  2. I have also tried setting api.version.request to 'true'.


Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
