Java – How to ensure that messages reach the Kafka broker?

I have a producer on my local machine and a broker on a remote host (AWS).

After the producer sends a message, I wait and then run the console consumer on the remote host. All I see is a large amount of log output; the value sent by the producer is not there.

The producer flushes its data after calling the send method.
Everything is configured correctly.

How do I check whether the broker has received a message from the producer, and whether the producer has received an acknowledgement?

Solution

The send method sends a record to the topic asynchronously and returns a Future<RecordMetadata>:

java.util.concurrent.Future<RecordMetadata> send(ProducerRecord<K,V> record)

Asynchronously sends a record to a topic
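Because send returns a standard java.util.concurrent.Future, the simplest way to confirm delivery of a single record is to block on get(), which returns once the request completes or throws if it failed. The sketch below shows only that waiting logic. The class name SendConfirmation, the Outcome enum, and the awaitAck helper are hypothetical names introduced for illustration, and CompletableFuture stands in for the Future that producer.send(record) would return so that the example runs without a broker.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SendConfirmation {

    /** Possible results of waiting on the Future returned by an asynchronous send. */
    public enum Outcome { ACKNOWLEDGED, FAILED, TIMED_OUT, INTERRUPTED }

    /**
     * Waits up to timeoutMs for the pending send to complete.
     * With a real producer, pass in the Future returned by producer.send(record).
     */
    public static Outcome awaitAck(Future<?> pending, long timeoutMs) {
        try {
            pending.get(timeoutMs, TimeUnit.MILLISECONDS);
            return Outcome.ACKNOWLEDGED;          // request completed without error
        } catch (ExecutionException e) {
            return Outcome.FAILED;                // e.getCause() holds the send error
        } catch (TimeoutException e) {
            return Outcome.TIMED_OUT;             // no result within the timeout
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();   // preserve the interrupt flag
            return Outcome.INTERRUPTED;
        }
    }

    public static void main(String[] args) {
        // Stand-ins (assumption): simulate the three situations a real send can end in.
        Future<Long> acked = CompletableFuture.completedFuture(42L);
        CompletableFuture<Long> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("simulated send failure"));
        Future<Long> stuck = new CompletableFuture<>();

        System.out.println(awaitAck(acked, 100));   // ACKNOWLEDGED
        System.out.println(awaitAck(failed, 100));  // FAILED
        System.out.println(awaitAck(stuck, 100));   // TIMED_OUT
    }
}
```

Blocking after every send gives up the throughput benefit of asynchronous sending, so this is mostly useful for debugging a case like the one in the question.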

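If you call flush() after send(), the returned Future will have completed, which you can confirm with isDone() (future.isDone() == true). Completion covers failure as well as success, so call get() on the Future to find out which. The flush() documentation describes this: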

Invoking this method makes all buffered records immediately available to send (even if linger.ms is greater than 0) and blocks on the completion of the requests associated with these records. The post-condition of flush() is that any previously sent record will have completed (e.g. Future.isDone() == true). A request is considered completed when it is successfully acknowledged according to the acks configuration you have specified or else it results in an error.
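Since completion is defined in terms of the acks setting, it can help to set the acknowledgement and timeout options explicitly while debugging. The following is a minimal sketch of such a configuration built with java.util.Properties. The class name ProducerSettings, the helper reliableDefaults, the host broker.example.com:9092, and the specific timeout values are assumptions chosen for illustration, not recommendations from the original answer; the property keys themselves are standard producer configuration names.

```java
import java.util.Properties;

public class ProducerSettings {

    /** Builds producer properties that make acknowledgement behaviour explicit. */
    public static Properties reliableDefaults(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        // Wait for the full set of in-sync replicas to acknowledge each record.
        props.setProperty("acks", "all");
        // Upper bound on the time until a send is reported as succeeded or failed.
        // It must be at least linger.ms + request.timeout.ms.
        props.setProperty("delivery.timeout.ms", "30000");
        props.setProperty("request.timeout.ms", "10000");
        // Small batching delay; flush() sends buffered records regardless of this value.
        props.setProperty("linger.ms", "5");
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical broker address; replace with the remote host's listener.
        Properties props = reliableDefaults("broker.example.com:9092");
        System.out.println("acks=" + props.getProperty("acks"));
    }
}
```

The resulting Properties object would be passed to the KafkaProducer constructor. With a short delivery.timeout.ms, an unreachable broker shows up as a failed Future or callback exception fairly quickly instead of appearing to hang.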

RecordMetadata contains the partition and offset:

public int partition()

The partition the record was sent to

public long offset()

The offset of the record, or -1 if hasOffset() returns false.

Alternatively, you can pass a Callback to send(), which tells you whether the record was sent to the topic or the send failed:

Fully non-blocking usage can make use of the Callback parameter to provide a callback that will be invoked when the request is complete.

The documentation has a clear example:

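 // key and value are byte[] values prepared by the caller
 ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", key, value);
 producer.send(record,
           new Callback() {
               // Invoked once the request completes, with either metadata or an exception
               public void onCompletion(RecordMetadata metadata, Exception e) {
                   if(e != null) {
                      // The send failed; the record did not reach the broker
                      e.printStackTrace();
                   } else {
                      System.out.println("The offset of the record we just sent is: " + metadata.offset());
                   }
               }
           });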
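When many records are sent, printing from each callback becomes hard to follow, and one option is to count outcomes and check the totals after flush(). The sketch below is one way to do that under stated assumptions: DeliveryTracker and its methods are hypothetical names, and the metadata parameter is typed as Object only so the example compiles without the Kafka client on the classpath. The onCompletion method keeps the same parameter order as Callback.onCompletion, so with a real producer it could be passed as a method reference such as tracker::onCompletion. Thread-safe counters are used because callbacks generally run on the producer's I/O thread rather than the sending thread.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;

public class DeliveryTracker {
    private final AtomicLong acknowledged = new AtomicLong();
    private final Queue<Exception> failures = new ConcurrentLinkedQueue<>();

    /** Mirrors Callback.onCompletion(metadata, exception): exactly one argument is non-null. */
    public void onCompletion(Object metadata, Exception exception) {
        if (exception != null) {
            failures.add(exception);        // keep the error for later inspection
        } else {
            acknowledged.incrementAndGet(); // broker acknowledged this record
        }
    }

    public long acknowledgedCount() { return acknowledged.get(); }

    public int failureCount() { return failures.size(); }

    /** True only if every one of the expected sends was acknowledged without error. */
    public boolean allDelivered(long expected) {
        return failures.isEmpty() && acknowledged.get() == expected;
    }

    public static void main(String[] args) {
        DeliveryTracker tracker = new DeliveryTracker();
        // Simulated completions (assumption): one success, one failure.
        tracker.onCompletion("metadata-placeholder", null);
        tracker.onCompletion(null, new RuntimeException("simulated send failure"));

        System.out.println(tracker.acknowledgedCount()); // 1
        System.out.println(tracker.failureCount());      // 1
        System.out.println(tracker.allDelivered(2));     // false
    }
}
```

In real use the check would come after producer.flush(), at which point every callback for previously sent records has been invoked.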
