How to send messages synchronously in Kafka?

One way of achieving it could be to set the producer property
max.in.flight.requests.per.connection = 1.

But I want to know if there is a more direct or alternative way of sending messages synchronously in Kafka, something like producer.syncSend(...).



Solution 1:[1]

The producer API returns a Future from send. You can call Future#get to block until the sending has completed.

See this example from the Javadocs:

If you want to simulate a simple blocking call you can call the get() method immediately:

 byte[] key = "key".getBytes();
 byte[] value = "value".getBytes();
 ProducerRecord<byte[],byte[]> record = 
     new ProducerRecord<byte[],byte[]>("my-topic", key, value);
 producer.send(record).get();
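The Java KafkaProducer has no syncSend method, so the usual approach is a thin wrapper around send() followed by get(). Below is a minimal Python sketch of such a wrapper, written for kafka-python, whose KafkaProducer.send() returns a future with get(timeout=...). The helper sync_send and the in-memory FakeProducer are hypothetical names, used only so the example runs without a broker.

```python
def sync_send(producer, topic, key, value, timeout=10.0):
    """Send one record and block until it is acknowledged (or fails).

    Works with any producer whose send() returns a future exposing
    get(timeout=...), as kafka-python's KafkaProducer does.
    """
    future = producer.send(topic, key=key, value=value)
    # get() blocks; it re-raises the send error or raises on timeout
    return future.get(timeout=timeout)


# --- Hypothetical in-memory stand-ins so the sketch runs without a broker ---
class _AckedFuture:
    """A future that is already complete, like an instant broker ack."""

    def __init__(self, metadata):
        self._metadata = metadata

    def get(self, timeout=None):
        return self._metadata


class FakeProducer:
    """Acknowledges every record immediately with an increasing offset."""

    def __init__(self):
        self.log = []

    def send(self, topic, key=None, value=None):
        self.log.append((topic, key, value))
        return _AckedFuture({"topic": topic, "offset": len(self.log) - 1})


producer = FakeProducer()
meta = sync_send(producer, "my-topic", b"key", b"value")
print(meta)  # {'topic': 'my-topic', 'offset': 0}
```

With a real producer, only the FakeProducer instance would change; the cost is one full round trip per record.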

Solution 2:[2]

As Thilo suggested, you can call Future#get to block until the sending has completed. However, you might run into performance issues, since the producer batches records and only sends a batch once it holds batch.size bytes, once the buffer of size buffer.memory is exhausted, or once linger.ms milliseconds have elapsed.

If you have a limited number of threads pushing to Kafka, each blocking send may have to wait up to linger.ms for its record to be sent. So in some cases, you will prefer using:

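// send message to producer queue
Future<RecordMetadata> future = producer.send(new ProducerRecord<>(topic, key, message));
// flush producer queue so the record is sent now instead of waiting for linger.ms
producer.flush();
// throws ExecutionException if the send failed (e.g. Kafka is unreachable),
// or TimeoutException if no result arrives within 10 seconds
future.get(10, TimeUnit.SECONDS);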
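To see why the flush() call helps, here is a toy Python model of batching. It is hypothetical, not the real producer: records wait in a pending batch until the batch fills up or flush() is called, so a caller blocking on the future alone has to wait for the batch (in the real producer that wait is bounded by linger.ms). BatchingFakeProducer counts records rather than bytes and has no linger.ms timer; both are simplifications. concurrent.futures.Future.result(timeout=...) plays the role of Java's future.get(10, TimeUnit.SECONDS).

```python
from concurrent.futures import Future


class BatchingFakeProducer:
    """Hypothetical toy model of producer batching.

    Records wait in `pending` until batch_size records are queued or
    flush() is called. The real producer measures batch.size in bytes and
    also sends when linger.ms expires; both are simplified away here.
    """

    def __init__(self, batch_size=3):
        self.batch_size = batch_size
        self.pending = []  # (record, future) pairs not yet "sent"
        self.next_offset = 0

    def send(self, record):
        future = Future()
        self.pending.append((record, future))
        if len(self.pending) >= self.batch_size:
            self._drain()  # batch is full: "send" it right away
        return future

    def flush(self):
        self._drain()  # send the partial batch now

    def _drain(self):
        for _record, future in self.pending:
            future.set_result(self.next_offset)  # pretend the broker acked it
            self.next_offset += 1
        self.pending.clear()


producer = BatchingFakeProducer(batch_size=3)
fut = producer.send("hello")
print(fut.done())  # False: the record is still waiting in a partial batch
producer.flush()
print(fut.result(timeout=10))  # 0
```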

Solution 3:[3]

Thilo's answer is the way to go. In general, max.in.flight.requests.per.connection = 1 is used to keep retries enabled without losing message ordering; it is not meant for making the producer synchronous.
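The ordering point can be illustrated with a toy model. This is an assumption-laden sketch, not Kafka's sender logic: up to max_in_flight batches are sent together, a batch that fails once is retried, and any batch already in flight behind it can land first. With max_in_flight=1 the retry completes before the next batch is sent, so order survives. The function simulate is hypothetical.

```python
def simulate(batches, fail_once, max_in_flight):
    """Return the order in which batches land in one partition's log.

    Toy model: each round sends up to max_in_flight queued batches at once.
    A batch in fail_once fails on its first attempt and is put back at the
    front of the queue for a retry, but batches already in flight with it
    still succeed, so they can overtake it.
    """
    queue = list(batches)
    failed = set()
    log = []
    while queue:
        in_flight, queue = queue[:max_in_flight], queue[max_in_flight:]
        retries = []
        for batch in in_flight:
            if batch in fail_once and batch not in failed:
                failed.add(batch)
                retries.append(batch)  # transient error, retry later
            else:
                log.append(batch)  # broker appended the batch
        queue = retries + queue  # retries go back to the front
    return log


print(simulate(["A", "B"], {"A"}, max_in_flight=5))  # ['B', 'A']
print(simulate(["A", "B"], {"A"}, max_in_flight=1))  # ['A', 'B']
```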

Solution 4:[4]

From my adventures with Kafka :-) the order of message production can only be guaranteed if you have one Producer thread and set max.in.flight.requests.per.connection = 1 (or turn off retries, i.e. retries = 0, or both).

If you want to scale to more than one Producer, then you have to "make sure" that messages that will be stored in the same partition are produced by the same Producer instance.
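One way to "make sure" of this is to choose the partition in the application and pin each partition to a single producer instance. The sketch below is hypothetical: route() is an illustrative helper, and zlib.crc32 is a stand-in stable hash, not Kafka's default partitioner (which uses murmur2). The chosen partition would therefore have to be passed explicitly, e.g. through the partition argument of kafka-python's send().

```python
import zlib


def route(key: bytes, num_partitions: int, num_producers: int):
    """Hypothetical routing helper: returns (partition, producer_index).

    The partition is chosen in the application from the key, then each
    partition is pinned to exactly one producer instance, so all records
    for a partition go through the same producer. crc32 is only a stand-in
    stable hash, not Kafka's default partitioner, so the partition must be
    passed explicitly when sending (e.g. partition=p in kafka-python).
    """
    partition = zlib.crc32(key) % num_partitions
    producer_index = partition % num_producers
    return partition, producer_index


# Check: no partition is ever served by two different producers.
owners = {}
for i in range(1000):
    p, idx = route(f"user-{i}".encode(), num_partitions=12, num_producers=4)
    assert owners.setdefault(p, idx) == idx
print("partitions seen:", len(owners))
```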

Solution 5:[5]

Setting max.in.flight.requests.per.connection = 1 only guarantees the ordering of messages within a partition; it has nothing to do with synchronization.

Python code (kafka-python), in case it helps. For a synchronous send, block on the future with a sensible timeout.

import logging

from kafka import KafkaProducer
from kafka.errors import KafkaError

log = logging.getLogger(__name__)

# by default acks=1; acks='all' waits for acknowledgement from all in-sync replicas
producer = KafkaProducer(bootstrap_servers=['brokerIP:9092'], acks='all')

key = b'key'
value = b'value'

future = producer.send("my-topic", key=key, value=value)

# block on this future for a synchronous send
try:
    record_metadata = future.get(timeout=10)
except KafkaError:
    # the send failed or timed out: log it and stop, since there is no metadata
    log.exception("send to my-topic failed")
    raise

print(record_metadata.topic)
print(record_metadata.partition)
print(record_metadata.offset)

producer.flush()
producer.close()
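It helps to separate the two ways a blocking send can end badly: the send itself fails, or no acknowledgement arrives before the timeout. In kafka-python, future.get(timeout=10) re-raises the send error in the first case and raises KafkaTimeoutError (a KafkaError subclass, so the except KafkaError above catches both) in the second. The sketch below reproduces that split with the standard library's concurrent.futures.Future as a stand-in, so it runs without a broker; wait_for_ack is a hypothetical helper.

```python
import concurrent.futures


def wait_for_ack(future, timeout):
    """Block on a pending send and classify the outcome.

    Returns "acked", "failed" or "timed out". The future is a stdlib
    concurrent.futures.Future standing in for kafka-python's
    FutureRecordMetadata, which raises KafkaTimeoutError on timeout instead.
    """
    try:
        future.result(timeout=timeout)
        return "acked"
    except concurrent.futures.TimeoutError:
        return "timed out"  # no answer within the timeout
    except Exception:
        return "failed"  # the send itself failed; its error was re-raised


ok = concurrent.futures.Future()
ok.set_result("metadata")

failed = concurrent.futures.Future()
failed.set_exception(ConnectionError("broker unreachable"))

never = concurrent.futures.Future()  # no result is ever set

print(wait_for_ack(ok, 0.1))      # acked
print(wait_for_ack(failed, 0.1))  # failed
print(wait_for_ack(never, 0.1))   # timed out
```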

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Thilo
Solution 2 Camille Vienot
Solution 3 ppatierno
Solution 4 Vassilis
Solution 5