[ https://issues.apache.org/jira/browse/KAFKA-15848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Kirk True updated KAFKA-15848:
------------------------------
    Description: 
The two {{ConsumerDelegate}} implementations ({{LegacyKafkaConsumer}} and {{AsyncKafkaConsumer}}) have a fundamental difference related to their use and interpretation of the {{Timer}} that is supplied.

h3. tl;dr

{{AsyncKafkaConsumer}} is very literal about the timeout, whereas {{LegacyKafkaConsumer}} seems to give a little wiggle room.

{{LegacyKafkaConsumer}} is structured so that the logic it uses can check for success of its operations _before_ checking the timer:
 # Submit operation asynchronously
 # Wait for operation to complete using {{NetworkClient.poll()}}
 # Check for result
 ## If successful, return success
 ## If fatal failure, return failure
 # Check timer
 ## If timer expired, return failure

{{AsyncKafkaConsumer}} uses {{Future.get()}} to wait for its operations:
 # Submit operation asynchronously
 # Wait for operation to complete using {{Future.get()}}
 ## If operation timed out, {{Future.get()}} will throw a timeout error
 # Check for result
 ## If successful, return success
 ## Otherwise, return failure

h3. How to reproduce

This causes subtle timing issues, but they can be easily reproduced via any of the {{KafkaConsumerTest}} unit tests that invoke the {{consumer.poll(0)}} API. The simplified {{getCount()}} methods below illustrate the difference between the two approaches.
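Separately from the Kafka snippets, the two control flows listed above can be reproduced with nothing but {{java.util.concurrent}}. The following is a minimal sketch, not Kafka code: {{TimeoutSemanticsDemo}}, {{resultBeforeTimer}} and {{timerBeforeResult}} are hypothetical names, and the legacy-style method assumes that a single polling pass on the calling thread is enough to finish the request.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

public class TimeoutSemanticsDemo {

    /** Legacy-style flow: do the work on the calling thread, check the result, then the timer. */
    static int resultBeforeTimer(Supplier<Integer> request, long timeoutMs) {
        long deadlineNs = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        do {
            CompletableFuture<Integer> future = new CompletableFuture<>();
            // Stand-in for client.poll(future, timer); ASSUMPTION: one pass completes the request.
            future.complete(request.get());
            if (future.isDone())
                return future.join();
        } while (System.nanoTime() - deadlineNs < 0);
        return -1;
    }

    /** Async-style flow: hand the work to another thread, then block for the remaining time. */
    static int timerBeforeResult(ExecutorService background, Supplier<Integer> request, long timeoutMs) {
        CompletableFuture<Integer> future = new CompletableFuture<>();
        background.execute(() -> future.complete(request.get()));
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // A timeout of 0 ends up here unless the future is already complete.
            return -1;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        // Hypothetical request that needs 100 ms of "network" time before yielding 42.
        Supplier<Integer> request = () -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return 42;
        };
        ExecutorService background = Executors.newSingleThreadExecutor();
        try {
            System.out.println(resultBeforeTimer(request, 0));
            System.out.println(timerBeforeResult(background, request, 0));
            System.out.println(timerBeforeResult(background, request, 5_000));
        } finally {
            background.shutdown();
        }
    }
}
```

With a timeout of 0, the first flow still runs its loop body once and returns the value, whereas the second flow gives up before the background thread has had a chance to finish. With a generous timeout, both flows return the same value.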
{{LegacyKafkaConsumer}} performs a lot of its network I/O operations in a manner similar to this:
{code:java}
public int getCount(Timer timer) {
    do {
        final RequestFuture<Integer> future = sendSomeRequest(partitions);
        // Network I/O happens on the calling thread, bounded by the timer
        client.poll(future, timer);

        // The result is checked first...
        if (future.isDone())
            return future.get();
    } while (timer.notExpired()); // ...and the timer only afterwards

    return -1;
}
{code}
{{AsyncKafkaConsumer}} has similar logic, but it is structured like this:
{code:java}
private int getCount(Timer timer) {
    try {
        CompletableFuture<Integer> future = new CompletableFuture<>();
        // Enqueue the operation for the background thread
        applicationEventQueue.add(future);
        // Block for at most the remaining time; 0 ms means no waiting at all
        return future.get(timer.remainingMs(), TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
        return -1;
    }
}
{code}
The call to {{add}} enqueues the network operation, but the method then _immediately_ invokes {{Future.get()}} with the timeout to implement a time-bounded blocking call. Since this method is being called with a timeout of 0, {{Future.get()}} _immediately_ throws a {{TimeoutException}} because the enqueued operation has not had a chance to complete.

h3. Suggested fix

This task is to design and document the timeout policy for the new Consumer implementation.

The documentation lives here: [https://cwiki.apache.org/confluence/display/KAFKA/Java+client+Consumer+timeouts]

  was:
The same description, except that the Suggested fix section read only: TBD :(

> Consumer API timeout inconsistent between ConsumerDelegate implementations
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-15848
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15848
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, consumer
>            Reporter: Kirk True
>            Assignee: Kirk True
>            Priority: Blocker
>              Labels: consumer-threading-refactor, integration-tests, timeout
>             Fix For: 3.8.0

--
This message was sent by Atlassian Jira
(v8.20.10#820010)