mjsax commented on code in PR #21110:
URL: https://github.com/apache/kafka/pull/21110#discussion_r2604703077
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java:
##########
@@ -1937,6 +1938,242 @@ public void
testConsumerPollWhenSubscriptionNotUpdated() {
verifyInStateUnsubscribed(membershipManager);
}
+ @Test
+ public void testIsGroupReadyWithMissingSourceTopicsStatus() {
+
setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0,
TOPIC_0);
+ joining();
+
+ final List<StreamsGroupHeartbeatResponseData.Status> statuses =
List.of(
+ new StreamsGroupHeartbeatResponseData.Status()
+
.setStatusCode(StreamsGroupHeartbeatResponse.Status.MISSING_SOURCE_TOPICS.code())
+ .setStatusDetail("")
+ );
+
+ final StreamsGroupHeartbeatResponse response = makeHeartbeatResponse(
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ MEMBER_EPOCH,
+ statuses
+ );
+
+ membershipManager.onHeartbeatSuccess(response);
+ membershipManager.poll(time.milliseconds());
+
+ final CompletableFuture<Void> future =
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
+ Set.of(),
+ Set.of(),
+ Set.of(),
+ false
+ );
+
+ future.complete(null);
+ }
+
+ @Test
+ public void testIsGroupReadyWithMissingInternalTopicsStatus() {
+
setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0,
TOPIC_0);
+ joining();
+
+ final List<StreamsGroupHeartbeatResponseData.Status> statuses =
List.of(
+ new StreamsGroupHeartbeatResponseData.Status()
+
.setStatusCode(StreamsGroupHeartbeatResponse.Status.MISSING_INTERNAL_TOPICS.code())
+ .setStatusDetail("")
+ );
+
+ final StreamsGroupHeartbeatResponse response = makeHeartbeatResponse(
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ MEMBER_EPOCH,
+ statuses
+ );
+
+ membershipManager.onHeartbeatSuccess(response);
+ membershipManager.poll(time.milliseconds());
+
+ final CompletableFuture<Void> future =
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
+ Set.of(),
+ Set.of(),
+ Set.of(),
+ false
+ );
+
+ future.complete(null);
+ }
+
+ @Test
+ public void testIsGroupReadyWithIncorrectlyPartitionedTopicsStatus() {
+
setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0,
TOPIC_0);
+ joining();
+
+ final List<StreamsGroupHeartbeatResponseData.Status> statuses =
List.of(
+ new StreamsGroupHeartbeatResponseData.Status()
+
.setStatusCode(StreamsGroupHeartbeatResponse.Status.INCORRECTLY_PARTITIONED_TOPICS.code())
+ .setStatusDetail("")
+ );
+
+ final StreamsGroupHeartbeatResponse response = makeHeartbeatResponse(
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ MEMBER_EPOCH,
+ statuses
+ );
+
+ membershipManager.onHeartbeatSuccess(response);
+ membershipManager.poll(time.milliseconds());
+
+ final CompletableFuture<Void> future =
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
+ Set.of(),
+ Set.of(),
+ Set.of(),
+ false
+ );
+
+ future.complete(null);
+ }
+
+ @Test
+ public void testIsGroupReadyWithAssignmentDelayedStatus() {
+
setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0,
TOPIC_0);
+ joining();
+
+ final List<StreamsGroupHeartbeatResponseData.Status> statuses =
List.of(
+ new StreamsGroupHeartbeatResponseData.Status()
+
.setStatusCode(StreamsGroupHeartbeatResponse.Status.ASSIGNMENT_DELAYED.code())
+ .setStatusDetail("")
+ );
+
+ final StreamsGroupHeartbeatResponse response = makeHeartbeatResponse(
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ MEMBER_EPOCH,
+ statuses
+ );
+
+ membershipManager.onHeartbeatSuccess(response);
+ membershipManager.poll(time.milliseconds());
+
+ final CompletableFuture<Void> future =
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
+ Set.of(),
+ Set.of(),
+ Set.of(),
+ false
+ );
+
+ future.complete(null);
+ }
+
+ @Test
+ public void testIsGroupReadyWithNoStatuses() {
+
setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0,
TOPIC_0);
+ joining();
+
+ final StreamsGroupHeartbeatResponse response = makeHeartbeatResponse(
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ MEMBER_EPOCH,
+ null
+ );
+
+ membershipManager.onHeartbeatSuccess(response);
+ membershipManager.poll(time.milliseconds());
+
+ final CompletableFuture<Void> future =
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
+ Set.of(),
+ Set.of(),
+ Set.of(),
+ true
+ );
+
+ future.complete(null);
+ }
+
+ @Test
+ public void testIsGroupReadyWithOtherStatuses() {
+
setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0,
TOPIC_0);
+ joining();
+
+ final List<StreamsGroupHeartbeatResponseData.Status> statuses =
List.of(
+ new StreamsGroupHeartbeatResponseData.Status()
+
.setStatusCode(StreamsGroupHeartbeatResponse.Status.STALE_TOPOLOGY.code())
+ .setStatusDetail("")
+ );
+
+ final StreamsGroupHeartbeatResponse response = makeHeartbeatResponse(
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ MEMBER_EPOCH,
+ statuses
+ );
+
+ membershipManager.onHeartbeatSuccess(response);
+ membershipManager.poll(time.milliseconds());
+
+ final CompletableFuture<Void> future =
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(
+ Set.of(),
+ Set.of(),
+ Set.of(),
+ true
Review Comment:
I am surprised to see `true` here? Why is a group ready if the topology is
stale?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]