Mayuresh Gharat created KAFKA-7096:
--------------------------------------

             Summary: Consumer should drop the data for unassigned topic partitions
                 Key: KAFKA-7096
                 URL: https://issues.apache.org/jira/browse/KAFKA-7096
             Project: Kafka
          Issue Type: Improvement
            Reporter: Mayuresh Gharat
            Assignee: Mayuresh Gharat


Currently, if a client is assigned topics T1, T2, and T3 and calls poll(), the 
poll may fetch data for partitions of all three topics. If the client then 
unassigns some topics (for example T3) and calls poll() again, we still hold 
the data for T3 in the completedFetches queue. That data is dropped only when 
a subsequent poll() call reaches the buffered entries for the unassigned 
topics (T3 in our example). Holding this data in the meantime is unnecessary.
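
The proposed behaviour can be illustrated with a toy model. This is only a 
sketch: FetchBufferSketch and its methods are hypothetical names, not the 
consumer's actual internals, and each buffered fetch is reduced to its topic 
name. It assumes that the buffered entries are filtered at the moment the 
assignment changes.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;

// Toy model only: class and method names are hypothetical, not Kafka internals.
class FetchBufferSketch {
    // Topic name of each buffered fetch result, in arrival order.
    private final Deque<String> completedFetches = new ArrayDeque<>();
    private final Set<String> assignedTopics = new HashSet<>();

    // Replace the assignment and eagerly drop buffered data for unassigned topics.
    void assign(Set<String> topics) {
        assignedTopics.clear();
        assignedTopics.addAll(topics);
        completedFetches.removeIf(topic -> !assignedTopics.contains(topic));
    }

    // Buffer one fetch result for the given topic.
    void buffer(String topic) {
        completedFetches.add(topic);
    }

    int bufferedCount() {
        return completedFetches.size();
    }

    public static void main(String[] args) {
        FetchBufferSketch consumer = new FetchBufferSketch();
        consumer.assign(new HashSet<>(Arrays.asList("T1", "T2", "T3")));
        consumer.buffer("T1");
        consumer.buffer("T2");
        consumer.buffer("T3");
        // Unassign T3: its buffered data is dropped immediately.
        consumer.assign(new HashSet<>(Arrays.asList("T1", "T2")));
        System.out.println(consumer.bufferedCount()); // prints 2
    }
}
```

In this model the T3 entry is gone as soon as assign() returns, instead of 
lingering until a later poll() reaches it.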

When a client creates a topic, it takes time for the broker to fetch the ACLs 
for the topic. During this time the client issues a fetch request for the 
topic and gets a response for each of the topic's partitions. The response 
contains a TopicAuthorizationException for each partition. Each per-partition 
response is wrapped in a CompletedFetch and added to the completedFetches 
queue. On the next poll(), the client sees the TopicAuthorizationException 
from the first buffered CompletedFetch. At this point the client sleeps for 
1.5 minutes as a backoff (as per the design), hoping that the broker fetches 
the ACLs from the ACL store in the meantime. In fact the broker has already 
fetched the ACLs by this time. When the client calls poll() after the sleep, 
it sees the TopicAuthorizationException from the second CompletedFetch and 
sleeps again. So it takes (1.5 * 60 * partitions) seconds before the client 
can see any data. With this patch, when the client sees the first 
TopicAuthorizationException it can call assign(EmptySet), which discards the 
buffered CompletedFetches (those holding a TopicAuthorizationException), and 
then call assign(TopicPartitions) again before calling poll(). With this patch 
we found that the client was able to get records as soon as the broker had 
fetched the ACLs from the ACL store.
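
To make the delay concrete, here is a rough simulation of the two cases. It 
is a sketch under stated assumptions, not the client's code: 
BackoffDelaySketch and its methods are hypothetical, the backoff is the 1.5 
minutes mentioned above, and it assumes the broker has the ACLs by the end of 
the first backoff, so only stale buffered errors cause further sleeps.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy simulation only: names are hypothetical, not part of the Kafka client.
class BackoffDelaySketch {
    // 1.5 minutes, the client-side backoff from the description.
    static final long BACKOFF_SECONDS = 90;

    // Fill a queue with one stale authorization error per partition.
    static Deque<String> bufferedErrors(int partitions) {
        Deque<String> buffered = new ArrayDeque<>();
        for (int i = 0; i < partitions; i++) {
            buffered.add("TopicAuthorizationException");
        }
        return buffered;
    }

    // Without the patch: every buffered error costs one full backoff.
    static long waitWithoutPatch(int partitions) {
        Deque<String> buffered = bufferedErrors(partitions);
        long waitedSeconds = 0;
        while (!buffered.isEmpty()) {
            buffered.poll();                  // poll() surfaces one stale error
            waitedSeconds += BACKOFF_SECONDS; // client sleeps before retrying
        }
        return waitedSeconds;
    }

    // With the patch: the first error triggers a reset that clears the buffer.
    static long waitWithPatch(int partitions) {
        Deque<String> buffered = bufferedErrors(partitions);
        long waitedSeconds = 0;
        if (!buffered.isEmpty()) {
            buffered.poll();                  // first stale error
            buffered.clear();                 // models assign(EmptySet) dropping the rest
            waitedSeconds += BACKOFF_SECONDS; // assumed: a single backoff
        }
        return waitedSeconds;
    }

    public static void main(String[] args) {
        System.out.println(waitWithoutPatch(8)); // prints 720
        System.out.println(waitWithPatch(8));    // prints 90
    }
}
```

For a topic with 8 partitions, the model gives 720 seconds without the patch 
(1.5 * 60 * 8) and a single 90 second backoff with it.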



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
