This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 4527e54647a KAFKA-14753: Improve kafka producer example (#13354) 4527e54647a is described below commit 4527e54647a0b8457f7f2b5d804eb65dc4d9d817 Author: Philip Nee <p...@confluent.io> AuthorDate: Tue Mar 7 16:25:49 2023 -0800 KAFKA-14753: Improve kafka producer example (#13354) Reviewers: Guozhang Wang <wangg...@gmail.com> --- .../src/main/java/kafka/examples/Producer.java | 65 +++++++++++++++------- 1 file changed, 44 insertions(+), 21 deletions(-) diff --git a/examples/src/main/java/kafka/examples/Producer.java b/examples/src/main/java/kafka/examples/Producer.java index e649a7862c9..e85fa16060e 100644 --- a/examples/src/main/java/kafka/examples/Producer.java +++ b/examples/src/main/java/kafka/examples/Producer.java @@ -27,7 +27,13 @@ import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +/** + * Demo producer that demonstrate two modes of KafkaProducer. + * If the user uses the Async mode: The messages will be printed to stdout upon successful completion + * If the user uses the sync mode (isAsync = false): Each send loop will block until completion. + */ public class Producer extends Thread { private final KafkaProducer<Integer, String> producer; private final String topic; @@ -54,8 +60,8 @@ public class Producer extends Thread { props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId); } props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency); - producer = new KafkaProducer<>(props); + this.topic = topic; this.isAsync = isAsync; this.numRecords = numRecords; @@ -70,28 +76,45 @@ public class Producer extends Thread { public void run() { int messageKey = 0; int recordsSent = 0; - while (recordsSent < numRecords) { - String messageStr = "Message_" + messageKey; - long startTime = System.currentTimeMillis(); - if (isAsync) { // Send asynchronously - producer.send(new ProducerRecord<>(topic, - messageKey, - messageStr), new DemoCallBack(startTime, messageKey, messageStr)); - } else { // Send synchronously - try { - producer.send(new ProducerRecord<>(topic, - messageKey, - messageStr)).get(); - System.out.println("Sent message: (" + messageKey + ", " + messageStr + ")"); - } catch (InterruptedException | ExecutionException e) { - e.printStackTrace(); - } + try { + while (recordsSent < numRecords) { + final long currentTimeMs = System.currentTimeMillis(); + produceOnce(messageKey, recordsSent, currentTimeMs); + messageKey += 2; + recordsSent += 1; } - messageKey += 2; - recordsSent += 1; + } catch (Exception e) { + System.out.println("Producer encountered exception:" + e); + } finally { + System.out.println("Producer sent " + numRecords + " records successfully"); + this.producer.close(); + latch.countDown(); } - System.out.println("Producer sent " + numRecords + " records successfully"); - latch.countDown(); + } + + private void produceOnce(final int messageKey, final int recordsSent, final long currentTimeMs) throws ExecutionException, InterruptedException { + String messageStr = "Message_" + messageKey; + + if (isAsync) { // Send asynchronously + sendAsync(messageKey, messageStr, currentTimeMs); + return; + } + Future<RecordMetadata> future = send(messageKey, messageStr); + future.get(); + System.out.println("Sent message: (" + messageKey + ", " + messageStr + ")"); + } + + private void sendAsync(final int messageKey, final String messageStr, final long currentTimeMs) { + this.producer.send(new ProducerRecord<>(topic, + messageKey, + messageStr), + new DemoCallBack(currentTimeMs, messageKey, messageStr)); + } + + private Future<RecordMetadata> send(final int messageKey, final String messageStr) { + return producer.send(new ProducerRecord<>(topic, + messageKey, + messageStr)); } }