[ 
https://issues.apache.org/jira/browse/KAFKA-7096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16610096#comment-16610096
 ] 

ASF GitHub Bot commented on KAFKA-7096:
---------------------------------------

lindong28 closed pull request #5289: KAFKA-7096 : Clear buffered data for unassigned topicPartitions
URL: https://github.com/apache/kafka/pull/5289
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 4ea3cfd295f..4cdc4f862ec 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -918,7 +918,7 @@ public void subscribe(Collection<String> topics, ConsumerRebalanceListener liste
                 }
 
                 throwIfNoAssignorsConfigured();
-
+                fetcher.clearBufferedDataForUnassignedTopics(topics);
                 log.debug("Subscribed to topic(s): {}", Utils.join(topics, ", "));
                 this.subscriptions.subscribe(new HashSet<>(topics), listener);
                 metadata.setTopics(subscriptions.groupSubscription());
@@ -1019,10 +1019,11 @@ public void subscribe(Pattern pattern) {
     public void unsubscribe() {
         acquireAndEnsureOpen();
         try {
-            log.info("Unsubscribed all topics or patterns and assigned partitions");
+            fetcher.clearBufferedDataForUnassignedPartitions(Collections.EMPTY_SET);
             this.subscriptions.unsubscribe();
             this.coordinator.maybeLeaveGroup();
             this.metadata.needMetadataForAllTopics(false);
+            log.info("Unsubscribed all topics or patterns and assigned partitions");
         } finally {
             release();
         }
@@ -1063,6 +1064,7 @@ public void assign(Collection<TopicPartition> partitions) {
                         throw new IllegalArgumentException("Topic partitions to assign to cannot have null or empty topic");
                     topics.add(topic);
                 }
+                fetcher.clearBufferedDataForUnassignedPartitions(partitions);
 
                 // make sure the offsets of topic partitions the consumer is unsubscribing from
                 // are committed since there will be no following rebalance
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index dc0daa233ab..a92f57e7347 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -1015,6 +1015,40 @@ public void onAssignment(Set<TopicPartition> assignment) {
         sensors.updatePartitionLagAndLeadSensors(assignment);
     }
 
+    /**
+     * Clear the buffered data which are not a part of newly assigned partitions
+     *
+     * @param assignedPartitions  newly assigned {@link TopicPartition}
+     */
+    public void clearBufferedDataForUnassignedPartitions(Collection<TopicPartition> assignedPartitions) {
+        Iterator<CompletedFetch> itr = completedFetches.iterator();
+        while (itr.hasNext()) {
+            TopicPartition tp = itr.next().partition;
+            if (!assignedPartitions.contains(tp)) {
+                itr.remove();
+            }
+        }
+        if (nextInLineRecords != null && !assignedPartitions.contains(nextInLineRecords.partition)) {
+            nextInLineRecords.drain();
+            nextInLineRecords = null;
+        }
+    }
+
+    /**
+     * Clear the buffered data which are not a part of newly assigned topics
+     *
+     * @param assignedTopics  newly assigned topics
+     */
+    public void clearBufferedDataForUnassignedTopics(Collection<String> assignedTopics) {
+        Set<TopicPartition> currentTopicPartitions = new HashSet<>();
+        for (TopicPartition tp : subscriptions.assignedPartitions()) {
+            if (assignedTopics.contains(tp.topic())) {
+                currentTopicPartitions.add(tp);
+            }
+        }
+        clearBufferedDataForUnassignedPartitions(currentTopicPartitions);
+    }
+
     public static Sensor throttleTimeSensor(Metrics metrics, FetcherMetricsRegistry metricsRegistry) {
         Sensor fetchThrottleTimeSensor = metrics.sensor("fetch-throttle-time");
         fetchThrottleTimeSensor.add(metrics.metricInstance(metricsRegistry.fetchThrottleTimeAvg), new Avg());
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index d314a4d8c9f..afe5b2fa8de 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -287,6 +287,25 @@ public void testLeaderEpochInConsumerRecord() {
         }
     }
 
+    @Test
+    public void testClearBufferedDataForTopicPartitions() {
+        subscriptions.assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
+
+        // normal fetch
+        assertEquals(1, fetcher.sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+
+        client.prepareResponse(fullFetchResponse(tp0, this.records, Errors.NONE, 100L, 0));
+        consumerClient.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+        Set<TopicPartition> newAssignedTopicPartitions = new HashSet<>();
+        newAssignedTopicPartitions.add(tp1);
+
+        
fetcher.clearBufferedDataForUnassignedPartitions(newAssignedTopicPartitions);
+        assertFalse(fetcher.hasCompletedFetches());
+    }
+
     @Test
     public void testFetchSkipsBlackedOutNodes() {
         subscriptions.assignFromUser(singleton(tp0));


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Consumer should drop the data for unassigned topic partitions
> -------------------------------------------------------------
>
>                 Key: KAFKA-7096
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7096
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Mayuresh Gharat
>            Assignee: Mayuresh Gharat
>            Priority: Major
>             Fix For: 2.1.0
>
>
> Currently, if a client is assigned topics T1, T2 and T3 and calls poll(), the
> poll might fetch data for partitions of all three topics. If the client then
> unassigns some topics (for example T3) and calls poll(), we still hold the
> data for T3 in the completedFetches queue until we actually reach that
> buffered data on subsequent poll() calls, at which point we drop it. Holding
> this data is unnecessary.
> When a client creates a topic, it takes time for the broker to fetch the ACLs
> for that topic. During this time, the client still issues fetch requests for
> the topic and gets a response for each of its partitions; each of these
> responses consists of a TopicAuthorizationException. The response for each
> partition is wrapped in a CompletedFetch and added to the completedFetches
> queue. When the client calls the next poll(), it sees the
> TopicAuthorizationException from the first buffered CompletedFetch. At this
> point the client chooses to sleep for 1.5 minutes as a backoff (as per the
> design), hoping that the broker fetches the ACLs from the ACL store in the
> meantime. In fact, the broker has already fetched the ACLs by this time. When
> the client calls poll() after the sleep, it sees the
> TopicAuthorizationException from the second CompletedFetch and sleeps again.
> So it takes (1.5 * 60 * partitions) seconds before the client can see any
> data. With this patch, when the client sees the first
> TopicAuthorizationException, it can call assign(EmptySet), which discards the
> buffered completedFetches (those holding the TopicAuthorizationException), and
> then call assign(TopicPartitions) again before calling poll(). With this
> patch, we found that the client was able to get records as soon as the broker
> had fetched the ACLs from the ACL store.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to