Re: [PR] KAFKA-15704: Update clientId and clientHost in MemberMetadata when static member is replaced. [kafka]
github-actions[bot] commented on PR #14666: URL: https://github.com/apache/kafka/pull/14666#issuecomment-2038787933 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch) If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15704: Update clientId and clientHost in MemberMetadata when static member is replaced. [kafka]
vamossagar12 commented on PR #14666: URL: https://github.com/apache/kafka/pull/14666#issuecomment-1878869619 hey @divijvaidya , @showuon would you be able to review this PR? This seems like a useful change and things look fine once I reviewed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15704: Update clientId and clientHost in MemberMetadata when static member is replaced. [kafka]
Lucent-Wong commented on PR #14666: URL: https://github.com/apache/kafka/pull/14666#issuecomment-1833071663 retest this please -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15704: Update clientId and clientHost in MemberMetadata when static member is replaced. [kafka]
Lucent-Wong commented on code in PR #14666: URL: https://github.com/apache/kafka/pull/14666#discussion_r1400935789 ## core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala: ## @@ -3782,6 +3784,33 @@ class GroupCoordinatorTest { assertTrue(groupCoordinator.tryCompleteHeartbeat(group, leaderMemberId, false, () => true)) } + @Test + def testUpdateStaticMemberButStoreGroupFailed(): Unit = { +val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId, DefaultSessionTimeout, DefaultRebalanceTimeout, "oldClientId", "oldClientHost") +val oldGroup = getGroup(groupId) +val oldMemberId = oldGroup.currentStaticMemberId(followerInstanceId) +val oldMember = oldGroup.get(oldMemberId.get) +val oldClientId = oldMember.clientId +val oldClientHost = oldMember.clientHost + +val joinGroupResult = staticJoinGroupWithPersistence(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, + followerInstanceId, protocolType, protocolSuperset, clockAdvance = 1, appendRecordError = Errors.MESSAGE_TOO_LARGE) + +checkJoinGroupResult(joinGroupResult, + Errors.UNKNOWN_SERVER_ERROR, + rebalanceResult.generation, + Set.empty, + Stable, + Some(protocolType)) + +val newGroup = getGroup(groupId) Review Comment: OK, let change it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15704: Update clientId and clientHost in MemberMetadata when static member is replaced. [kafka]
vamossagar12 commented on code in PR #14666: URL: https://github.com/apache/kafka/pull/14666#discussion_r1400864801 ## core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala: ## @@ -3782,6 +3784,33 @@ class GroupCoordinatorTest { assertTrue(groupCoordinator.tryCompleteHeartbeat(group, leaderMemberId, false, () => true)) } + @Test + def testUpdateStaticMemberButStoreGroupFailed(): Unit = { +val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId, DefaultSessionTimeout, DefaultRebalanceTimeout, "oldClientId", "oldClientHost") +val oldGroup = getGroup(groupId) +val oldMemberId = oldGroup.currentStaticMemberId(followerInstanceId) +val oldMember = oldGroup.get(oldMemberId.get) +val oldClientId = oldMember.clientId +val oldClientHost = oldMember.clientHost + +val joinGroupResult = staticJoinGroupWithPersistence(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, + followerInstanceId, protocolType, protocolSuperset, clockAdvance = 1, appendRecordError = Errors.MESSAGE_TOO_LARGE) + +checkJoinGroupResult(joinGroupResult, + Errors.UNKNOWN_SERVER_ERROR, + rebalanceResult.generation, + Set.empty, + Stable, + Some(protocolType)) + +val newGroup = getGroup(groupId) Review Comment: nit: We don't need 2 separate variables for groupId. We can use the same `groupId` object. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15704: Update clientId and clientHost in MemberMetadata when static member is replaced. [kafka]
Lucent-Wong commented on PR #14666: URL: https://github.com/apache/kafka/pull/14666#issuecomment-1819365201 re-test -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15704: Update clientId and clientHost in MemberMetadata when static member is replaced. [kafka]
Lucent-Wong commented on code in PR #14666: URL: https://github.com/apache/kafka/pull/14666#discussion_r1376291255 ## core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala: ## @@ -1335,7 +1339,7 @@ private[group] class GroupCoordinator( // Failed to persist member.id of the given static member, revert the update of the static member in the group. group.updateMember(knownStaticMember, oldProtocols, oldRebalanceTimeoutMs, oldSessionTimeoutMs, null) - val oldMember = group.replaceStaticMember(groupInstanceId, newMemberId, oldMemberId) + val oldMember = group.replaceStaticMember(groupInstanceId, newMemberId, oldMemberId, clientId, clientHost) Review Comment: I think you are correct, I didn't realize it should reset these value. I will update it. ## core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala: ## @@ -609,7 +609,7 @@ class GroupMetadataTest { @Test def testReplaceGroupInstanceWithNonExistingMember(): Unit = { val newMemberId = "newMemberId" -assertThrows(classOf[IllegalArgumentException], () => group.replaceStaticMember(groupInstanceId, memberId, newMemberId)) +assertThrows(classOf[IllegalArgumentException], () => group.replaceStaticMember(groupInstanceId, memberId, newMemberId, clientId, clientHost)) } @Test Review Comment: Sure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15704: Update clientId and clientHost in MemberMetadata when static member is replaced. [kafka]
vamossagar12 commented on code in PR #14666: URL: https://github.com/apache/kafka/pull/14666#discussion_r1375947390 ## core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala: ## @@ -609,7 +609,7 @@ class GroupMetadataTest { @Test def testReplaceGroupInstanceWithNonExistingMember(): Unit = { val newMemberId = "newMemberId" -assertThrows(classOf[IllegalArgumentException], () => group.replaceStaticMember(groupInstanceId, memberId, newMemberId)) +assertThrows(classOf[IllegalArgumentException], () => group.replaceStaticMember(groupInstanceId, memberId, newMemberId, clientId, clientHost)) } @Test Review Comment: If you agree to my comment [here](https://github.com/apache/kafka/pull/14666/files#diff-1c062c229499ca8f215e0088c8ab0dfee768025cd3b59042132fddf70aef4c0cR1342), then can we add a test which validates that the clientId and clientHost gets reverted upon failure? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15704: Update clientId and clientHost in MemberMetadata when static member is replaced. [kafka]
vamossagar12 commented on code in PR #14666: URL: https://github.com/apache/kafka/pull/14666#discussion_r1375947390 ## core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala: ## @@ -609,7 +609,7 @@ class GroupMetadataTest { @Test def testReplaceGroupInstanceWithNonExistingMember(): Unit = { val newMemberId = "newMemberId" -assertThrows(classOf[IllegalArgumentException], () => group.replaceStaticMember(groupInstanceId, memberId, newMemberId)) +assertThrows(classOf[IllegalArgumentException], () => group.replaceStaticMember(groupInstanceId, memberId, newMemberId, clientId, clientHost)) } @Test Review Comment: If you agree to my comment [here](https://github.com/apache/kafka/pull/14666/files#diff-1c062c229499ca8f215e0088c8ab0dfee768025cd3b59042132fddf70aef4c0cR1342), then can we add a test which validates that the clientId and clientHost gets reverted? ## core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala: ## @@ -1335,7 +1339,7 @@ private[group] class GroupCoordinator( // Failed to persist member.id of the given static member, revert the update of the static member in the group. group.updateMember(knownStaticMember, oldProtocols, oldRebalanceTimeoutMs, oldSessionTimeoutMs, null) - val oldMember = group.replaceStaticMember(groupInstanceId, newMemberId, oldMemberId) + val oldMember = group.replaceStaticMember(groupInstanceId, newMemberId, oldMemberId, clientId, clientHost) Review Comment: IIUC, `clientId` and `clientHost` are applicable for the replacing member. In this case, when the persistence of the new static member failed, we need to reset the client id and client host to the older one. WDYT? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org