This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit a4b2a086f8639dece77a95b47bbb79840a455150
Author: Boyang Chen <boy...@confluent.io>
AuthorDate: Wed Feb 5 16:51:08 2020 -0800

    KAFKA-9447: Add new customized EOS model example (#8031)
    
    With the improvements from KIP-447, we now offer developers a better
    experience for writing customized EOS applications with group
    subscription instead of manual partition assignment. With this demo,
    users should be able to get started more quickly on writing their own
    EOS app and understand the processing logic much better.
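    
    For illustration, the core of the demo is a consume-process-produce
    loop roughly like the sketch below (group mode). The producer and
    consumer are assumed to be constructed as in the example classes of
    this patch, inputTopic/outputTopic are placeholder variables, and
    error handling, the rebalance listener and the exit condition are
    omitted:
    
        producer.initTransactions();
        consumer.subscribe(Collections.singleton(inputTopic));
        while (true) {
            ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(200));
            producer.beginTransaction();
            for (ConsumerRecord<Integer, String> record : records) {
                // Process each record and send the result downstream within the transaction.
                producer.send(new ProducerRecord<>(outputTopic, record.key(), record.value()));
            }
            // Commit the consumed offsets as part of the same transaction; passing the
            // consumer group metadata (new in 2.5) lets the broker fence zombie producers.
            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
            for (TopicPartition tp : consumer.assignment()) {
                offsets.put(tp, new OffsetAndMetadata(consumer.position(tp)));
            }
            producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
            producer.commitTransaction();
        }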
    
    Reviewers: Guozhang Wang <wangg...@gmail.com>
---
 .../kafka/clients/admin/DescribeTopicsResult.java  |  23 +--
 .../kafka/clients/admin/ListTopicsResult.java      |  14 +-
 examples/README                                    |   8 +-
 examples/bin/exactly-once-demo.sh                  |  23 +++
 .../src/main/java/kafka/examples/Consumer.java     |  32 +++-
 .../examples/ExactlyOnceMessageProcessor.java      | 209 +++++++++++++++++++++
 .../kafka/examples/KafkaConsumerProducerDemo.java  |  14 +-
 .../java/kafka/examples/KafkaExactlyOnceDemo.java  | 185 ++++++++++++++++++
 .../src/main/java/kafka/examples/Producer.java     |  42 ++++-
 9 files changed, 508 insertions(+), 42 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java
index 34698b9..7753984 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java
@@ -51,21 +51,18 @@ public class DescribeTopicsResult {
      */
     public KafkaFuture<Map<String, TopicDescription>> all() {
         return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).
-            thenApply(new KafkaFuture.BaseFunction<Void, Map<String, TopicDescription>>() {
-                @Override
-                public Map<String, TopicDescription> apply(Void v) {
-                    Map<String, TopicDescription> descriptions = new HashMap<>(futures.size());
-                    for (Map.Entry<String, KafkaFuture<TopicDescription>> entry : futures.entrySet()) {
-                        try {
-                            descriptions.put(entry.getKey(), entry.getValue().get());
-                        } catch (InterruptedException | ExecutionException e) {
-                            // This should be unreachable, because allOf ensured that all the futures
-                            // completed successfully.
-                            throw new RuntimeException(e);
-                        }
+            thenApply(v -> {
+                Map<String, TopicDescription> descriptions = new HashMap<>(futures.size());
+                for (Map.Entry<String, KafkaFuture<TopicDescription>> entry : futures.entrySet()) {
+                    try {
+                        descriptions.put(entry.getKey(), entry.getValue().get());
+                    } catch (InterruptedException | ExecutionException e) {
+                        // This should be unreachable, because allOf ensured that all the futures
+                        // completed successfully.
+                        throw new RuntimeException(e);
                     }
-                    return descriptions;
                 }
+                return descriptions;
             });
     }
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResult.java
index 4e7e1a2..2154073 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResult.java
@@ -48,23 +48,13 @@ public class ListTopicsResult {
      * Return a future which yields a collection of TopicListing objects.
      */
     public KafkaFuture<Collection<TopicListing>> listings() {
-        return future.thenApply(new KafkaFuture.BaseFunction<Map<String, TopicListing>, Collection<TopicListing>>() {
-            @Override
-            public Collection<TopicListing> apply(Map<String, TopicListing> namesToDescriptions) {
-                return namesToDescriptions.values();
-            }
-        });
+        return future.thenApply(namesToDescriptions -> namesToDescriptions.values());
     }
 
     /**
      * Return a future which yields a collection of topic names.
      */
     public KafkaFuture<Set<String>> names() {
-        return future.thenApply(new KafkaFuture.BaseFunction<Map<String, TopicListing>, Set<String>>() {
-            @Override
-            public Set<String> apply(Map<String, TopicListing> namesToListings) {
-                return namesToListings.keySet();
-            }
-        });
+        return future.thenApply(namesToListings -> namesToListings.keySet());
     }
 }
diff --git a/examples/README b/examples/README
index f6e3410..2efe71a 100644
--- a/examples/README
+++ b/examples/README
@@ -6,4 +6,10 @@ To run the demo:
    2. For simple consumer demo, `run bin/java-simple-consumer-demo.sh`
   3. For unlimited sync-producer-consumer run, `run bin/java-producer-consumer-demo.sh sync`
   4. For unlimited async-producer-consumer run, `run bin/java-producer-consumer-demo.sh`
-
+   5. For standalone mode exactly once demo run, `run bin/exactly-once-demo.sh standaloneMode 6 3 50000`,
+      which starts 3 EOS instances with 6 topic partitions and 50000 pre-populated records
+   6. For group mode exactly once demo run, `run bin/exactly-once-demo.sh groupMode 6 3 50000`,
+      which is the same as the standalone demo, except that the consumers use subscription mode.
+   7. Some notes for the exactly once demo:
+      7.1. The Kafka broker has to be on version 2.5 or higher to be able to run group mode.
+      7.2. You can also use IntelliJ to run the example directly by configuring the parameters as "Program arguments"
diff --git a/examples/bin/exactly-once-demo.sh b/examples/bin/exactly-once-demo.sh
new file mode 100755
index 0000000..e9faa42
--- /dev/null
+++ b/examples/bin/exactly-once-demo.sh
@@ -0,0 +1,23 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+base_dir=$(dirname $0)/../..
+
+if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
+    export KAFKA_HEAP_OPTS="-Xmx512M"
+fi
+
+exec $base_dir/bin/kafka-run-class.sh kafka.examples.KafkaExactlyOnceDemo $@
diff --git a/examples/src/main/java/kafka/examples/Consumer.java b/examples/src/main/java/kafka/examples/Consumer.java
index 26d6e23..19cb67c 100644
--- a/examples/src/main/java/kafka/examples/Consumer.java
+++ b/examples/src/main/java/kafka/examples/Consumer.java
@@ -25,24 +25,45 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import java.time.Duration;
 import java.util.Collections;
 import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
 
 public class Consumer extends ShutdownableThread {
     private final KafkaConsumer<Integer, String> consumer;
     private final String topic;
+    private final String groupId;
+    private final int numMessageToConsume;
+    private int messageRemaining;
+    private final CountDownLatch latch;
 
-    public Consumer(String topic) {
+    public Consumer(final String topic,
+                    final String groupId,
+                    final boolean readCommitted,
+                    final int numMessageToConsume,
+                    final CountDownLatch latch) {
         super("KafkaConsumerExample", false);
+        this.groupId = groupId;
         Properties props = new Properties();
         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
-        props.put(ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer");
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
         props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
         props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
         props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+        if (readCommitted) {
+            props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
+        }
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 
         consumer = new KafkaConsumer<>(props);
         this.topic = topic;
+        this.numMessageToConsume = numMessageToConsume;
+        this.messageRemaining = numMessageToConsume;
+        this.latch = latch;
+    }
+
+    KafkaConsumer<Integer, String> get() {
+        return consumer;
     }
 
     @Override
@@ -50,7 +71,12 @@ public class Consumer extends ShutdownableThread {
         consumer.subscribe(Collections.singletonList(this.topic));
         ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(1));
         for (ConsumerRecord<Integer, String> record : records) {
-            System.out.println("Received message: (" + record.key() + ", " + 
record.value() + ") at offset " + record.offset());
+            System.out.println(groupId + " received message : from partition " 
+ record.partition() + ", (" + record.key() + ", " + record.value() + ") at 
offset " + record.offset());
+        }
+        messageRemaining -= records.count();
+        if (messageRemaining <= 0) {
+            System.out.println(groupId + " finished reading " + 
numMessageToConsume + " messages");
+            latch.countDown();
         }
     }
 
diff --git a/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java b/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java
new file mode 100644
index 0000000..53685f3
--- /dev/null
+++ b/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.examples;
+
+import org.apache.kafka.clients.consumer.CommitFailedException;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.FencedInstanceIdException;
+import org.apache.kafka.common.errors.ProducerFencedException;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * A demo class showing how to write a customized EOS app. It implements a consume-process-produce loop.
+ * Important configurations and APIs are commented.
+ */
+public class ExactlyOnceMessageProcessor extends Thread {
+
+    private static final boolean READ_COMMITTED = true;
+
+    private final String mode;
+    private final String inputTopic;
+    private final String outputTopic;
+    private final String consumerGroupId;
+    private final int numPartitions;
+    private final int numInstances;
+    private final int instanceIdx;
+    private final String transactionalId;
+
+    private final KafkaProducer<Integer, String> producer;
+    private final KafkaConsumer<Integer, String> consumer;
+
+    private final CountDownLatch latch;
+
+    public ExactlyOnceMessageProcessor(final String mode,
+                                       final String inputTopic,
+                                       final String outputTopic,
+                                       final int numPartitions,
+                                       final int numInstances,
+                                       final int instanceIdx,
+                                       final CountDownLatch latch) {
+        this.mode = mode;
+        this.inputTopic = inputTopic;
+        this.outputTopic = outputTopic;
+        this.consumerGroupId = "Eos-consumer";
+        this.numPartitions = numPartitions;
+        this.numInstances = numInstances;
+        this.instanceIdx = instanceIdx;
+        this.transactionalId = "Processor-" + instanceIdx;
+        // A unique transactional.id must be provided in order to properly use EOS.
+        producer = new Producer(outputTopic, true, transactionalId, true, -1, null).get();
+        // Consumer must be in read_committed mode, which means it won't be able to read uncommitted data.
+        consumer = new Consumer(inputTopic, consumerGroupId, READ_COMMITTED, -1, null).get();
+        this.latch = latch;
+    }
+
+    @Override
+    public void run() {
+        // Init transactions call should always happen first in order to clear zombie transactions from previous generation.
+        producer.initTransactions();
+
+        final AtomicLong messageRemaining = new AtomicLong(Long.MAX_VALUE);
+
+        // Under group mode, topic based subscription is sufficient as EOS apps are safe to cooperate transactionally after 2.5.
+        // Under standalone mode, user needs to manually assign the topic partitions and make sure the assignment is unique
+        // across the consumer group instances.
+        if (this.mode.equals("groupMode")) {
+            consumer.subscribe(Collections.singleton(inputTopic), new ConsumerRebalanceListener() {
+                @Override
+                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+                    printWithTxnId("Revoked partition assignment to kick-off rebalancing: " + partitions);
+                }
+
+                @Override
+                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+                    printWithTxnId("Received partition assignment after rebalancing: " + partitions);
+                    messageRemaining.set(messagesRemaining(consumer));
+                }
+            });
+        } else {
+            // Do a range assignment of topic partitions.
+            List<TopicPartition> topicPartitions = new ArrayList<>();
+            int rangeSize = numPartitions / numInstances;
+            int startPartition = rangeSize * instanceIdx;
+            int endPartition = Math.min(numPartitions - 1, startPartition + rangeSize - 1);
+            for (int partition = startPartition; partition <= endPartition; partition++) {
+                topicPartitions.add(new TopicPartition(inputTopic, partition));
+            }
+
+            consumer.assign(topicPartitions);
+            printWithTxnId("Manually assign partitions: " + topicPartitions);
+        }
+
+        int messageProcessed = 0;
+        boolean abortPreviousTransaction = false;
+        while (messageRemaining.get() > 0) {
+            ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(200));
+            if (records.count() > 0) {
+                try {
+                    // Abort previous transaction if instructed.
+                    if (abortPreviousTransaction) {
+                        producer.abortTransaction();
+                        // The consumer fetch position also needs to be reset.
+                        resetToLastCommittedPositions(consumer);
+                        abortPreviousTransaction = false;
+                    }
+                    // Begin a new transaction session.
+                    producer.beginTransaction();
+                    for (ConsumerRecord<Integer, String> record : records) {
+                        // Process the record and send to downstream.
+                        ProducerRecord<Integer, String> customizedRecord = transform(record);
+                        producer.send(customizedRecord);
+                    }
+                    Map<TopicPartition, OffsetAndMetadata> positions = new HashMap<>();
+                    for (TopicPartition topicPartition : consumer.assignment()) {
+                        positions.put(topicPartition, new OffsetAndMetadata(consumer.position(topicPartition), null));
+                    }
+                    // Checkpoint the progress by sending offsets to group coordinator broker.
+                    // Under group mode, we must apply consumer group metadata for proper fencing.
+                    if (this.mode.equals("groupMode")) {
+                        producer.sendOffsetsToTransaction(positions, consumer.groupMetadata());
+                    } else {
+                        producer.sendOffsetsToTransaction(positions, consumerGroupId);
+                    }
+
+                    // Finish the transaction. All sent records should be visible for consumption now.
+                    producer.commitTransaction();
+                    messageProcessed += records.count();
+                } catch (CommitFailedException e) {
+                    // In case of a retriable exception, suggest aborting the ongoing transaction for correctness.
+                    abortPreviousTransaction = true;
+                } catch (ProducerFencedException | FencedInstanceIdException e) {
+                    throw new KafkaException("Encountered fatal error during processing: " + e.getMessage());
+                }
+            }
+            messageRemaining.set(messagesRemaining(consumer));
+            printWithTxnId("Message remaining: " + messageRemaining);
+        }
+
+        printWithTxnId("Finished processing " + messageProcessed + " records");
+        latch.countDown();
+    }
+
+    private void printWithTxnId(final String message) {
+        System.out.println(transactionalId + ": " + message);
+    }
+
+    private ProducerRecord<Integer, String> transform(final ConsumerRecord<Integer, String> record) {
+        printWithTxnId("Transformed record (" + record.key() + "," + record.value() + ")");
+        return new ProducerRecord<>(outputTopic, record.key() / 2, "Transformed_" + record.value());
+    }
+
+    private long messagesRemaining(final KafkaConsumer<Integer, String> consumer) {
+        final Map<TopicPartition, Long> fullEndOffsets = consumer.endOffsets(new ArrayList<>(consumer.assignment()));
+        // If we couldn't detect any end offset, that means we are still not able to fetch offsets.
+        if (fullEndOffsets.isEmpty()) {
+            return Long.MAX_VALUE;
+        }
+
+        return consumer.assignment().stream().mapToLong(partition -> {
+            long currentPosition = consumer.position(partition);
+            printWithTxnId("Processing partition " + partition + " with full 
offsets " + fullEndOffsets);
+            if (fullEndOffsets.containsKey(partition)) {
+                return fullEndOffsets.get(partition) - currentPosition;
+            }
+            return 0;
+        }).sum();
+    }
+
+    private static void resetToLastCommittedPositions(KafkaConsumer<Integer, String> consumer) {
+        final Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(consumer.assignment());
+        consumer.assignment().forEach(tp -> {
+            OffsetAndMetadata offsetAndMetadata = committed.get(tp);
+            if (offsetAndMetadata != null)
+                consumer.seek(tp, offsetAndMetadata.offset());
+            else
+                consumer.seekToBeginning(Collections.singleton(tp));
+        });
+    }
+}
diff --git a/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java b/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java
index f42ed6f..561732b 100644
--- a/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java
+++ b/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java
@@ -16,14 +16,20 @@
  */
 package kafka.examples;
 
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
 public class KafkaConsumerProducerDemo {
-    public static void main(String[] args) {
+    public static void main(String[] args) throws InterruptedException {
         boolean isAsync = args.length == 0 || !args[0].trim().equalsIgnoreCase("sync");
-        Producer producerThread = new Producer(KafkaProperties.TOPIC, isAsync);
+        CountDownLatch latch = new CountDownLatch(2);
+        Producer producerThread = new Producer(KafkaProperties.TOPIC, isAsync, null, false, 10000, latch);
         producerThread.start();
 
-        Consumer consumerThread = new Consumer(KafkaProperties.TOPIC);
+        Consumer consumerThread = new Consumer(KafkaProperties.TOPIC, "DemoConsumer", false, 10000, latch);
         consumerThread.start();
-
+        latch.await(5, TimeUnit.MINUTES);
+        consumerThread.shutdown();
+        System.out.println("All finished!");
     }
 }
diff --git a/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java b/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java
new file mode 100644
index 0000000..d418eba
--- /dev/null
+++ b/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.examples;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.errors.TopicExistsException;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This exactly once demo driver takes 4 arguments:
+ *   - mode: whether to run as standalone app, or a group
+ *   - partition: number of partitions for input/output topic
+ *   - instances: number of instances
+ *   - records: number of records
+ * An example argument list would be `groupMode 6 3 50000`
+ *
+ * The driver can be decomposed into the following stages:
+ *
+ * 1. Clean up any topic whose name conflicts with the input or output topic, so that we have a clean start.
+ *
+ * 2. Set up a producer in a separate thread to pre-populate a set of records with even-numbered keys into
+ *    the input topic. The driver will block until the record generation finishes, so the producer
+ *    must be in synchronous sending mode.
+ *
+ * 3. Set up transactional instances in separate threads, each running a consume-process-produce loop
+ *    that tails data from the input topic (see {@link ExactlyOnceMessageProcessor}). Each EOS instance will
+ *    drain all the records from either its given partitions or its auto-assigned partitions by actively
+ *    comparing the log end offset with the committed offset. Each record is processed exactly once,
+ *    dividing the key by 2 and extending the value message. The driver will block until all the record
+ *    processing finishes. The transformed records are written to the output topic with
+ *    transactional guarantees.
+ *
+ * 4. Set up a read_committed consumer in a separate thread to verify that all records are in
+ *    the output topic and that message ordering is maintained at the partition level.
+ *    The driver will block until all committed records have been consumed.
+ *
+ * From this demo, you can see that all the pre-populated records are processed exactly once,
+ * in either standalone mode or group mode, with a strong partition-level ordering guarantee.
+ *
+ * Note: please start the Kafka broker and ZooKeeper locally first. The broker version must be >= 2.5
+ * in order to run group mode, otherwise the app could throw
+ * {@link org.apache.kafka.common.errors.UnsupportedVersionException}.
+ */
+public class KafkaExactlyOnceDemo {
+
+    private static final String INPUT_TOPIC = "input-topic";
+    private static final String OUTPUT_TOPIC = "output-topic";
+
+    public static void main(String[] args) throws InterruptedException, ExecutionException {
+        if (args.length != 4) {
+            throw new IllegalArgumentException("Should accept 4 parameters: [mode], " +
+                "[number of partitions], [number of instances], [number of records]");
+        }
+
+        String mode = args[0];
+        int numPartitions = Integer.valueOf(args[1]);
+        int numInstances = Integer.valueOf(args[2]);
+        int numRecords = Integer.valueOf(args[3]);
+
+        /* Stage 1: topic cleanup and recreation */
+        recreateTopics(numPartitions);
+
+        CountDownLatch prePopulateLatch = new CountDownLatch(1);
+
+        /* Stage 2: pre-populate records */
+        Producer producerThread = new Producer(INPUT_TOPIC, false, null, true, numRecords, prePopulateLatch);
+        producerThread.start();
+
+        prePopulateLatch.await(5, TimeUnit.MINUTES);
+
+        CountDownLatch transactionalCopyLatch = new CountDownLatch(numInstances);
+
+        /* Stage 3: transactionally process all messages */
+        for (int instanceIdx = 0; instanceIdx < numInstances; instanceIdx++) {
+            ExactlyOnceMessageProcessor messageProcessor = new ExactlyOnceMessageProcessor(mode,
+                INPUT_TOPIC, OUTPUT_TOPIC, numPartitions,
+                numInstances, instanceIdx, transactionalCopyLatch);
+            messageProcessor.start();
+        }
+
+        transactionalCopyLatch.await(5, TimeUnit.MINUTES);
+
+        CountDownLatch consumeLatch = new CountDownLatch(1);
+
+        /* Stage 4: consume all processed messages to verify exactly once */
+        Consumer consumerThread = new Consumer(OUTPUT_TOPIC, "Verify-consumer", true, numRecords, consumeLatch);
+        consumerThread.start();
+
+        consumeLatch.await(5, TimeUnit.MINUTES);
+        consumerThread.shutdown();
+        System.out.println("All finished!");
+    }
+
+    private static void recreateTopics(final int numPartitions)
+        throws ExecutionException, InterruptedException {
+        Properties props = new Properties();
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
+            KafkaProperties.KAFKA_SERVER_URL + ":" + 
KafkaProperties.KAFKA_SERVER_PORT);
+
+        Admin adminClient = Admin.create(props);
+
+        List<String> topicsToDelete = Arrays.asList(INPUT_TOPIC, OUTPUT_TOPIC);
+
+        deleteTopic(adminClient, topicsToDelete);
+
+        // Check topic existence in a retry loop
+        while (true) {
+            System.out.println("Making sure the topics are deleted 
successfully: " + topicsToDelete);
+
+            Set<String> listedTopics = adminClient.listTopics().names().get();
+            System.out.println("Current list of topics: " + listedTopics);
+
+            boolean hasTopicInfo = false;
+            for (String listedTopic : listedTopics) {
+                if (topicsToDelete.contains(listedTopic)) {
+                    hasTopicInfo = true;
+                    break;
+                }
+            }
+            if (!hasTopicInfo) {
+                break;
+            }
+            Thread.sleep(1000);
+        }
+
+        // Create topics in a retry loop
+        while (true) {
+            final short replicationFactor = 1;
+            final List<NewTopic> newTopics = Arrays.asList(
+                new NewTopic(INPUT_TOPIC, numPartitions, replicationFactor),
+                new NewTopic(OUTPUT_TOPIC, numPartitions, replicationFactor));
+            try {
+                adminClient.createTopics(newTopics).all().get();
+                System.out.println("Created new topics: " + newTopics);
+                break;
+            } catch (ExecutionException e) {
+                if (!(e.getCause() instanceof TopicExistsException)) {
+                    throw e;
+                }
+                System.out.println("Metadata of the old topics are not cleared 
yet...");
+
+                deleteTopic(adminClient, topicsToDelete);
+
+                Thread.sleep(1000);
+            }
+        }
+    }
+
+    private static void deleteTopic(final Admin adminClient, final List<String> topicsToDelete)
+        throws InterruptedException, ExecutionException {
+        try {
+            adminClient.deleteTopics(topicsToDelete).all().get();
+        } catch (ExecutionException e) {
+            if (!(e.getCause() instanceof UnknownTopicOrPartitionException)) {
+                throw e;
+            }
+            System.out.println("Encountered exception during topic deletion: " 
+ e.getCause());
+        }
+        System.out.println("Deleted old topics: " + topicsToDelete);
+    }
+}
diff --git a/examples/src/main/java/kafka/examples/Producer.java b/examples/src/main/java/kafka/examples/Producer.java
index b6998c5..3805dd3 100644
--- a/examples/src/main/java/kafka/examples/Producer.java
+++ b/examples/src/main/java/kafka/examples/Producer.java
@@ -25,45 +25,69 @@ import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 
 import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 
 public class Producer extends Thread {
     private final KafkaProducer<Integer, String> producer;
     private final String topic;
     private final Boolean isAsync;
+    private int numRecords;
+    private final CountDownLatch latch;
 
-    public Producer(String topic, Boolean isAsync) {
+    public Producer(final String topic,
+                    final Boolean isAsync,
+                    final String transactionalId,
+                    final boolean enableIdempotency,
+                    final int numRecords,
+                    final CountDownLatch latch) {
         Properties props = new Properties();
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
         props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        if (transactionalId != null) {
+            props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
+        }
+        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency);
+
         producer = new KafkaProducer<>(props);
         this.topic = topic;
         this.isAsync = isAsync;
+        this.numRecords = numRecords;
+        this.latch = latch;
+    }
+
+    KafkaProducer<Integer, String> get() {
+        return producer;
     }
 
+    @Override
     public void run() {
-        int messageNo = 1;
-        while (true) {
-            String messageStr = "Message_" + messageNo;
+        int messageKey = 0;
+        int recordsSent = 0;
+        while (recordsSent < numRecords) {
+            String messageStr = "Message_" + messageKey;
             long startTime = System.currentTimeMillis();
             if (isAsync) { // Send asynchronously
                 producer.send(new ProducerRecord<>(topic,
-                    messageNo,
-                    messageStr), new DemoCallBack(startTime, messageNo, messageStr));
+                    messageKey,
+                    messageStr), new DemoCallBack(startTime, messageKey, messageStr));
             } else { // Send synchronously
                 try {
                     producer.send(new ProducerRecord<>(topic,
-                        messageNo,
+                        messageKey,
                         messageStr)).get();
-                    System.out.println("Sent message: (" + messageNo + ", " + 
messageStr + ")");
+                    System.out.println("Sent message: (" + messageKey + ", " + 
messageStr + ")");
                 } catch (InterruptedException | ExecutionException e) {
                     e.printStackTrace();
                 }
             }
-            ++messageNo;
+            messageKey += 2;
+            recordsSent += 1;
         }
+        System.out.println("Producer sent " + numRecords + " records 
successfully");
+        latch.countDown();
     }
 }
 
