[
https://issues.apache.org/jira/browse/KAFKA-15824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Mayank Shekhar Narula updated KAFKA-15824:
------------------------------------------
Description:
As can be seen [here|#L453], maybeValidatePositionForCurrentLeader doesn't
check whether the partition is subscribed, i.e. whether the cached
TopicPartitionState is null, as is done by
[maybeCompleteValidation|https://github.com/apache/kafka/blob/832627fc78484fdc7c8d6da8a2d20e7691dbf882/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L477].
So it throws IllegalStateException for a partition that is not yet subscribed.
The lack of this check makes writing thread-safe code against the
SubscriptionState class awkward. This can be seen from the example code below.
For example, at line 1 partitionA (tp in the code) would be in
allCurrentlySubscribedTopics, but by line 2 it could have been removed from the
subscribed partitions (in a separate thread). This forces the user of this
class to handle IllegalStateException.
{code:java}
// Example code for a caller of
// SubscriptionState::maybeValidatePositionForCurrentLeader
Set<TopicPartition> allCurrentlySubscribedTopics =
    subscriptionState.assignedPartitions(); // line 1
if (allCurrentlySubscribedTopics.contains(tp)) {
    ConsumerMetadata.LeaderAndEpoch leaderAndEpoch =
        metadata.currentLeader(tp);
    try {
        // tp may have been unassigned by another thread since line 1
        subscriptionState.maybeValidatePositionForCurrentLeader(apiVersions, tp,
            leaderAndEpoch); // line 2
    } catch (IllegalStateException e) {
        // recover from it. // line 3
    }
}{code}
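The behaviour asked for here can be illustrated with a minimal sketch. It assumes a simplified stand-in class, SubscriptionSketch, that keeps one leader epoch per partition keyed by a string; every name in it is hypothetical and none of it is Kafka's actual API or the actual fix. The strict variant mirrors the current behaviour (throw when the partition has no cached state), while the lenient variant does a single lookup and returns false for an unassigned partition, so the caller needs neither the contains() pre-check nor the catch block.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified stand-in for SubscriptionState (not Kafka's class).
final class SubscriptionSketch {
    // Cached per-partition state: here just the last validated leader epoch.
    private final Map<String, Integer> epochByPartition = new ConcurrentHashMap<>();

    void assign(String tp, int epoch) {
        epochByPartition.put(tp, epoch);
    }

    void unassign(String tp) {
        epochByPartition.remove(tp);
    }

    // Strict variant: throws when the partition has no cached state.
    boolean validateStrict(String tp, int currentLeaderEpoch) {
        Integer cached = epochByPartition.get(tp);
        if (cached == null) {
            throw new IllegalStateException("No current assignment for partition " + tp);
        }
        return cached < currentLeaderEpoch;
    }

    // Lenient variant: one lookup, then a null check on the local copy.
    // A concurrent unassign() can no longer surface as an exception.
    boolean maybeValidate(String tp, int currentLeaderEpoch) {
        Integer cached = epochByPartition.get(tp);
        if (cached == null) {
            return false; // not (or no longer) assigned: nothing to validate
        }
        return cached < currentLeaderEpoch;
    }

    public static void main(String[] args) {
        SubscriptionSketch state = new SubscriptionSketch();
        state.assign("topic-0", 1);
        System.out.println(state.maybeValidate("topic-0", 2)); // prints "true"
        state.unassign("topic-0"); // e.g. done by another thread
        System.out.println(state.maybeValidate("topic-0", 2)); // prints "false"
    }
}
```

The point of the design is that the check and the use happen on the same local value, so there is no window between "is it assigned?" and "validate it" for another thread to invalidate.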
was:
As can be seen [here|#L453], maybeValidatePositionForCurrentLeader doesn't
check if partition is subscribed by checking TopicPartitionState cached is null
or not, as done by
[maybeCompleteValidation|https://github.com/apache/kafka/blob/832627fc78484fdc7c8d6da8a2d20e7691dbf882/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L477].
So it throws IllegalStateException for a partition that is yet not subscribed.
Lack of this check prevents writing thread-safe code w.r.t SubscriptionState
class, this can be seen from the example code below. For example, at line 1
partA would be in allCurrentlySubscribedTopics, but at line 2 it could be
removed from subscribed partitions.
{code:java}
// Following is example code for the user of
SubscriptionState::maybeValidatePositionForCurrentLeader
Set<TopicPartition> allCurrentlySubscribedTopics =
subscriptionState.assignedPartitions(); // line 1
if(allCurrentlySubscribedTopics.contains(tp)) {
ConsumerMetadata.LeaderAndEpoch leaderAndEpoch =
metadata.currentLeader(tp);
try() {
subscriptionState.maybeValidatePositionForCurrentLeader(apiVersions, tp,
leaderAndEpoch); // line 2
} catch (IllegalStateException e) {
// recover from it. // line 3
}
}{code}
> SubscriptionState's maybeValidatePositionForCurrentLeader should handle
> partition which isn't subscribed yet
> ------------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-15824
> URL: https://issues.apache.org/jira/browse/KAFKA-15824
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Reporter: Mayank Shekhar Narula
> Assignee: Mayank Shekhar Narula
> Priority: Major
> Fix For: 3.7.0
>
>
> As can be seen [here|#L453], maybeValidatePositionForCurrentLeader doesn't
> check whether the partition is subscribed, i.e. whether the cached
> TopicPartitionState is null, as is done by
> [maybeCompleteValidation|https://github.com/apache/kafka/blob/832627fc78484fdc7c8d6da8a2d20e7691dbf882/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L477].
> So it throws IllegalStateException for a partition that is not yet
> subscribed.
> The lack of this check makes writing thread-safe code against the
> SubscriptionState class awkward. This can be seen from the example code
> below. For example, at line 1 partitionA (tp in the code) would be in
> allCurrentlySubscribedTopics, but by line 2 it could have been removed from
> the subscribed partitions (in a separate thread). This forces the user of
> this class to handle IllegalStateException.
> {code:java}
> // Example code for a caller of
> // SubscriptionState::maybeValidatePositionForCurrentLeader
> Set<TopicPartition> allCurrentlySubscribedTopics =
>     subscriptionState.assignedPartitions(); // line 1
> if (allCurrentlySubscribedTopics.contains(tp)) {
>     ConsumerMetadata.LeaderAndEpoch leaderAndEpoch =
>         metadata.currentLeader(tp);
>     try {
>         // tp may have been unassigned by another thread since line 1
>         subscriptionState.maybeValidatePositionForCurrentLeader(apiVersions, tp,
>             leaderAndEpoch); // line 2
>     } catch (IllegalStateException e) {
>         // recover from it. // line 3
>     }
> }{code}
>