gwenshap commented on a change in pull request #9815:
URL: https://github.com/apache/kafka/pull/9815#discussion_r553705779



##########
File path: examples/src/main/java/kafka/examples/Consumer.java
##########
@@ -26,33 +25,34 @@
 import java.util.Collections;
 import java.util.Optional;
 import java.util.Properties;
-import java.util.concurrent.CountDownLatch;
 
-public class Consumer extends ShutdownableThread {
+public class Consumer implements Runnable {
     private final KafkaConsumer<Integer, String> consumer;
     private final String topic;
     private final String groupId;
     private final int numMessageToConsume;
     private int messageRemaining;
-    private final CountDownLatch latch;
 
     public Consumer(final String topic,
                     final String groupId,
                     final Optional<String> instanceId,
                     final boolean readCommitted,
                     final int numMessageToConsume,
-                    final CountDownLatch latch) {
-        super("KafkaConsumerExample", false);
+                    final boolean transactional) {
         this.groupId = groupId;
         Properties props = new Properties();
         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
         props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
         instanceId.ifPresent(id -> 
props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, id));
-        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
-        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
         props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.IntegerDeserializer");
         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringDeserializer");
+        // if consuming as part of exactly-once processor, committing will be 
done by the producer

Review comment:
       I am having trouble finding the code that prevents EOS consumer from 
auto-committing. Pointers will help :)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to