[ https://issues.apache.org/jira/browse/KAFKA-15848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Kirk True updated KAFKA-15848:
------------------------------
Description:

The two {{ConsumerDelegate}} implementations ({{LegacyKafkaConsumer}} and {{AsyncKafkaConsumer}}) have a fundamental difference related to their use and interpretation of the {{Timer}} that is supplied.

h3. tl;dr

{{AsyncKafkaConsumer}} is very literal about the timeout, whereas {{LegacyKafkaConsumer}} seems to give a little wiggle room.

{{LegacyKafkaConsumer}} is structured so that its logic can check for the success of its operations _before_ checking the timer:

# Submit the operation asynchronously
# Wait for the operation to complete using {{NetworkClient.poll()}}
# Check for a result
## If successful, return success
## If there was a fatal failure, return failure
# Check the timer
## If the timer has expired, return failure

{{AsyncKafkaConsumer}} uses {{Future.get()}} to wait for its operations:

# Submit the operation asynchronously
# Wait for the operation to complete using {{Future.get()}}
## If the operation timed out, {{Future.get()}} throws a timeout error
# Check for a result
## If successful, return success
## Otherwise, return failure

h3. How to reproduce

This causes subtle timing issues that can easily be reproduced via any of the {{KafkaConsumerTest}} unit tests that invoke the {{consumer.poll(0)}} API. Here's a bit of code that illustrates the difference between the two approaches.
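Before the excerpts from the actual consumer code, the two flows can be distilled into a small, self-contained sketch. The names here ({{pollOnce}}, {{legacyStyle}}, {{asyncStyle}}) are hypothetical stand-ins, not Kafka code: the legacy-style loop performs one round of "I/O" inline before it ever consults the timer, while the async-style path hands the work to a background thread and then blocks in {{Future.get()}} with whatever budget remains.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimeoutDemo {

    // Hypothetical stand-in for one round of NetworkClient.poll(): the I/O
    // that completes the request happens here, regardless of the time budget.
    static void pollOnce(CompletableFuture<String> future) {
        future.complete("offsets");
    }

    // Legacy-style: do the work, check the result, and only then check the timer.
    static boolean legacyStyle(long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        CompletableFuture<String> future = new CompletableFuture<>();

        do {
            pollOnce(future);        // at least one round of I/O always runs
            if (future.isDone())
                return true;         // succeeds even when timeoutMs == 0
        } while (System.currentTimeMillis() < deadline);

        return false;
    }

    // Async-style: enqueue the work for a background thread, then block in
    // Future.get() with the remaining time budget.
    static boolean asyncStyle(long timeoutMs) {
        CompletableFuture<String> future = new CompletableFuture<>();

        // Simulate the network I/O thread completing the event slightly later.
        new Thread(() -> {
            try {
                Thread.sleep(50);    // 50 ms of simulated network latency
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            pollOnce(future);
        }).start();

        try {
            future.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false;            // get(0, ...) gives the work no chance to finish
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("legacy, timeout=0: " + legacyStyle(0)); // true
        System.out.println("async,  timeout=0: " + asyncStyle(0));  // false
    }
}
```

With a zero timeout, the legacy-style loop still succeeds because the result check precedes the timer check, while the async-style call fails before the background work can complete, which is the behavioral gap the {{poll(0)}} unit tests expose.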
{{LegacyKafkaConsumer}} performs a lot of its network I/O operations in a manner similar to this:

{code:java}
public boolean doSomeWork(Timer timer) {
    do {
        final RequestFuture<?> future = sendSomeRequest(partitions);
        client.poll(future, timer);

        if (future.isDone())
            return true;
    } while (timer.notExpired());

    return false;
}
{code}

{{AsyncKafkaConsumer}} has similar logic, but it is structured like this:

{code:java}
private boolean doSomeWork(Timer timer) {
    Set<TopicPartition> partitions = subscriptions.initializingPartitions();

    try {
        CompletableApplicationEvent<?> event = . . .

        CompletableFuture<?> future = event.future();
        applicationEventHandler.add(event);
        future.get(timer.remainingMs(), TimeUnit.MILLISECONDS);
        return true;
    } catch (TimeoutException e) {
        return false;
    }
}
{code}

The call to {{add}} enqueues the network operation, but it then _immediately_ invokes {{Future.get()}} with the timeout to implement a time-bounded blocking call. Since this method is being called with a timeout of 0, it _immediately_ throws a {{TimeoutException}}.

h3. Suggested fix

TBD :(

was:
The {{LegacyKafkaConsumer}} and {{AsyncKafkaConsumer}} implementations have a fundamental difference in their timing as it relates to their use of the {{Timer}} that is supplied.

h3. tl;dr

For time-bounded tasks, {{LegacyKafkaConsumer}} does the following:

# Attempt the operation
# If successful, return the result
# Check the timer; return if expired
# Go to the top

{{AsyncKafkaConsumer}} effectively does the inverse:

# Check the timer; return if expired
# Attempt the operation
# If successful, return the result
# Go to the top

{{AsyncKafkaConsumer}} is very literal about the timeout, whereas {{LegacyKafkaConsumer}} seems to give a little wiggle room.

h3. How to reproduce

This causes subtle timing issues that can easily be reproduced via the {{KafkaConsumerTest}} unit tests, e.g.
{{verifyNoCoordinatorLookupForManualAssignmentWithOffsetCommit()}}. This test invokes {{consumer.poll(Duration.ofMillis(0))}}, i.e. with a zero-length timeout. As part of the {{poll()}} logic, the consumer may need to refresh offsets. To accomplish this, {{LegacyKafkaConsumer}} uses the {{ConsumerCoordinator.fetchCommittedOffsets()}} method, which is structured like this:

{code:java}
public Map<TopicPartition, OffsetAndMetadata> fetchCommittedOffsets(Set<TopicPartition> partitions, Timer timer) {
    do {
        final RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future = sendOffsetFetchRequest(partitions);

        . . .

        client.poll(future, timer);

        . . .

        return future.value();
    } while (timer.notExpired());

    return null;
}
{code}

The {{AsyncKafkaConsumer}} has similar logic, but it is structured like this:

{code:java}
private boolean initWithCommittedOffsetsIfNeeded(Timer timer) {
    Set<TopicPartition> initializingPartitions = subscriptions.initializingPartitions();

    try {
        OffsetFetchApplicationEvent event = new OffsetFetchApplicationEvent(initializingPartitions);
        Map<TopicPartition, OffsetAndMetadata> offsets = applicationEventHandler.addAndGet(event, timer);

        . . .

        return true;
    } catch (TimeoutException e) {
        return false;
    }
}
{code}

The call to {{addAndGet}} enqueues the operation on the queue for the network I/O thread and then _immediately_ invokes {{Future.get()}} with the timeout to implement a time-bounded blocking call. {{Future.get()}} will be passed the value of {{0}} (from the above call to {{poll(0)}}) and will _immediately_ throw a {{TimeoutException}}.

h3. Suggested fix

TBD :(

> Consumer API timeout inconsistent between ConsumerDelegate implementations
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-15848
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15848
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, consumer, unit tests
>            Reporter: Kirk True
>            Priority: Major
>              Labels: consumer-threading-refactor, kip-848-client-support, kip-848-preview
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)