Consume Azure Event Hub messages in PHP
Following the solution posted in Azure/azure-event-hubs-for-kafka/issues/51, I tried to consume a message, but with no luck. Could you look at the code below and tell me what needs to change?
I installed rdkafka.so on my Ubuntu system as described in the PHP manual for rdkafka, and then used this code:
$conf = new RdKafka\Conf();
$conf->set('group.id', '$Default');
$conf->set('metadata.broker.list', 'NAMESPACE.servicebus.windows.net:9093');
$conf->set('security.protocol', 'SASL_SSL');
$conf->set('sasl.mechanisms', 'PLAIN');
$conf->set('sasl.username', '$ConnectionString');
$conf->set('sasl.password', 'Endpoint=sb://NAMESPACE.servicebus.windows.net/;SharedAccessKeyName=syn-wk;SharedAccessKey=7XXX/XXXE=;EntityPath=someStringHere');
$conf->set('ssl.ca.location', '/absolute_path_to_cert');
$conf->set('enable.partition.eof', 'true');
$conf->set('api.version.request', 'false');
$conf->set('log_level', (string) LOG_DEBUG);
$conf->set('debug', 'all');

$rk = new RdKafka\Consumer($conf);

$topicConf = new RdKafka\TopicConf();
$topicConf->set('auto.commit.interval.ms', 100);
$topicConf->set('offset.store.method', 'broker');
$topicConf->set('auto.offset.reset', 'earliest');

$topic = $rk->newTopic("test", $topicConf);
$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);

while (true) {
    $message = $topic->consume(0, 120*10000);
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            var_dump($message);
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            echo "No more messages; will wait for more\n";
            break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo "Timed out\n";
            break;
        default:
            throw new \Exception($message->errstr(), $message->err);
            break;
    }
}
The event hub behind my connection string already contains events. I confirmed this with azure-sdk-for-js, which I can use to both produce and consume events.
When I try to read $message from $topic, it comes back as null.
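One way to make the null case visible is to check for it before touching `$message->err`. The following is only a minimal sketch under some assumptions: `describeConsumeResult()` is a hypothetical helper name, and as far as I can tell `consume()` on the low-level consumer can return null when nothing arrives before the timeout. Without such a guard, reading `->err` on null yields null, and since `null == 0` holds in PHP's loose `switch` comparison, the loop would fall into the `RD_KAFKA_RESP_ERR_NO_ERROR` branch and `var_dump` a null. The `define()` fallbacks exist only so the sketch also runs where the extension is not loaded.

```php
<?php
// Fallback definitions so this sketch runs without the rdkafka extension;
// the values mirror librdkafka's error codes.
defined('RD_KAFKA_RESP_ERR_NO_ERROR') || define('RD_KAFKA_RESP_ERR_NO_ERROR', 0);
defined('RD_KAFKA_RESP_ERR__PARTITION_EOF') || define('RD_KAFKA_RESP_ERR__PARTITION_EOF', -191);
defined('RD_KAFKA_RESP_ERR__TIMED_OUT') || define('RD_KAFKA_RESP_ERR__TIMED_OUT', -185);

/**
 * Hypothetical helper: label the value returned by $topic->consume().
 * A null result is reported explicitly instead of being read as "no error".
 */
function describeConsumeResult(?object $message): string
{
    if ($message === null) {
        return 'no message (null)';
    }

    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            return 'message';
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            return 'partition EOF';
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            return 'timed out';
        default:
            return 'error ' . $message->err;
    }
}
```

Inside the loop this would be called as `echo describeConsumeResult($topic->consume(0, 120*10000)), "\n";`, which separates "nothing was delivered" from "a message arrived".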
PS:
- I am currently on the 'Standard' pricing tier of Azure Event Hubs, and it has the Kafka surface 'enabled'
- I have tried setting api.version.request to 'true' as well
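To make it easier to switch between the two `api.version.request` variants mentioned above, the connection settings can be gathered in one place. This is just a sketch: `eventHubsKafkaSettings()` is a hypothetical helper, the setting names and values are the ones from the code in this question, and it makes no claim about which variant is the right one.

```php
<?php
/**
 * Hypothetical helper: collect the connection settings used in this question
 * for the Kafka endpoint of an Event Hubs namespace.
 */
function eventHubsKafkaSettings(
    string $namespace,
    string $connectionString,
    bool $apiVersionRequest
): array {
    return [
        'group.id' => '$Default',
        'metadata.broker.list' => $namespace . '.servicebus.windows.net:9093',
        'security.protocol' => 'SASL_SSL',
        'sasl.mechanisms' => 'PLAIN',
        // Literal user name; single quotes prevent variable interpolation.
        'sasl.username' => '$ConnectionString',
        'sasl.password' => $connectionString,
        // librdkafka expects the strings 'true' or 'false' here.
        'api.version.request' => $apiVersionRequest ? 'true' : 'false',
    ];
}

// Apply the settings only when the rdkafka extension is actually loaded.
if (class_exists('RdKafka\Conf')) {
    $conf = new RdKafka\Conf();
    foreach (eventHubsKafkaSettings('NAMESPACE', 'Endpoint=sb://...', true) as $name => $value) {
        $conf->set($name, $value);
    }
}
```

Flipping the last argument between `true` and `false` reproduces the two configurations I tried.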
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow