lucasbru commented on code in PR #21110:
URL: https://github.com/apache/kafka/pull/21110#discussion_r2605780943


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java:
##########
@@ -1937,6 +1938,242 @@ public void 
testConsumerPollWhenSubscriptionNotUpdated() {
         verifyInStateUnsubscribed(membershipManager);
     }
 
+    @Test
+    public void testIsGroupReadyWithMissingSourceTopicsStatus() {
+        
setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, 
TOPIC_0);
+        joining();
+
+        final List<StreamsGroupHeartbeatResponseData.Status> statuses = 
List.of(
+            new StreamsGroupHeartbeatResponseData.Status()
+                
.setStatusCode(StreamsGroupHeartbeatResponse.Status.MISSING_SOURCE_TOPICS.code())
+                .setStatusDetail("")
+        );
+
+        final StreamsGroupHeartbeatResponse response = makeHeartbeatResponse(
+            Collections.emptyList(),
+            Collections.emptyList(),
+            Collections.emptyList(),
+            MEMBER_EPOCH,
+            statuses
+        );
+
+        membershipManager.onHeartbeatSuccess(response);
+        membershipManager.poll(time.milliseconds());
+
+        final CompletableFuture<Void> future = 
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
+            Set.of(),
+            Set.of(),
+            Set.of(),
+            false
+        );
+
+        future.complete(null);
+    }
+
+    @Test
+    public void testIsGroupReadyWithMissingInternalTopicsStatus() {
+        
setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, 
TOPIC_0);
+        joining();
+
+        final List<StreamsGroupHeartbeatResponseData.Status> statuses = 
List.of(
+            new StreamsGroupHeartbeatResponseData.Status()
+                
.setStatusCode(StreamsGroupHeartbeatResponse.Status.MISSING_INTERNAL_TOPICS.code())
+                .setStatusDetail("")
+        );
+
+        final StreamsGroupHeartbeatResponse response = makeHeartbeatResponse(
+            Collections.emptyList(),
+            Collections.emptyList(),
+            Collections.emptyList(),
+            MEMBER_EPOCH,
+            statuses
+        );
+
+        membershipManager.onHeartbeatSuccess(response);
+        membershipManager.poll(time.milliseconds());
+
+        final CompletableFuture<Void> future = 
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
+            Set.of(),
+            Set.of(),
+            Set.of(),
+            false
+        );
+
+        future.complete(null);
+    }
+
+    @Test
+    public void testIsGroupReadyWithIncorrectlyPartitionedTopicsStatus() {
+        
setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, 
TOPIC_0);
+        joining();
+
+        final List<StreamsGroupHeartbeatResponseData.Status> statuses = 
List.of(
+            new StreamsGroupHeartbeatResponseData.Status()
+                
.setStatusCode(StreamsGroupHeartbeatResponse.Status.INCORRECTLY_PARTITIONED_TOPICS.code())
+                .setStatusDetail("")
+        );
+
+        final StreamsGroupHeartbeatResponse response = makeHeartbeatResponse(
+            Collections.emptyList(),
+            Collections.emptyList(),
+            Collections.emptyList(),
+            MEMBER_EPOCH,
+            statuses
+        );
+
+        membershipManager.onHeartbeatSuccess(response);
+        membershipManager.poll(time.milliseconds());
+
+        final CompletableFuture<Void> future = 
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
+            Set.of(),
+            Set.of(),
+            Set.of(),
+            false
+        );
+
+        future.complete(null);
+    }
+
+    @Test
+    public void testIsGroupReadyWithAssignmentDelayedStatus() {
+        
setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, 
TOPIC_0);
+        joining();
+
+        final List<StreamsGroupHeartbeatResponseData.Status> statuses = 
List.of(
+            new StreamsGroupHeartbeatResponseData.Status()
+                
.setStatusCode(StreamsGroupHeartbeatResponse.Status.ASSIGNMENT_DELAYED.code())
+                .setStatusDetail("")
+        );
+
+        final StreamsGroupHeartbeatResponse response = makeHeartbeatResponse(
+            Collections.emptyList(),
+            Collections.emptyList(),
+            Collections.emptyList(),
+            MEMBER_EPOCH,
+            statuses
+        );
+
+        membershipManager.onHeartbeatSuccess(response);
+        membershipManager.poll(time.milliseconds());
+
+        final CompletableFuture<Void> future = 
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
+            Set.of(),
+            Set.of(),
+            Set.of(),
+            false
+        );
+
+        future.complete(null);
+    }
+
+    @Test
+    public void testIsGroupReadyWithNoStatuses() {
+        
setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, 
TOPIC_0);
+        joining();
+
+        final StreamsGroupHeartbeatResponse response = makeHeartbeatResponse(
+            Collections.emptyList(),
+            Collections.emptyList(),
+            Collections.emptyList(),
+            MEMBER_EPOCH,
+            null
+        );
+
+        membershipManager.onHeartbeatSuccess(response);
+        membershipManager.poll(time.milliseconds());
+
+        final CompletableFuture<Void> future = 
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
+            Set.of(),
+            Set.of(),
+            Set.of(),
+            true
+        );
+
+        future.complete(null);
+    }
+
+    @Test
+    public void testIsGroupReadyWithOtherStatuses() {
+        
setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, 
TOPIC_0);
+        joining();
+
+        final List<StreamsGroupHeartbeatResponseData.Status> statuses = 
List.of(
+            new StreamsGroupHeartbeatResponseData.Status()
+                
.setStatusCode(StreamsGroupHeartbeatResponse.Status.STALE_TOPOLOGY.code())
+                .setStatusDetail("")
+        );
+
+        final StreamsGroupHeartbeatResponse response = makeHeartbeatResponse(
+            Collections.emptyList(),
+            Collections.emptyList(),
+            Collections.emptyList(),
+            MEMBER_EPOCH,
+            statuses
+        );
+
+        membershipManager.onHeartbeatSuccess(response);
+        membershipManager.poll(time.milliseconds());
+
+        final CompletableFuture<Void> future = 
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
+            Set.of(),
+            Set.of(),
+            Set.of(),
+            true

Review Comment:
   So this only plays a role once we implement topology updating. But the way 
it's defined in KIP-1071, a member with a stale topology gets to keep its 
tasks and can continue processing; it just doesn't receive new tasks. If we 
immediately stop processing on all nodes, rolling bounces will become 
impossible. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to