This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
     new e2f9f08  MINOR: Improve EOS example exception handling (#8052)
e2f9f08 is described below

commit e2f9f08d1c735e89e7de46e5af0ebfbe771de473
Author: Boyang Chen <boy...@confluent.io>
AuthorDate: Thu Feb 20 09:59:09 2020 -0800

    MINOR: Improve EOS example exception handling (#8052)
    
    The current EOS example mixes fatal and non-fatal error handling. This 
patch fixes this problem and simplifies the example.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 examples/README                                    |  12 +-
 .../src/main/java/kafka/examples/Consumer.java     |   3 +
 .../examples/ExactlyOnceMessageProcessor.java      | 121 ++++++++-------------
 .../kafka/examples/KafkaConsumerProducerDemo.java  |   3 +-
 .../java/kafka/examples/KafkaExactlyOnceDemo.java  |  32 +++---
 .../main/java/kafka/examples/KafkaProperties.java  |   5 -
 6 files changed, 75 insertions(+), 101 deletions(-)

diff --git a/examples/README b/examples/README
index 2efe71a..bff6cd3 100644
--- a/examples/README
+++ b/examples/README
@@ -6,10 +6,8 @@ To run the demo:
    2. For simple consumer demo, `run bin/java-simple-consumer-demo.sh`
    3. For unlimited sync-producer-consumer run, `run 
bin/java-producer-consumer-demo.sh sync`
    4. For unlimited async-producer-consumer run, `run 
bin/java-producer-consumer-demo.sh`
-   5. For standalone mode exactly once demo run, `run bin/exactly-once-demo.sh 
standaloneMode 6 3 50000`,
-      this means we are starting 3 EOS instances with 6 topic partitions and 
50000 pre-populated records
-   6. For group mode exactly once demo run, `run bin/exactly-once-demo.sh 
groupMode 6 3 50000`,
-      this means the same as the standalone demo, except consumers are using 
subscription mode.
-   7. Some notes for exactly once demo:
-      7.1. The Kafka server has to be on broker version 2.5 or higher to be 
able to run group mode.
-      7.2. You could also use Intellij to run the example directly by 
configuring parameters as "Program arguments"
+   5. For exactly once demo run, `run bin/exactly-once-demo.sh 6 3 50000`,
+      this means we are starting 3 EOS instances with 6 topic partitions and 
50000 pre-populated records.
+   6. Some notes for exactly once demo:
+      6.1. The Kafka server has to be on broker version 2.5 or higher.
+      6.2. You could also use Intellij to run the example directly by 
configuring parameters as "Program arguments"
diff --git a/examples/src/main/java/kafka/examples/Consumer.java 
b/examples/src/main/java/kafka/examples/Consumer.java
index 19cb67c..d748832 100644
--- a/examples/src/main/java/kafka/examples/Consumer.java
+++ b/examples/src/main/java/kafka/examples/Consumer.java
@@ -24,6 +24,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 
 import java.time.Duration;
 import java.util.Collections;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 
@@ -37,6 +38,7 @@ public class Consumer extends ShutdownableThread {
 
     public Consumer(final String topic,
                     final String groupId,
+                    final Optional<String> instanceId,
                     final boolean readCommitted,
                     final int numMessageToConsume,
                     final CountDownLatch latch) {
@@ -45,6 +47,7 @@ public class Consumer extends ShutdownableThread {
         Properties props = new Properties();
         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
         props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+        instanceId.ifPresent(id -> 
props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, id));
         props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
         props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
         props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
diff --git 
a/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java 
b/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java
index 482e442..8f31b19 100644
--- a/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java
+++ b/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java
@@ -16,7 +16,6 @@
  */
 package kafka.examples;
 
-import org.apache.kafka.clients.consumer.CommitFailedException;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -34,8 +33,8 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -47,42 +46,32 @@ public class ExactlyOnceMessageProcessor extends Thread {
 
     private static final boolean READ_COMMITTED = true;
 
-    private final String mode;
     private final String inputTopic;
     private final String outputTopic;
-    private final String consumerGroupId;
-    private final int numPartitions;
-    private final int numInstances;
-    private final int instanceIdx;
     private final String transactionalId;
+    private final String groupInstanceId;
 
     private final KafkaProducer<Integer, String> producer;
     private final KafkaConsumer<Integer, String> consumer;
 
     private final CountDownLatch latch;
 
-    public ExactlyOnceMessageProcessor(final String mode,
-                                       final String inputTopic,
+    public ExactlyOnceMessageProcessor(final String inputTopic,
                                        final String outputTopic,
-                                       final int numPartitions,
-                                       final int numInstances,
                                        final int instanceIdx,
                                        final CountDownLatch latch) {
-        this.mode = mode;
         this.inputTopic = inputTopic;
         this.outputTopic = outputTopic;
-        this.consumerGroupId = "Eos-consumer";
-        this.numPartitions = numPartitions;
-        this.numInstances = numInstances;
-        this.instanceIdx = instanceIdx;
         this.transactionalId = "Processor-" + instanceIdx;
-        // If we are using the group mode, it is recommended to have a 
relatively short txn timeout
-        // in order to clear pending offsets faster.
-        final int transactionTimeoutMs = this.mode.equals("groupMode") ? 10000 
: -1;
+        // It is recommended to have a relatively short txn timeout in order 
to clear pending offsets faster.
+        final int transactionTimeoutMs = 10000;
         // A unique transactional.id must be provided in order to properly use 
EOS.
         producer = new Producer(outputTopic, true, transactionalId, true, -1, 
transactionTimeoutMs, null).get();
         // Consumer must be in read_committed mode, which means it won't be 
able to read uncommitted data.
-        consumer = new Consumer(inputTopic, consumerGroupId, READ_COMMITTED, 
-1, null).get();
+        // Consumer could optionally configure groupInstanceId to avoid 
unnecessary rebalances.
+        this.groupInstanceId = "Txn-consumer-" + instanceIdx;
+        consumer = new Consumer(inputTopic, "Eos-consumer",
+            Optional.of(groupInstanceId), READ_COMMITTED, -1, null).get();
         this.latch = latch;
     }
 
@@ -93,49 +82,24 @@ public class ExactlyOnceMessageProcessor extends Thread {
 
         final AtomicLong messageRemaining = new AtomicLong(Long.MAX_VALUE);
 
-        // Under group mode, topic based subscription is sufficient as EOS 
apps are safe to cooperate transactionally after 2.5.
-        // Under standalone mode, user needs to manually assign the topic 
partitions and make sure the assignment is unique
-        // across the consumer group instances.
-        if (this.mode.equals("groupMode")) {
-            consumer.subscribe(Collections.singleton(inputTopic), new 
ConsumerRebalanceListener() {
-                @Override
-                public void onPartitionsRevoked(Collection<TopicPartition> 
partitions) {
-                    printWithTxnId("Revoked partition assignment to kick-off 
rebalancing: " + partitions);
-                }
-
-                @Override
-                public void onPartitionsAssigned(Collection<TopicPartition> 
partitions) {
-                    printWithTxnId("Received partition assignment after 
rebalancing: " + partitions);
-                    messageRemaining.set(messagesRemaining(consumer));
-                }
-            });
-        } else {
-            // Do a range assignment of topic partitions.
-            List<TopicPartition> topicPartitions = new ArrayList<>();
-            int rangeSize = numPartitions / numInstances;
-            int startPartition = rangeSize * instanceIdx;
-            int endPartition = Math.min(numPartitions - 1, startPartition + 
rangeSize - 1);
-            for (int partition = startPartition; partition <= endPartition; 
partition++) {
-                topicPartitions.add(new TopicPartition(inputTopic, partition));
+        consumer.subscribe(Collections.singleton(inputTopic), new 
ConsumerRebalanceListener() {
+            @Override
+            public void onPartitionsRevoked(Collection<TopicPartition> 
partitions) {
+                printWithTxnId("Revoked partition assignment to kick-off 
rebalancing: " + partitions);
             }
 
-            consumer.assign(topicPartitions);
-            printWithTxnId("Manually assign partitions: " + topicPartitions);
-        }
+            @Override
+            public void onPartitionsAssigned(Collection<TopicPartition> 
partitions) {
+                printWithTxnId("Received partition assignment after 
rebalancing: " + partitions);
+                messageRemaining.set(messagesRemaining(consumer));
+            }
+        });
 
         int messageProcessed = 0;
-        boolean abortPreviousTransaction = false;
         while (messageRemaining.get() > 0) {
-            ConsumerRecords<Integer, String> records = 
consumer.poll(Duration.ofMillis(200));
-            if (records.count() > 0) {
-                try {
-                    // Abort previous transaction if instructed.
-                    if (abortPreviousTransaction) {
-                        producer.abortTransaction();
-                        // The consumer fetch position also needs to be reset.
-                        resetToLastCommittedPositions(consumer);
-                        abortPreviousTransaction = false;
-                    }
+            try {
+                ConsumerRecords<Integer, String> records = 
consumer.poll(Duration.ofMillis(200));
+                if (records.count() > 0) {
                     // Begin a new transaction session.
                     producer.beginTransaction();
                     for (ConsumerRecord<Integer, String> record : records) {
@@ -143,28 +107,31 @@ public class ExactlyOnceMessageProcessor extends Thread {
                         ProducerRecord<Integer, String> customizedRecord = 
transform(record);
                         producer.send(customizedRecord);
                     }
-                    Map<TopicPartition, OffsetAndMetadata> positions = new 
HashMap<>();
-                    for (TopicPartition topicPartition : 
consumer.assignment()) {
-                        positions.put(topicPartition, new 
OffsetAndMetadata(consumer.position(topicPartition), null));
-                    }
+
+                    Map<TopicPartition, OffsetAndMetadata> offsets = 
consumerOffsets();
+
                     // Checkpoint the progress by sending offsets to group 
coordinator broker.
-                    // Under group mode, we must apply consumer group metadata 
for proper fencing.
-                    if (this.mode.equals("groupMode")) {
-                        producer.sendOffsetsToTransaction(positions, 
consumer.groupMetadata());
-                    } else {
-                        producer.sendOffsetsToTransaction(positions, 
consumerGroupId);
-                    }
+                    // Note that this API is only available for broker >= 2.5.
+                    producer.sendOffsetsToTransaction(offsets, 
consumer.groupMetadata());
 
                     // Finish the transaction. All sent records should be 
visible for consumption now.
                     producer.commitTransaction();
                     messageProcessed += records.count();
-                } catch (CommitFailedException e) {
-                    // In case of a retriable exception, suggest aborting the 
ongoing transaction for correctness.
-                    abortPreviousTransaction = true;
-                } catch (ProducerFencedException | FencedInstanceIdException 
e) {
-                    throw new KafkaException("Encountered fatal error during 
processing: " + e.getMessage());
                 }
+            } catch (ProducerFencedException e) {
+                throw new KafkaException(String.format("The transactional.id 
%s has been claimed by another process", transactionalId));
+            } catch (FencedInstanceIdException e) {
+                throw new KafkaException(String.format("The group.instance.id 
%s has been claimed by another process", groupInstanceId));
+            } catch (KafkaException e) {
+                // If we have not been fenced, try to abort the transaction 
and continue. This will raise immediately
+                // if the producer has hit a fatal error.
+                producer.abortTransaction();
+
+                // The consumer fetch position needs to be restored to the 
committed offset
+                // before the transaction started.
+                resetToLastCommittedPositions(consumer);
             }
+
             messageRemaining.set(messagesRemaining(consumer));
             printWithTxnId("Message remaining: " + messageRemaining);
         }
@@ -173,6 +140,14 @@ public class ExactlyOnceMessageProcessor extends Thread {
         latch.countDown();
     }
 
+    private Map<TopicPartition, OffsetAndMetadata> consumerOffsets() {
+        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+        for (TopicPartition topicPartition : consumer.assignment()) {
+            offsets.put(topicPartition, new 
OffsetAndMetadata(consumer.position(topicPartition), null));
+        }
+        return offsets;
+    }
+
     private void printWithTxnId(final String message) {
         System.out.println(transactionalId + ": " + message);
     }
diff --git 
a/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java 
b/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java
index 8a29402..9fc911a 100644
--- a/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java
+++ b/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java
@@ -18,6 +18,7 @@ package kafka.examples;
 
 import org.apache.kafka.common.errors.TimeoutException;
 
+import java.util.Optional;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -28,7 +29,7 @@ public class KafkaConsumerProducerDemo {
         Producer producerThread = new Producer(KafkaProperties.TOPIC, isAsync, 
null, false, 10000, -1, latch);
         producerThread.start();
 
-        Consumer consumerThread = new Consumer(KafkaProperties.TOPIC, 
"DemoConsumer", false, 10000, latch);
+        Consumer consumerThread = new Consumer(KafkaProperties.TOPIC, 
"DemoConsumer", Optional.empty(), false, 10000, latch);
         consumerThread.start();
 
         if (!latch.await(5, TimeUnit.MINUTES)) {
diff --git a/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java 
b/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java
index 6da159c..50a1ad1 100644
--- a/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java
+++ b/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java
@@ -25,6 +25,7 @@ import 
org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 
 import java.util.Arrays;
 import java.util.List;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
@@ -32,12 +33,15 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 /**
- * This exactly once demo driver takes 4 arguments:
- *   - mode: whether to run as standalone app, or a group
+ * This exactly once demo driver takes 3 arguments:
  *   - partition: number of partitions for input/output topic
  *   - instances: number of instances
  *   - records: number of records
- * An example argument list would be `groupMode 6 3 50000`
+ * An example argument list would be `6 3 50000`.
+ *
+ * If you are using Intellij, the above arguments should be put in the 
configuration's `Program Arguments`.
+ * Also recommended to set an output log file by `Edit Configuration -> Logs 
-> Save console
+ * output to file` to record all the log output together.
  *
  * The driver could be decomposed as following stages:
  *
@@ -60,10 +64,10 @@ import java.util.concurrent.TimeUnit;
  *    The driver will block for the consumption of all committed records.
  *
  * From this demo, you could see that all the records from pre-population are 
processed exactly once,
- * in either standalone mode or group mode, with strong partition level 
ordering guarantee.
+ * with strong partition level ordering guarantee.
  *
  * Note: please start the kafka broker and zookeeper in local first. The 
broker version must be >= 2.5
- * in order to run group mode, otherwise the app could throw
+ * in order to run, otherwise the app could throw
  * {@link org.apache.kafka.common.errors.UnsupportedVersionException}.
  */
 public class KafkaExactlyOnceDemo {
@@ -72,15 +76,14 @@ public class KafkaExactlyOnceDemo {
     private static final String OUTPUT_TOPIC = "output-topic";
 
     public static void main(String[] args) throws InterruptedException, 
ExecutionException {
-        if (args.length != 4) {
-            throw new IllegalArgumentException("Should accept 4 parameters: 
[mode], " +
+        if (args.length != 3) {
+            throw new IllegalArgumentException("Should accept 3 parameters: " +
                 "[number of partitions], [number of instances], [number of 
records]");
         }
 
-        String mode = args[0];
-        int numPartitions = Integer.parseInt(args[1]);
-        int numInstances = Integer.parseInt(args[2]);
-        int numRecords = Integer.parseInt(args[3]);
+        int numPartitions = Integer.parseInt(args[0]);
+        int numInstances = Integer.parseInt(args[1]);
+        int numRecords = Integer.parseInt(args[2]);
 
         /* Stage 1: topic cleanup and recreation */
         recreateTopics(numPartitions);
@@ -99,9 +102,8 @@ public class KafkaExactlyOnceDemo {
 
         /* Stage 3: transactionally process all messages */
         for (int instanceIdx = 0; instanceIdx < numInstances; instanceIdx++) {
-            ExactlyOnceMessageProcessor messageProcessor = new 
ExactlyOnceMessageProcessor(mode,
-                INPUT_TOPIC, OUTPUT_TOPIC, numPartitions,
-                numInstances, instanceIdx, transactionalCopyLatch);
+            ExactlyOnceMessageProcessor messageProcessor = new 
ExactlyOnceMessageProcessor(
+                INPUT_TOPIC, OUTPUT_TOPIC, instanceIdx, 
transactionalCopyLatch);
             messageProcessor.start();
         }
 
@@ -112,7 +114,7 @@ public class KafkaExactlyOnceDemo {
         CountDownLatch consumeLatch = new CountDownLatch(1);
 
         /* Stage 4: consume all processed messages to verify exactly once */
-        Consumer consumerThread = new Consumer(OUTPUT_TOPIC, 
"Verify-consumer", true, numRecords, consumeLatch);
+        Consumer consumerThread = new Consumer(OUTPUT_TOPIC, 
"Verify-consumer", Optional.empty(), true, numRecords, consumeLatch);
         consumerThread.start();
 
         if (!consumeLatch.await(5, TimeUnit.MINUTES)) {
diff --git a/examples/src/main/java/kafka/examples/KafkaProperties.java 
b/examples/src/main/java/kafka/examples/KafkaProperties.java
index cd737cf..e73c8d7 100644
--- a/examples/src/main/java/kafka/examples/KafkaProperties.java
+++ b/examples/src/main/java/kafka/examples/KafkaProperties.java
@@ -20,11 +20,6 @@ public class KafkaProperties {
     public static final String TOPIC = "topic1";
     public static final String KAFKA_SERVER_URL = "localhost";
     public static final int KAFKA_SERVER_PORT = 9092;
-    public static final int KAFKA_PRODUCER_BUFFER_SIZE = 64 * 1024;
-    public static final int CONNECTION_TIMEOUT = 100000;
-    public static final String TOPIC2 = "topic2";
-    public static final String TOPIC3 = "topic3";
-    public static final String CLIENT_ID = "SimpleConsumerDemoClient";
 
     private KafkaProperties() {}
 }

Reply via email to