[ 
https://issues.apache.org/jira/browse/KAFKA-10576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17211577#comment-17211577
 ] 

huxihx commented on KAFKA-10576:
--------------------------------

`commitAsync` does need subsequent `poll` calls to take effect, so I prefer 
option #2 here. 

> Different behavior of commitSync and commitAsync
> ------------------------------------------------
>
>                 Key: KAFKA-10576
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10576
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>            Reporter: Yuriy Badalyantc
>            Priority: Major
>
> It looks like {{commitSync}} and {{commitAsync}} consumer's methods have a 
> different semantic.
> {code:java}
> public class TestKafka {
>     public static void main(String[]args) {
>         String id = "dev_test";
>         Map<String, Object> settings = new HashMap<>();
>         settings.put("bootstrap.servers", "localhost:9094");
>         settings.put("key.deserializer", StringDeserializer.class);
>         settings.put("value.deserializer", StringDeserializer.class);
>         settings.put("client.id", id);
>         settings.put("group.id", id);
>         String topic = "test";
>         Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
>         offsets.put(new TopicPartition(topic, 0), new OffsetAndMetadata(1));
>         try (KafkaConsumer<String, String> consumer = new 
> KafkaConsumer<>(settings)) {
>             consumer.commitSync(offsets);
>         }
>     }
> }
> {code}
> In the example above I created a consumer and use {{commitSync}} to commit 
> offsets. This code works as expected — all offsets are committed to kafka.
> But in the case of {{commitAsync}} it will not work:
> {code:java}
> try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(settings)) {
>     CompletableFuture<Boolean> result = new CompletableFuture<>();
>     consumer.commitAsync(offsets, new OffsetCommitCallback() {
>         @Override
>         public void onComplete(Map<TopicPartition, OffsetAndMetadata> 
> offsets, Exception exception) {
>             if (exception != null) {
>                 result.completeExceptionally(exception);
>             } else {
>                 result.complete(true);
>             }
>         }
>     });
>     result.get(15L, TimeUnit.SECONDS);
> }
> {code}
> The {{result}} future failed with a timeout.
> This behavior is pretty surprising. From naming and documentation, it looks 
> like {{commitSync}} and {{commitAsync}} methods should behave identically. Of 
> course, besides the blocking/non-blocking aspect. But in reality, there are 
> some differences.
> I can assume that the {{commitAsync}} method somehow depends on the {{poll}} 
> calls. But I didn't find any explicit information about it in 
> {{KafkaConsumer}}'s javadoc or kafka documentation page.
> So, I believe that there are the next options:
>  # It's a bug and not expected behavior. {{commitSync}} and {{commitAsync}} 
> should have identical semantics.
>  # It's expected, but not well-documented behavior. In that case, this 
> behavior should be explicitly documented.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to