[ https://issues.apache.org/jira/browse/KAFKA-10576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Yuriy Badalyantc updated KAFKA-10576:
-------------------------------------
    Description:
It looks like the consumer's {{commitSync}} and {{commitAsync}} methods have different semantics.
{code:java}
public class TestKafka {
    public static void main(String[] args) {
        String id = "dev_test";
        Map<String, Object> settings = new HashMap<>();
        settings.put("bootstrap.servers", "localhost:9094");
        settings.put("key.deserializer", StringDeserializer.class);
        settings.put("value.deserializer", StringDeserializer.class);
        settings.put("client.id", id);
        settings.put("group.id", id);

        String topic = "test";
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        offsets.put(new TopicPartition(topic, 0), new OffsetAndMetadata(1));

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(settings)) {
            consumer.commitSync(offsets);
        }
    }
}
{code}
In the example above I created a consumer and used {{commitSync}} to commit offsets. This code works as expected: all offsets are committed to Kafka.

But with {{commitAsync}} it does not work:
{code:java}
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(settings)) {
    CompletableFuture<Boolean> result = new CompletableFuture<>();
    consumer.commitAsync(offsets, new OffsetCommitCallback() {
        @Override
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
            if (exception != null) {
                result.completeExceptionally(exception);
            } else {
                result.complete(true);
            }
        }
    });
    result.get(15L, TimeUnit.SECONDS);
}
{code}
The {{result}} future fails with a timeout.

This behavior is pretty surprising. From the naming and documentation, it looks like the {{commitSync}} and {{commitAsync}} methods should behave identically, apart from the blocking/non-blocking aspect. But in reality, there are some differences.

I assume that the {{commitAsync}} method somehow depends on {{poll}} calls.
But I didn't find any explicit information about this in {{KafkaConsumer}}'s javadoc or the Kafka documentation page.

So I believe there are the following options:
# It's a bug and not expected behavior. {{commitSync}} and {{commitAsync}} should have identical semantics.
# It's expected but not well-documented behavior. In that case, this behavior should be explicitly documented.

  was:
It looks like the consumer's {{commitSync}} and {{commitAsync}} methods have different semantics.
{code:java}
public class TestKafka {
    public static void main(String[] args) {
        String id = "dev_test";
        Map<String, Object> settings = new HashMap<>();
        settings.put("bootstrap.servers", "localhost:9094");
        settings.put("key.deserializer", StringDeserializer.class);
        settings.put("value.deserializer", StringDeserializer.class);
        settings.put("client.id", id);
        settings.put("group.id", id);

        String topic = "test";
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        offsets.put(new TopicPartition(topic, 0), new OffsetAndMetadata(1));

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(settings)) {
            consumer.commitSync(offsets);
        }
    }
}
{code}
In the example above I created a consumer and used {{commitSync}} to commit offsets. This code works as expected: all offsets are committed to Kafka.

But with {{commitAsync}} it does not work:
{code:java}
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(settings)) {
    CompletableFuture<Boolean> result = new CompletableFuture<>();
    consumer.commitAsync(offsets, new OffsetCommitCallback() {
        @Override
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
            if (exception != null) {
                result.completeExceptionally(exception);
            } else {
                result.complete(true);
            }
        }
    });
    result.get(15L, TimeUnit.SECONDS);
}
{code}
The {{result}} future fails with a timeout.

This behavior is pretty surprising.
From the naming and documentation, it looks like the {{commitSync}} and {{commitAsync}} methods should behave identically, apart from the blocking/non-blocking aspect. But in reality, there are some differences.

I assume that the {{commitAsync}} method somehow depends on {{poll}} calls. But I didn't find any explicit information about this in {{KafkaConsumer}}'s javadoc or the Kafka documentation page.

So I believe there are the following options:
# It's a but and not expected behavior. {{commitSync}} and {{commitAsync}} should have identical semantics.
# It's expected but not well-documented behavior. In that case, this behavior should be explicitly documented.


> Different behavior of commitSync and commitAsync
> ------------------------------------------------
>
>                 Key: KAFKA-10576
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10576
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>            Reporter: Yuriy Badalyantc
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
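
The reporter's guess that {{commitAsync}} depends on {{poll}} calls can be illustrated with a toy model. The sketch below is not Kafka's implementation: {{ToyConsumerDemo}}, {{ToyConsumer}} and their methods are hypothetical names, and the model simply assumes a single-threaded client that performs all of its work inside {{poll}}. Under that assumption, a callback registered by an asynchronous commit cannot fire until something drives the client, while a synchronous commit drives the client itself.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

/** Toy model only: NOT Kafka's implementation. All names here are hypothetical. */
public class ToyConsumerDemo {

    /** A single-threaded client that does all of its work inside poll(). */
    static class ToyConsumer {
        // Callbacks of commits that were requested but not yet completed.
        private final Queue<Consumer<Exception>> pendingCallbacks = new ArrayDeque<>();

        /** Non-blocking: only enqueues the request and returns immediately. */
        void commitAsync(Consumer<Exception> callback) {
            pendingCallbacks.add(callback);
        }

        /** Blocking: drives the client itself until this commit has completed. */
        void commitSync() {
            boolean[] done = {false};
            commitAsync(error -> done[0] = true);
            while (!done[0]) {
                poll();
            }
        }

        /** Stands in for network I/O: completes queued commits and runs their callbacks. */
        void poll() {
            Consumer<Exception> callback;
            while ((callback = pendingCallbacks.poll()) != null) {
                callback.accept(null); // null means the commit succeeded
            }
        }
    }

    public static void main(String[] args) {
        ToyConsumer consumer = new ToyConsumer();
        CompletableFuture<Boolean> result = new CompletableFuture<>();

        consumer.commitAsync(error -> {
            if (error != null) {
                result.completeExceptionally(error);
            } else {
                result.complete(true);
            }
        });
        // Nothing has driven the client yet, so the callback has not run.
        System.out.println("done before poll: " + result.isDone()); // prints "done before poll: false"

        consumer.poll();
        System.out.println("done after poll: " + result.isDone());  // prints "done after poll: true"
    }
}
```

With the real client, the analogous experiment would be to call {{consumer.poll(Duration)}} after {{commitAsync}} and before waiting on the future. Whether that resolves the timeout described in this issue is not confirmed by this message.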