[ https://issues.apache.org/jira/browse/KAFKA-15848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kirk True updated KAFKA-15848:
------------------------------
    Description: 
The two {{ConsumerDelegate}} implementations ({{LegacyKafkaConsumer}} and 
{{AsyncKafkaConsumer}}) have a fundamental difference related to their use 
and interpretation of the {{Timer}} that is supplied.
h3. tl;dr

{{AsyncKafkaConsumer}} is very literal about the timeout, whereas 
{{LegacyKafkaConsumer}} effectively grants at least one attempt at the 
operation, even when the timeout is zero.

{{LegacyKafkaConsumer}} is structured so that the logic it uses can check for 
success of its operations _before_ checking the timer:
 # Submit operation asynchronously
 # Wait for operation to complete using {{NetworkClient.poll()}}
 # Check for result
 ## If successful, return success
 ## If fatal failure, return failure
 # Check timer
 ## If timer expired, return failure
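
The legacy ordering can be sketched in plain Java. This is a minimal illustration under stated assumptions, not Kafka code: {{SimpleTimer}} is a hypothetical stand-in that mimics only {{remainingMs()}} and {{notExpired()}} of the client's internal {{Timer}}, and the {{attempt}} callback is a hypothetical operation that receives the remaining time and reports whether it completed.
{code:java}
import java.util.function.LongPredicate;

// Hypothetical stand-in for the consumer's internal Timer: a fixed deadline.
class SimpleTimer {
    private final long deadlineMs;

    SimpleTimer(long timeoutMs) {
        this.deadlineMs = System.currentTimeMillis() + timeoutMs;
    }

    long remainingMs() {
        return Math.max(0, deadlineMs - System.currentTimeMillis());
    }

    boolean notExpired() {
        return remainingMs() > 0;
    }
}

class LegacyStyle {
    // Attempt first, check the timer second (the LegacyKafkaConsumer ordering).
    // `attempt` receives the remaining time and returns true if the operation completed.
    static boolean doSomeWork(SimpleTimer timer, LongPredicate attempt) {
        do {
            if (attempt.test(timer.remainingMs()))
                return true;   // success is observed before the timer is consulted
        } while (timer.notExpired());
        return false;          // timer expired without a successful attempt
    }
}
{code}
Because the result is checked before the timer, {{LegacyStyle.doSomeWork(new SimpleTimer(0), remaining -> true)}} still returns {{true}}: a zero-length timeout allows exactly one attempt.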

{{AsyncKafkaConsumer}} uses {{Future.get()}} to wait for its operations:
 # Submit operation asynchronously
 # Wait for operation to complete using {{Future.get()}}
 ## If operation timed out, {{Future.get()}} will throw a timeout error
 # Check for result
 ## If successful, return success
 ## Otherwise, return failure
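
The {{Future.get()}} ordering can be sketched with {{java.util.concurrent}} alone. Here a single-thread executor is assumed to stand in for the consumer's background network I/O thread, and {{workMs}} is a hypothetical duration for the submitted operation; this is not the actual {{ApplicationEventHandler}} implementation.
{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class AsyncStyle {
    // Submit work to a background thread, then block on the future for at most timeoutMs.
    // The wait itself enforces the deadline (the AsyncKafkaConsumer ordering).
    static boolean doSomeWork(long timeoutMs, long workMs) {
        // Assumption: this executor plays the role of the background network I/O thread.
        ExecutorService background = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<Void> future =
                    CompletableFuture.runAsync(() -> pause(workMs), background);
            future.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;                 // completed within the timeout
        } catch (TimeoutException e) {
            return false;                // deadline reached first; the work keeps running
        } catch (ExecutionException e) {
            return false;                // the operation itself failed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            background.shutdown();       // lets the already-submitted task finish
        }
    }

    // Simulates an operation that takes `ms` milliseconds to complete.
    private static void pause(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
{code}
With these assumptions, {{AsyncStyle.doSomeWork(0, 50)}} returns {{false}} even though the submitted work completes about 50 ms later, while {{AsyncStyle.doSomeWork(5000, 10)}} returns {{true}}.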

h3. How to reproduce

This causes subtle timing issues that can easily be reproduced with any of 
the {{KafkaConsumerTest}} unit tests that invoke the {{consumer.poll(0)}} API. 
The following code illustrates the difference between the two approaches.

{{LegacyKafkaConsumer}} performs a lot of its network I/O operations in a 
manner similar to this:
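{code:java}
public boolean doSomeWork(Timer timer) {
    do {
        // Send the request, then let client.poll() drive network I/O until
        // the future completes or the timer expires.
        final RequestFuture future = sendSomeRequest(partitions);
        client.poll(future, timer);

        // The result is checked before the timer, so even an expired timer
        // gets one chance to observe a completed request.
        if (future.isDone())
            return true;
    } while (timer.notExpired());

    return false;
}
{code}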
{{AsyncKafkaConsumer}} has similar logic, but it is structured like this:
{code:java}
private boolean doSomeWork(Timer timer) {
    Set<TopicPartition> partitions = subscriptions.initializingPartitions();

    try {
        CompletableApplicationEvent event = . . .
        CompletableFuture future = event.future();

        // Hand the event to the background thread, then block for at most the
        // time left on the timer. With 0 ms remaining, get() does not wait.
        applicationEventHandler.add(event);
        future.get(timer.remainingMs(), TimeUnit.MILLISECONDS);
        return true;
    } catch (TimeoutException e) {
        return false;
    }
}
{code}
The call to {{add}} enqueues the network operation, but the code then 
_immediately_ invokes {{Future.get()}} with the remaining time to implement a 
time-bounded blocking call. When the method is called with a timeout of 0, 
{{Future.get()}} does not wait at all and _immediately_ throws a 
{{TimeoutException}}, even if the background thread would have completed the 
operation moments later.
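
The zero-timeout behavior can be checked directly against {{CompletableFuture}}: {{get(0, TimeUnit.MILLISECONDS)}} does not wait, so it throws {{TimeoutException}} for a future that is still pending but returns normally for one that is already complete. The class and method names below are illustrative only.
{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class ZeroTimeoutGet {
    // Returns the future's value if get(0 ms) succeeds, or null if it times out.
    static String getWithZeroTimeout(CompletableFuture<String> future) {
        try {
            return future.get(0, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return null;   // not complete at the instant of the call
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }
}
{code}
A pending future therefore loses against a zero timeout regardless of how quickly the background thread would have completed it.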
h3. Suggested fix

TBD :(

  was:
The {{LegacyKafkaConsumer}} and {{AsyncKafkaConsumer}} implementations differ 
fundamentally in when they consult the {{Timer}} that is supplied.

h3. tl;dr

For time-bounded tasks, {{LegacyKafkaConsumer}} does the following:

# Attempt the operation
# If successful, return result
# Check timer, return if expired
# Go to top

{{AsyncKafkaConsumer}} effectively does the inverse:

# Check timer, return if expired
# Attempt the operation
# If successful, return result
# Go to top

{{AsyncKafkaConsumer}} is very literal about the timeout, whereas 
{{LegacyKafkaConsumer}} effectively grants at least one attempt at the 
operation, even when the timeout is zero.
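
The two orderings summarized above can be compared by counting how often the operation runs under a zero-length budget. This is a minimal sketch using a plain millisecond deadline; {{attemptFirst}}, {{checkFirst}}, and the {{BooleanSupplier}} operation are hypothetical names, not Kafka APIs.
{code:java}
import java.util.function.BooleanSupplier;

class OrderingComparison {
    // LegacyKafkaConsumer-style: attempt, return on success, then check the deadline.
    static boolean attemptFirst(long timeoutMs, BooleanSupplier op) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        do {
            if (op.getAsBoolean())
                return true;
        } while (System.currentTimeMillis() < deadline);
        return false;
    }

    // AsyncKafkaConsumer-style: check the deadline, then attempt.
    static boolean checkFirst(long timeoutMs, BooleanSupplier op) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            if (op.getAsBoolean())
                return true;
        }
        return false;
    }
}
{code}
With {{timeoutMs = 0}}, {{attemptFirst}} runs the operation once and can succeed, whereas {{checkFirst}} never runs it and returns {{false}}.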

h3. How to reproduce

This causes subtle timing issues that can easily be reproduced with the 
{{KafkaConsumerTest}} unit tests, e.g. 
{{verifyNoCoordinatorLookupForManualAssignmentWithOffsetCommit()}}. This test 
invokes {{consumer.poll(Duration.ofMillis(0))}}, i.e. with a zero-length 
timeout.

As part of the {{poll()}} logic, the consumer may need to refresh offsets. To 
accomplish this, {{LegacyKafkaConsumer}} uses the 
{{ConsumerCoordinator.fetchCommittedOffsets()}} method which is structured like 
this:

{code:java}
public Map<TopicPartition, OffsetAndMetadata> fetchCommittedOffsets(Set<TopicPartition> partitions, Timer timer) {
    do {
        final RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future = sendOffsetFetchRequest(partitions);
        . . .
        client.poll(future, timer);
        . . .
        return future.value();
    } while (timer.notExpired());

    return null;
}
{code}

The {{AsyncKafkaConsumer}} has similar logic, but it is structured like this:

{code:java}
private boolean initWithCommittedOffsetsIfNeeded(Timer timer) {
    Set<TopicPartition> initializingPartitions = subscriptions.initializingPartitions();

    try {
        OffsetFetchApplicationEvent event = new OffsetFetchApplicationEvent(initializingPartitions);
        Map<TopicPartition, OffsetAndMetadata> offsets = applicationEventHandler.addAndGet(event, timer);
        . . .
        return true;
    } catch (TimeoutException e) {
        return false;
    }
}
{code}

The call to {{addAndGet}} enqueues the operation on the queue for the network 
I/O thread and then _immediately_ invokes {{Future.get()}} with the timeout to 
implement a time-bounded blocking call. {{Future.get()}} is passed the value 
{{0}} (from the above call to {{poll(0)}}) and _immediately_ throws a 
{{TimeoutException}}.

h3. Suggested fix

TBD :(


> Consumer API timeout inconsistent between ConsumerDelegate implementations
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-15848
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15848
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, consumer, unit tests
>            Reporter: Kirk True
>            Priority: Major
>              Labels: consumer-threading-refactor, kip-848-client-support, kip-848-preview
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
