lucasbru commented on code in PR #22559:
URL: https://github.com/apache/kafka/pull/22559#discussion_r3418952097


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java:
##########
@@ -1542,6 +1542,56 @@ public void testOnHeartbeatSuccessWhenInLeaving() {
         verify(memberStateListener, 
never()).onMemberEpochUpdated(Optional.of(MEMBER_EPOCH + 1), 
membershipManager.memberId());
     }
 
+    @Test
+    public void testStaticMemberRemainInGroupUsesStaticLeaveEpochOnClose() {
+        CloseOptions.GroupMembershipOperation operation = 
CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP;
+        MemberState expectedState = MemberState.LEAVING;
+        int expectedEpoch = 
StreamsGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH;
+        verifyStaticMemberLeaveOnClose(operation, expectedState, 
expectedEpoch);
+    }
+
+    @Test
+    public void 
testStaticMemberDefaultUsesLeaveGroupStaticMemberEpochOnClose() {
+        CloseOptions.GroupMembershipOperation operation = 
CloseOptions.GroupMembershipOperation.DEFAULT;
+        MemberState expectedState = MemberState.LEAVING;
+        int expectedEpoch = 
StreamsGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH;
+        verifyStaticMemberLeaveOnClose(operation, expectedState, 
expectedEpoch);
+    }
+
+    @Test
+    public void testStaticMemberLeaveGroupUsesLeaveGroupEpochOnClose() {
+        CloseOptions.GroupMembershipOperation operation = 
CloseOptions.GroupMembershipOperation.LEAVE_GROUP;
+        MemberState expectedState = MemberState.LEAVING;
+        int expectedEpoch = 
StreamsGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+        verifyStaticMemberLeaveOnClose(operation, expectedState, 
expectedEpoch);
+    }
+
+    private void verifyStaticMemberLeaveOnClose(
+        CloseOptions.GroupMembershipOperation membershipOperation,
+        MemberState expectedMemberState,
+        int expectedMemberEpoch
+    ) {
+        final Metrics localMetrics = new Metrics(time);
+        StreamsMembershipManager membershipManagerWithStaticMember = new 
StreamsMembershipManager(
+            GROUP_ID,
+            Optional.of("instance-1"),

Review Comment:
   Minor: this hardcodes "instance-1" while 
StreamsGroupHeartbeatRequestManagerTest uses an INSTANCE_ID constant. Worth a 
constant here too for consistency.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java:
##########
@@ -1542,6 +1542,56 @@ public void testOnHeartbeatSuccessWhenInLeaving() {
         verify(memberStateListener, 
never()).onMemberEpochUpdated(Optional.of(MEMBER_EPOCH + 1), 
membershipManager.memberId());
     }
 
+    @Test
+    public void testStaticMemberRemainInGroupUsesStaticLeaveEpochOnClose() {
+        CloseOptions.GroupMembershipOperation operation = 
CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP;
+        MemberState expectedState = MemberState.LEAVING;
+        int expectedEpoch = 
StreamsGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH;
+        verifyStaticMemberLeaveOnClose(operation, expectedState, 
expectedEpoch);
+    }
+
+    @Test
+    public void 
testStaticMemberDefaultUsesLeaveGroupStaticMemberEpochOnClose() {
+        CloseOptions.GroupMembershipOperation operation = 
CloseOptions.GroupMembershipOperation.DEFAULT;
+        MemberState expectedState = MemberState.LEAVING;
+        int expectedEpoch = 
StreamsGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH;
+        verifyStaticMemberLeaveOnClose(operation, expectedState, 
expectedEpoch);
+    }
+
+    @Test
+    public void testStaticMemberLeaveGroupUsesLeaveGroupEpochOnClose() {
+        CloseOptions.GroupMembershipOperation operation = 
CloseOptions.GroupMembershipOperation.LEAVE_GROUP;
+        MemberState expectedState = MemberState.LEAVING;
+        int expectedEpoch = 
StreamsGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+        verifyStaticMemberLeaveOnClose(operation, expectedState, 
expectedEpoch);
+    }

Review Comment:
   These three only differ by the operation and expected epoch, could they be a 
single @ParameterizedTest like testPollOnCloseWhenStaticMemberIsLeaving above? 
Also the names are inconsistent for the same constant: `UsesStaticLeaveEpoch` 
vs `UsesLeaveGroupStaticMemberEpoch`.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java:
##########
@@ -1493,6 +1495,30 @@ public void testPollOnCloseWhenIsLeaving() {
         assertEquals(LEAVE_GROUP_MEMBER_EPOCH, 
streamsRequest.data().memberEpoch());
     }
 
+    @ParameterizedTest
+    @EnumSource(value = CloseOptions.GroupMembershipOperation.class, names = 
{"DEFAULT", "REMAIN_IN_GROUP"})
+    public void testPollOnCloseWhenStaticMemberIsLeaving(final 
CloseOptions.GroupMembershipOperation operation) {

Review Comment:
   Coverage here is unit-level only. Is there an integration/system test 
exercising static-member close under group.protocol=streams, or is that 
expected to come separately?



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1600,12 +1600,6 @@ protected StreamsConfig(final Map<?, ?> props,
 
     private void verifyStreamsProtocolCompatibility(final boolean doLog) {
         if (doLog && isStreamsProtocolEnabled()) {

Review Comment:
   Now that group.instance.id is accepted with the streams protocol, are there 
docs that still state static membership isn't supported here? Worth checking 
the Streams upgrade/config docs so they don't contradict this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to