This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch 2.5 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.5 by this push: new e2f9f08 MINOR: Improve EOS example exception handling (#8052) e2f9f08 is described below commit e2f9f08d1c735e89e7de46e5af0ebfbe771de473 Author: Boyang Chen <boy...@confluent.io> AuthorDate: Thu Feb 20 09:59:09 2020 -0800 MINOR: Improve EOS example exception handling (#8052) The current EOS example mixes fatal and non-fatal error handling. This patch fixes this problem and simplifies the example. Reviewers: Jason Gustafson <ja...@confluent.io> --- examples/README | 12 +- .../src/main/java/kafka/examples/Consumer.java | 3 + .../examples/ExactlyOnceMessageProcessor.java | 121 ++++++++------------- .../kafka/examples/KafkaConsumerProducerDemo.java | 3 +- .../java/kafka/examples/KafkaExactlyOnceDemo.java | 32 +++--- .../main/java/kafka/examples/KafkaProperties.java | 5 - 6 files changed, 75 insertions(+), 101 deletions(-) diff --git a/examples/README b/examples/README index 2efe71a..bff6cd3 100644 --- a/examples/README +++ b/examples/README @@ -6,10 +6,8 @@ To run the demo: 2. For simple consumer demo, `run bin/java-simple-consumer-demo.sh` 3. For unlimited sync-producer-consumer run, `run bin/java-producer-consumer-demo.sh sync` 4. For unlimited async-producer-consumer run, `run bin/java-producer-consumer-demo.sh` - 5. For standalone mode exactly once demo run, `run bin/exactly-once-demo.sh standaloneMode 6 3 50000`, - this means we are starting 3 EOS instances with 6 topic partitions and 50000 pre-populated records - 6. For group mode exactly once demo run, `run bin/exactly-once-demo.sh groupMode 6 3 50000`, - this means the same as the standalone demo, except consumers are using subscription mode. - 7. Some notes for exactly once demo: - 7.1. The Kafka server has to be on broker version 2.5 or higher to be able to run group mode. - 7.2. You could also use Intellij to run the example directly by configuring parameters as "Program arguments" + 5. For exactly once demo run, `run bin/exactly-once-demo.sh 6 3 50000`, + this means we are starting 3 EOS instances with 6 topic partitions and 50000 pre-populated records. + 6. Some notes for exactly once demo: + 6.1. The Kafka server has to be on broker version 2.5 or higher. + 6.2. You could also use Intellij to run the example directly by configuring parameters as "Program arguments" diff --git a/examples/src/main/java/kafka/examples/Consumer.java b/examples/src/main/java/kafka/examples/Consumer.java index 19cb67c..d748832 100644 --- a/examples/src/main/java/kafka/examples/Consumer.java +++ b/examples/src/main/java/kafka/examples/Consumer.java @@ -24,6 +24,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer; import java.time.Duration; import java.util.Collections; +import java.util.Optional; import java.util.Properties; import java.util.concurrent.CountDownLatch; @@ -37,6 +38,7 @@ public class Consumer extends ShutdownableThread { public Consumer(final String topic, final String groupId, + final Optional<String> instanceId, final boolean readCommitted, final int numMessageToConsume, final CountDownLatch latch) { @@ -45,6 +47,7 @@ public class Consumer extends ShutdownableThread { Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + instanceId.ifPresent(id -> props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, id)); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000"); diff --git a/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java b/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java index 482e442..8f31b19 100644 --- a/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java +++ b/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java @@ -16,7 +16,6 @@ */ package kafka.examples; -import org.apache.kafka.clients.consumer.CommitFailedException; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -34,8 +33,8 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicLong; @@ -47,42 +46,32 @@ public class ExactlyOnceMessageProcessor extends Thread { private static final boolean READ_COMMITTED = true; - private final String mode; private final String inputTopic; private final String outputTopic; - private final String consumerGroupId; - private final int numPartitions; - private final int numInstances; - private final int instanceIdx; private final String transactionalId; + private final String groupInstanceId; private final KafkaProducer<Integer, String> producer; private final KafkaConsumer<Integer, String> consumer; private final CountDownLatch latch; - public ExactlyOnceMessageProcessor(final String mode, - final String inputTopic, + public ExactlyOnceMessageProcessor(final String inputTopic, final String outputTopic, - final int numPartitions, - final int numInstances, final int instanceIdx, final CountDownLatch latch) { - this.mode = mode; this.inputTopic = inputTopic; this.outputTopic = outputTopic; - this.consumerGroupId = "Eos-consumer"; - this.numPartitions = numPartitions; - this.numInstances = numInstances; - this.instanceIdx = instanceIdx; this.transactionalId = "Processor-" + instanceIdx; - // If we are using the group mode, it is recommended to have a relatively short txn timeout - // in order to clear pending offsets faster. - final int transactionTimeoutMs = this.mode.equals("groupMode") ? 10000 : -1; + // It is recommended to have a relatively short txn timeout in order to clear pending offsets faster. + final int transactionTimeoutMs = 10000; // A unique transactional.id must be provided in order to properly use EOS. producer = new Producer(outputTopic, true, transactionalId, true, -1, transactionTimeoutMs, null).get(); // Consumer must be in read_committed mode, which means it won't be able to read uncommitted data. - consumer = new Consumer(inputTopic, consumerGroupId, READ_COMMITTED, -1, null).get(); + // Consumer could optionally configure groupInstanceId to avoid unnecessary rebalances. + this.groupInstanceId = "Txn-consumer-" + instanceIdx; + consumer = new Consumer(inputTopic, "Eos-consumer", + Optional.of(groupInstanceId), READ_COMMITTED, -1, null).get(); this.latch = latch; } @@ -93,49 +82,24 @@ public class ExactlyOnceMessageProcessor extends Thread { final AtomicLong messageRemaining = new AtomicLong(Long.MAX_VALUE); - // Under group mode, topic based subscription is sufficient as EOS apps are safe to cooperate transactionally after 2.5. - // Under standalone mode, user needs to manually assign the topic partitions and make sure the assignment is unique - // across the consumer group instances. - if (this.mode.equals("groupMode")) { - consumer.subscribe(Collections.singleton(inputTopic), new ConsumerRebalanceListener() { - @Override - public void onPartitionsRevoked(Collection<TopicPartition> partitions) { - printWithTxnId("Revoked partition assignment to kick-off rebalancing: " + partitions); - } - - @Override - public void onPartitionsAssigned(Collection<TopicPartition> partitions) { - printWithTxnId("Received partition assignment after rebalancing: " + partitions); - messageRemaining.set(messagesRemaining(consumer)); - } - }); - } else { - // Do a range assignment of topic partitions. - List<TopicPartition> topicPartitions = new ArrayList<>(); - int rangeSize = numPartitions / numInstances; - int startPartition = rangeSize * instanceIdx; - int endPartition = Math.min(numPartitions - 1, startPartition + rangeSize - 1); - for (int partition = startPartition; partition <= endPartition; partition++) { - topicPartitions.add(new TopicPartition(inputTopic, partition)); + consumer.subscribe(Collections.singleton(inputTopic), new ConsumerRebalanceListener() { + @Override + public void onPartitionsRevoked(Collection<TopicPartition> partitions) { + printWithTxnId("Revoked partition assignment to kick-off rebalancing: " + partitions); } - consumer.assign(topicPartitions); - printWithTxnId("Manually assign partitions: " + topicPartitions); - } + @Override + public void onPartitionsAssigned(Collection<TopicPartition> partitions) { + printWithTxnId("Received partition assignment after rebalancing: " + partitions); + messageRemaining.set(messagesRemaining(consumer)); + } + }); int messageProcessed = 0; - boolean abortPreviousTransaction = false; while (messageRemaining.get() > 0) { - ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(200)); - if (records.count() > 0) { - try { - // Abort previous transaction if instructed. - if (abortPreviousTransaction) { - producer.abortTransaction(); - // The consumer fetch position also needs to be reset. - resetToLastCommittedPositions(consumer); - abortPreviousTransaction = false; - } + try { + ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(200)); + if (records.count() > 0) { // Begin a new transaction session. producer.beginTransaction(); for (ConsumerRecord<Integer, String> record : records) { @@ -143,28 +107,31 @@ public class ExactlyOnceMessageProcessor extends Thread { ProducerRecord<Integer, String> customizedRecord = transform(record); producer.send(customizedRecord); } - Map<TopicPartition, OffsetAndMetadata> positions = new HashMap<>(); - for (TopicPartition topicPartition : consumer.assignment()) { - positions.put(topicPartition, new OffsetAndMetadata(consumer.position(topicPartition), null)); - } + + Map<TopicPartition, OffsetAndMetadata> offsets = consumerOffsets(); + // Checkpoint the progress by sending offsets to group coordinator broker. - // Under group mode, we must apply consumer group metadata for proper fencing. - if (this.mode.equals("groupMode")) { - producer.sendOffsetsToTransaction(positions, consumer.groupMetadata()); - } else { - producer.sendOffsetsToTransaction(positions, consumerGroupId); - } + // Note that this API is only available for broker >= 2.5. + producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata()); // Finish the transaction. All sent records should be visible for consumption now. producer.commitTransaction(); messageProcessed += records.count(); - } catch (CommitFailedException e) { - // In case of a retriable exception, suggest aborting the ongoing transaction for correctness. - abortPreviousTransaction = true; - } catch (ProducerFencedException | FencedInstanceIdException e) { - throw new KafkaException("Encountered fatal error during processing: " + e.getMessage()); } + } catch (ProducerFencedException e) { + throw new KafkaException(String.format("The transactional.id %s has been claimed by another process", transactionalId)); + } catch (FencedInstanceIdException e) { + throw new KafkaException(String.format("The group.instance.id %s has been claimed by another process", groupInstanceId)); + } catch (KafkaException e) { + // If we have not been fenced, try to abort the transaction and continue. This will raise immediately + // if the producer has hit a fatal error. + producer.abortTransaction(); + + // The consumer fetch position needs to be restored to the committed offset + // before the transaction started. + resetToLastCommittedPositions(consumer); } + messageRemaining.set(messagesRemaining(consumer)); printWithTxnId("Message remaining: " + messageRemaining); } @@ -173,6 +140,14 @@ public class ExactlyOnceMessageProcessor extends Thread { latch.countDown(); } + private Map<TopicPartition, OffsetAndMetadata> consumerOffsets() { + Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(); + for (TopicPartition topicPartition : consumer.assignment()) { + offsets.put(topicPartition, new OffsetAndMetadata(consumer.position(topicPartition), null)); + } + return offsets; + } + private void printWithTxnId(final String message) { System.out.println(transactionalId + ": " + message); } diff --git a/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java b/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java index 8a29402..9fc911a 100644 --- a/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java +++ b/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java @@ -18,6 +18,7 @@ package kafka.examples; import org.apache.kafka.common.errors.TimeoutException; +import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -28,7 +29,7 @@ public class KafkaConsumerProducerDemo { Producer producerThread = new Producer(KafkaProperties.TOPIC, isAsync, null, false, 10000, -1, latch); producerThread.start(); - Consumer consumerThread = new Consumer(KafkaProperties.TOPIC, "DemoConsumer", false, 10000, latch); + Consumer consumerThread = new Consumer(KafkaProperties.TOPIC, "DemoConsumer", Optional.empty(), false, 10000, latch); consumerThread.start(); if (!latch.await(5, TimeUnit.MINUTES)) { diff --git a/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java b/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java index 6da159c..50a1ad1 100644 --- a/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java +++ b/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java @@ -25,6 +25,7 @@ import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import java.util.Arrays; import java.util.List; +import java.util.Optional; import java.util.Properties; import java.util.Set; import java.util.concurrent.CountDownLatch; @@ -32,12 +33,15 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; /** - * This exactly once demo driver takes 4 arguments: - * - mode: whether to run as standalone app, or a group + * This exactly once demo driver takes 3 arguments: * - partition: number of partitions for input/output topic * - instances: number of instances * - records: number of records - * An example argument list would be `groupMode 6 3 50000` + * An example argument list would be `6 3 50000`. + * + * If you are using Intellij, the above arguments should be put in the configuration's `Program Arguments`. + * Also recommended to set an output log file by `Edit Configuration -> Logs -> Save console + * output to file` to record all the log output together. * * The driver could be decomposed as following stages: * @@ -60,10 +64,10 @@ import java.util.concurrent.TimeUnit; * The driver will block for the consumption of all committed records. * * From this demo, you could see that all the records from pre-population are processed exactly once, - * in either standalone mode or group mode, with strong partition level ordering guarantee. + * with strong partition level ordering guarantee. * * Note: please start the kafka broker and zookeeper in local first. The broker version must be >= 2.5 - * in order to run group mode, otherwise the app could throw + * in order to run, otherwise the app could throw * {@link org.apache.kafka.common.errors.UnsupportedVersionException}. */ public class KafkaExactlyOnceDemo { @@ -72,15 +76,14 @@ public class KafkaExactlyOnceDemo { private static final String OUTPUT_TOPIC = "output-topic"; public static void main(String[] args) throws InterruptedException, ExecutionException { - if (args.length != 4) { - throw new IllegalArgumentException("Should accept 4 parameters: [mode], " + + if (args.length != 3) { + throw new IllegalArgumentException("Should accept 3 parameters: " + "[number of partitions], [number of instances], [number of records]"); } - String mode = args[0]; - int numPartitions = Integer.parseInt(args[1]); - int numInstances = Integer.parseInt(args[2]); - int numRecords = Integer.parseInt(args[3]); + int numPartitions = Integer.parseInt(args[0]); + int numInstances = Integer.parseInt(args[1]); + int numRecords = Integer.parseInt(args[2]); /* Stage 1: topic cleanup and recreation */ recreateTopics(numPartitions); @@ -99,9 +102,8 @@ public class KafkaExactlyOnceDemo { /* Stage 3: transactionally process all messages */ for (int instanceIdx = 0; instanceIdx < numInstances; instanceIdx++) { - ExactlyOnceMessageProcessor messageProcessor = new ExactlyOnceMessageProcessor(mode, - INPUT_TOPIC, OUTPUT_TOPIC, numPartitions, - numInstances, instanceIdx, transactionalCopyLatch); + ExactlyOnceMessageProcessor messageProcessor = new ExactlyOnceMessageProcessor( + INPUT_TOPIC, OUTPUT_TOPIC, instanceIdx, transactionalCopyLatch); messageProcessor.start(); } @@ -112,7 +114,7 @@ public class KafkaExactlyOnceDemo { CountDownLatch consumeLatch = new CountDownLatch(1); /* Stage 4: consume all processed messages to verify exactly once */ - Consumer consumerThread = new Consumer(OUTPUT_TOPIC, "Verify-consumer", true, numRecords, consumeLatch); + Consumer consumerThread = new Consumer(OUTPUT_TOPIC, "Verify-consumer", Optional.empty(), true, numRecords, consumeLatch); consumerThread.start(); if (!consumeLatch.await(5, TimeUnit.MINUTES)) { diff --git a/examples/src/main/java/kafka/examples/KafkaProperties.java b/examples/src/main/java/kafka/examples/KafkaProperties.java index cd737cf..e73c8d7 100644 --- a/examples/src/main/java/kafka/examples/KafkaProperties.java +++ b/examples/src/main/java/kafka/examples/KafkaProperties.java @@ -20,11 +20,6 @@ public class KafkaProperties { public static final String TOPIC = "topic1"; public static final String KAFKA_SERVER_URL = "localhost"; public static final int KAFKA_SERVER_PORT = 9092; - public static final int KAFKA_PRODUCER_BUFFER_SIZE = 64 * 1024; - public static final int CONNECTION_TIMEOUT = 100000; - public static final String TOPIC2 = "topic2"; - public static final String TOPIC3 = "topic3"; - public static final String CLIENT_ID = "SimpleConsumerDemoClient"; private KafkaProperties() {} }