[ https://issues.apache.org/jira/browse/KAFKA-10576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17211577#comment-17211577 ]
huxihx commented on KAFKA-10576: -------------------------------- `commitAsync` does need subsequent `poll` calls to take effect, so I prefer option #2 here. > Different behavior of commitSync and commitAsync > ------------------------------------------------ > > Key: KAFKA-10576 > URL: https://issues.apache.org/jira/browse/KAFKA-10576 > Project: Kafka > Issue Type: Bug > Components: consumer > Reporter: Yuriy Badalyantc > Priority: Major > > It looks like {{commitSync}} and {{commitAsync}} consumer's methods have a > different semantic. > {code:java} > public class TestKafka { > public static void main(String[]args) { > String id = "dev_test"; > Map<String, Object> settings = new HashMap<>(); > settings.put("bootstrap.servers", "localhost:9094"); > settings.put("key.deserializer", StringDeserializer.class); > settings.put("value.deserializer", StringDeserializer.class); > settings.put("client.id", id); > settings.put("group.id", id); > String topic = "test"; > Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(); > offsets.put(new TopicPartition(topic, 0), new OffsetAndMetadata(1)); > try (KafkaConsumer<String, String> consumer = new > KafkaConsumer<>(settings)) { > consumer.commitSync(offsets); > } > } > } > {code} > In the example above I created a consumer and use {{commitSync}} to commit > offsets. This code works as expected — all offsets are committed to kafka. > But in the case of {{commitAsync}} it will not work: > {code:java} > try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(settings)) { > CompletableFuture<Boolean> result = new CompletableFuture<>(); > consumer.commitAsync(offsets, new OffsetCommitCallback() { > @Override > public void onComplete(Map<TopicPartition, OffsetAndMetadata> > offsets, Exception exception) { > if (exception != null) { > result.completeExceptionally(exception); > } else { > result.complete(true); > } > } > }); > result.get(15L, TimeUnit.SECONDS); > } > {code} > The {{result}} future failed with a timeout. > This behavior is pretty surprising. From naming and documentation, it looks > like {{commitSync}} and {{commitAsync}} methods should behave identically. Of > course, besides the blocking/non-blocking aspect. But in reality, there are > some differences. > I can assume that the {{commitAsync}} method somehow depends on the {{poll}} > calls. But I didn't find any explicit information about it in > {{KafkaConsumer}}'s javadoc or kafka documentation page. > So, I believe that there are the next options: > # It's a bug and not expected behavior. {{commitSync}} and {{commitAsync}} > should have identical semantics. > # It's expected, but not well-documented behavior. In that case, this > behavior should be explicitly documented. -- This message was sent by Atlassian Jira (v8.3.4#803005)