This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new e981b82 KAFKA-8500; Static member rejoin should always update member.id (#6899) e981b82 is described below commit e981b82601e9967d3a2492d2cebb45369407b398 Author: Boyang Chen <boy...@confluent.io> AuthorDate: Wed Jun 12 08:41:58 2019 -0700 KAFKA-8500; Static member rejoin should always update member.id (#6899) This PR fixes a bug in static group membership. Previously we limited the `member.id` replacement in JoinGroup to only cases when the group is in Stable. This is error-prone and could potentially allow duplicate consumers reading from the same topic. For example, imagine a case where two unknown members join in the `PrepareRebalance` stage at the same time. The PR fixes the following things: 1. Replace `member.id` any time we see a known static member rejoin the group with an unknown member.id 2. Immediately fence any ongoing join/sync group callback to terminate the duplicate member early. 3. Clearly handle Dead/Empty cases as exceptional. 4. Return the old leader id upon static member leader rejoin to avoid a trivial member assignment being triggered. 
Reviewers: Guozhang Wang <wangg...@gmail.com>, Jason Gustafson <ja...@confluent.io> --- .../kafka/coordinator/group/GroupCoordinator.scala | 110 +++++++------ .../kafka/coordinator/group/GroupMetadata.scala | 34 +++- .../kafka/coordinator/group/MemberMetadata.scala | 3 +- core/src/main/scala/kafka/server/KafkaApis.scala | 12 +- .../group/GroupCoordinatorConcurrencyTest.scala | 6 +- .../coordinator/group/GroupCoordinatorTest.scala | 173 +++++++++++++++++++-- .../coordinator/group/GroupMetadataTest.scala | 95 ++++++++++- tests/kafkatest/tests/client/consumer_test.py | 6 + 8 files changed, 349 insertions(+), 90 deletions(-) diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala index e601d34..a9ac67f 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala @@ -58,7 +58,7 @@ class GroupCoordinator(val brokerId: Int, import GroupCoordinator._ type JoinCallback = JoinGroupResult => Unit - type SyncCallback = (Array[Byte], Errors) => Unit + type SyncCallback = SyncGroupResult => Unit this.logIdent = "[GroupCoordinator " + brokerId + "]: " @@ -179,37 +179,38 @@ class GroupCoordinator(val brokerId: Int, if (group.hasStaticMember(groupInstanceId)) { val oldMemberId = group.getStaticMemberId(groupInstanceId) + info(s"Static member $groupInstanceId with unknown member id rejoins, assigning new member id $newMemberId, while " + + s"old member $oldMemberId will be removed.") + + val currentLeader = group.leaderOrNull + val member = group.replaceGroupInstance(oldMemberId, newMemberId, groupInstanceId) + // Heartbeat of old member id will expire without effect since the group no longer contains that member id. + // New heartbeat shall be scheduled with new member id. 
+ completeAndScheduleNextHeartbeatExpiration(group, member) + + val knownStaticMember = group.get(newMemberId) + group.updateMember(knownStaticMember, protocols, responseCallback) group.currentState match { - case Stable => - info(s"Static member $groupInstanceId with unknown member id rejoins group ${group.groupId} " + - s"in ${group.currentState} state. Assigning new member id $newMemberId, while old member $oldMemberId " + - "will be removed. No rebalance will be triggered.") - - val oldMember = group.replaceGroupInstance(oldMemberId, newMemberId, groupInstanceId) - - // Heartbeat of old member id will expire without affection since the group no longer contains that member id. - // New heartbeat shall be scheduled with new member id. - completeAndScheduleNextHeartbeatExpiration(group, oldMember) - - responseCallback(JoinGroupResult( - members = if (group.isLeader(newMemberId)) { - group.currentMemberMetadata - } else { - List.empty - }, + case Stable | CompletingRebalance => + info(s"Static member joins during ${group.currentState} stage will not trigger rebalance.") + group.maybeInvokeJoinCallback(member, JoinGroupResult( + members = List.empty, memberId = newMemberId, generationId = group.generationId, subProtocol = group.protocolOrNull, - leaderId = group.leaderOrNull, + // We want to avoid current leader performing trivial assignment while the group + // is in stable/awaiting sync stage, because the new assignment in leader's next sync call + // won't be broadcast by a stable/awaiting sync group. This could be guaranteed by + // always returning the old leader id so that the current leader won't assume itself + // as a leader based on the returned message, since the new member.id won't match + // returned leader id, therefore no assignment will be performed. + leaderId = currentLeader, error = Errors.NONE)) - - case _ => - info(s"Static member $groupInstanceId with unkonwn member id rejoins group ${group.groupId} " + - s"in ${group.currentState} state. 
Update its membership with the pre-registered old member id $oldMemberId.") - - val knownStaticMember = group.get(oldMemberId) - updateMemberAndRebalance(group, knownStaticMember, protocols, responseCallback) + case Empty | Dead => + throw new IllegalStateException(s"Group ${group.groupId} was not supposed to be " + + s"in the state ${group.currentState} when the unknown static member $groupInstanceId rejoins.") + case PreparingRebalance => } } else if (requireKnownMemberId) { // If member id required (dynamic membership), register the member in the pending member list @@ -224,7 +225,6 @@ class GroupCoordinator(val brokerId: Int, s"${group.currentState} state. Created a new member id $newMemberId for this member and add to the group.") addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, newMemberId, groupInstanceId, clientId, clientHost, protocolType, protocols, group, responseCallback) - } } } @@ -338,13 +338,13 @@ class GroupCoordinator(val brokerId: Int, // group will need to start over at JoinGroup. By returning rebalance in progress, the consumer // will attempt to rejoin without needing to rediscover the coordinator. Note that we cannot // return COORDINATOR_LOAD_IN_PROGRESS since older clients do not expect the error. 
- responseCallback(Array.empty, Errors.REBALANCE_IN_PROGRESS) + responseCallback(SyncGroupResult(Array.empty, Errors.REBALANCE_IN_PROGRESS)) - case Some(error) => responseCallback(Array.empty, error) + case Some(error) => responseCallback(SyncGroupResult(Array.empty, error)) case None => groupManager.getGroup(groupId) match { - case None => responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID) + case None => responseCallback(SyncGroupResult(Array.empty, Errors.UNKNOWN_MEMBER_ID)) case Some(group) => doSyncGroup(group, generation, memberId, groupInstanceId, groupAssignment, responseCallback) } } @@ -362,20 +362,20 @@ class GroupCoordinator(val brokerId: Int, // from the coordinator metadata; this is likely that the group has migrated to some other // coordinator OR the group is in a transient unstable phase. Let the member retry // finding the correct coordinator and rejoin. - responseCallback(Array.empty, Errors.COORDINATOR_NOT_AVAILABLE) + responseCallback(SyncGroupResult(Array.empty, Errors.COORDINATOR_NOT_AVAILABLE)) } else if (group.isStaticMemberFenced(memberId, groupInstanceId)) { - responseCallback(Array.empty, Errors.FENCED_INSTANCE_ID) + responseCallback(SyncGroupResult(Array.empty, Errors.FENCED_INSTANCE_ID)) } else if (!group.has(memberId)) { - responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID) + responseCallback(SyncGroupResult(Array.empty, Errors.UNKNOWN_MEMBER_ID)) } else if (generationId != group.generationId) { - responseCallback(Array.empty, Errors.ILLEGAL_GENERATION) + responseCallback(SyncGroupResult(Array.empty, Errors.ILLEGAL_GENERATION)) } else { group.currentState match { case Empty => - responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID) + responseCallback(SyncGroupResult(Array.empty, Errors.UNKNOWN_MEMBER_ID)) case PreparingRebalance => - responseCallback(Array.empty, Errors.REBALANCE_IN_PROGRESS) + responseCallback(SyncGroupResult(Array.empty, Errors.REBALANCE_IN_PROGRESS)) case CompletingRebalance => 
group.get(memberId).awaitingSyncCallback = responseCallback @@ -409,7 +409,7 @@ class GroupCoordinator(val brokerId: Int, case Stable => // if the group is stable, we just return the current assignment val memberMetadata = group.get(memberId) - responseCallback(memberMetadata.assignment, Errors.NONE) + responseCallback(SyncGroupResult(memberMetadata.assignment, Errors.NONE)) completeAndScheduleNextHeartbeatExpiration(group, group.get(memberId)) case Dead => @@ -476,7 +476,7 @@ class GroupCoordinator(val brokerId: Int, case Empty => group.transitionTo(Dead) groupsEligibleForDeletion :+= group - case _ => + case Stable | PreparingRebalance | CompletingRebalance => groupErrors += groupId -> Errors.NON_EMPTY_GROUP } } @@ -734,10 +734,7 @@ class GroupCoordinator(val brokerId: Int, case Stable | CompletingRebalance => for (member <- group.allMemberMetadata) { - if (member.awaitingSyncCallback != null) { - member.awaitingSyncCallback(Array.empty[Byte], Errors.NOT_COORDINATOR) - member.awaitingSyncCallback = null - } + group.maybeInvokeSyncCallback(member, SyncGroupResult(Array.empty, Errors.NOT_COORDINATOR)) heartbeatPurgatory.checkAndComplete(MemberKey(member.groupId, member.memberId)) } } @@ -772,16 +769,13 @@ class GroupCoordinator(val brokerId: Int, private def resetAndPropagateAssignmentError(group: GroupMetadata, error: Errors) { assert(group.is(CompletingRebalance)) - group.allMemberMetadata.foreach(_.assignment = Array.empty[Byte]) + group.allMemberMetadata.foreach(_.assignment = Array.empty) propagateAssignment(group, error) } private def propagateAssignment(group: GroupMetadata, error: Errors) { for (member <- group.allMemberMetadata) { - if (member.awaitingSyncCallback != null) { - member.awaitingSyncCallback(member.assignment, error) - member.awaitingSyncCallback = null - + if (group.maybeInvokeSyncCallback(member, SyncGroupResult(member.assignment, error))) { // reset the session timeout for members after propagating the member's assignment. 
// This is because if any member's session expired while we were still awaiting either // the leader sync group or the storage callback, its expiration will be ignored and no @@ -791,16 +785,6 @@ class GroupCoordinator(val brokerId: Int, } } - private def joinError(memberId: String, error: Errors): JoinGroupResult = { - JoinGroupResult( - members = List.empty, - memberId = memberId, - generationId = GroupCoordinator.NoGeneration, - subProtocol = GroupCoordinator.NoProtocol, - leaderId = GroupCoordinator.NoLeader, - error = error) - } - /** * Complete existing DelayedHeartbeats for the given member and schedule the next one */ @@ -1110,6 +1094,15 @@ object GroupCoordinator { new GroupCoordinator(config.brokerId, groupConfig, offsetConfig, groupMetadataManager, heartbeatPurgatory, joinPurgatory, time) } + def joinError(memberId: String, error: Errors): JoinGroupResult = { + JoinGroupResult( + members = List.empty, + memberId = memberId, + generationId = GroupCoordinator.NoGeneration, + subProtocol = GroupCoordinator.NoProtocol, + leaderId = GroupCoordinator.NoLeader, + error = error) + } } case class GroupConfig(groupMinSessionTimeoutMs: Int, @@ -1123,3 +1116,6 @@ case class JoinGroupResult(members: List[JoinGroupResponseMember], subProtocol: String, leaderId: String, error: Errors) + +case class SyncGroupResult(memberAssignment: Array[Byte], + error: Errors) diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala index 21aae42..58a68a2 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala @@ -23,6 +23,7 @@ import kafka.common.OffsetAndMetadata import kafka.utils.{CoreUtils, Logging, nonthreadsafe} import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember +import org.apache.kafka.common.protocol.Errors import 
org.apache.kafka.common.utils.Time import scala.collection.{Seq, immutable, mutable} @@ -162,7 +163,7 @@ case class GroupSummary(state: String, * being materialized. */ case class CommitRecordMetadataAndOffset(appendedBatchOffset: Option[Long], offsetAndMetadata: OffsetAndMetadata) { - def olderThan(that: CommitRecordMetadataAndOffset) : Boolean = appendedBatchOffset.get < that.appendedBatchOffset.get + def olderThan(that: CommitRecordMetadataAndOffset): Boolean = appendedBatchOffset.get < that.appendedBatchOffset.get } /** @@ -290,6 +291,19 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState val oldMember = members.remove(oldMemberId) .getOrElse(throw new IllegalArgumentException(s"Cannot replace non-existing member id $oldMemberId")) + // Fence potential duplicate member immediately if someone awaits join/sync callback. + maybeInvokeJoinCallback(oldMember, JoinGroupResult( + members = List.empty, + memberId = oldMemberId, + generationId = GroupCoordinator.NoGeneration, + subProtocol = GroupCoordinator.NoProtocol, + leaderId = GroupCoordinator.NoLeader, + error = Errors.FENCED_INSTANCE_ID)) + + maybeInvokeSyncCallback(oldMember, SyncGroupResult( + Array.empty, Errors.FENCED_INSTANCE_ID + )) + oldMember.memberId = newMemberId members.put(newMemberId, oldMember) @@ -425,7 +439,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState } def maybeInvokeJoinCallback(member: MemberMetadata, - joinGroupResult: JoinGroupResult) : Unit = { + joinGroupResult: JoinGroupResult): Unit = { if (member.isAwaitingJoin) { member.awaitingJoinCallback(joinGroupResult) member.awaitingJoinCallback = null @@ -433,6 +447,20 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState } } + /** + * @return true if a sync callback actually performs. 
+ */ + def maybeInvokeSyncCallback(member: MemberMetadata, + syncGroupResult: SyncGroupResult): Boolean = { + if (member.isAwaitingSync) { + member.awaitingSyncCallback(syncGroupResult) + member.awaitingSyncCallback = null + true + } else { + false + } + } + def initNextGeneration() = { if (members.nonEmpty) { generationId += 1 @@ -600,7 +628,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState }.toMap } - def removeExpiredOffsets(currentTimestamp: Long, offsetRetentionMs: Long) : Map[TopicPartition, OffsetAndMetadata] = { + def removeExpiredOffsets(currentTimestamp: Long, offsetRetentionMs: Long): Map[TopicPartition, OffsetAndMetadata] = { def getExpiredOffsets(baseTimestamp: CommitRecordMetadataAndOffset => Long): Map[TopicPartition, OffsetAndMetadata] = { offsets.filter { diff --git a/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala b/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala index a090d97..83ff709 100644 --- a/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala +++ b/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala @@ -66,13 +66,14 @@ private[group] class MemberMetadata(var memberId: String, var assignment: Array[Byte] = Array.empty[Byte] var awaitingJoinCallback: JoinGroupResult => Unit = null - var awaitingSyncCallback: (Array[Byte], Errors) => Unit = null + var awaitingSyncCallback: SyncGroupResult => Unit = null var latestHeartbeat: Long = -1 var isLeaving: Boolean = false var isNew: Boolean = false val isStaticMember: Boolean = groupInstanceId.isDefined def isAwaitingJoin = awaitingJoinCallback != null + def isAwaitingSync = awaitingSyncCallback != null /** * Get metadata corresponding to the provided protocol. 
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 0a1f251..fa45cf5 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -31,7 +31,7 @@ import kafka.api.{ApiVersion, KAFKA_0_11_0_IV0, KAFKA_2_3_IV0} import kafka.cluster.Partition import kafka.common.OffsetAndMetadata import kafka.controller.KafkaController -import kafka.coordinator.group.{GroupCoordinator, JoinGroupResult} +import kafka.coordinator.group.{GroupCoordinator, JoinGroupResult, SyncGroupResult} import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinator} import kafka.message.ZStdCompressionCodec import kafka.network.RequestChannel @@ -1408,12 +1408,12 @@ class KafkaApis(val requestChannel: RequestChannel, def handleSyncGroupRequest(request: RequestChannel.Request) { val syncGroupRequest = request.body[SyncGroupRequest] - def sendResponseCallback(memberState: Array[Byte], error: Errors) { + def sendResponseCallback(syncGroupResult: SyncGroupResult) { sendResponseMaybeThrottle(request, requestThrottleMs => new SyncGroupResponse( new SyncGroupResponseData() - .setErrorCode(error.code) - .setAssignment(memberState) + .setErrorCode(syncGroupResult.error.code) + .setAssignment(syncGroupResult.memberAssignment) .setThrottleTimeMs(requestThrottleMs) )) } @@ -1422,9 +1422,9 @@ class KafkaApis(val requestChannel: RequestChannel, // Only enable static membership when IBP >= 2.3, because it is not safe for the broker to use the static member logic // until we are sure that all brokers support it. If static group being loaded by an older coordinator, it will discard // the group.instance.id field, so static members could accidentally become "dynamic", which leads to wrong states. 
- sendResponseCallback(Array[Byte](), Errors.UNSUPPORTED_VERSION) + sendResponseCallback(SyncGroupResult(Array[Byte](), Errors.UNSUPPORTED_VERSION)) } else if (!authorize(request.session, Read, Resource(Group, syncGroupRequest.data.groupId, LITERAL))) { - sendResponseCallback(Array[Byte](), Errors.GROUP_AUTHORIZATION_FAILED) + sendResponseCallback(SyncGroupResult(Array[Byte](), Errors.GROUP_AUTHORIZATION_FAILED)) } else { val assignmentMap = immutable.Map.newBuilder[String, Array[Byte]] syncGroupRequest.data.assignments.asScala.foreach { assignment => diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala index 3da5a0c..1cee665 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala @@ -173,8 +173,8 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest class SyncGroupOperation extends GroupOperation[SyncGroupCallbackParams, SyncGroupCallback] { override def responseCallback(responsePromise: Promise[SyncGroupCallbackParams]): SyncGroupCallback = { - val callback: SyncGroupCallback = (assignment, error) => - responsePromise.success((assignment, error)) + val callback: SyncGroupCallback = syncGroupResult => + responsePromise.success(syncGroupResult.memberAssignment, syncGroupResult.error) callback } override def runWithCallback(member: GroupMember, responseCallback: SyncGroupCallback): Unit = { @@ -280,7 +280,7 @@ object GroupCoordinatorConcurrencyTest { type JoinGroupCallback = JoinGroupResult => Unit type SyncGroupCallbackParams = (Array[Byte], Errors) - type SyncGroupCallback = (Array[Byte], Errors) => Unit + type SyncGroupCallback = SyncGroupResult => Unit type HeartbeatCallbackParams = Errors type HeartbeatCallback = Errors => Unit type CommitOffsetCallbackParams = 
Map[TopicPartition, Errors] diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala index cd1ebc5..c12e19d 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala @@ -47,7 +47,7 @@ import scala.concurrent.{Await, Future, Promise, TimeoutException} class GroupCoordinatorTest { type JoinGroupCallback = JoinGroupResult => Unit type SyncGroupCallbackParams = (Array[Byte], Errors) - type SyncGroupCallback = (Array[Byte], Errors) => Unit + type SyncGroupCallback = SyncGroupResult => Unit type HeartbeatCallbackParams = Errors type HeartbeatCallback = Errors => Unit type CommitOffsetCallbackParams = Map[TopicPartition, Errors] @@ -59,7 +59,7 @@ class GroupCoordinatorTest { val ClientHost = "localhost" val GroupMinSessionTimeout = 10 val GroupMaxSessionTimeout = 10 * 60 * 1000 - val GroupMaxSize = 3 + val GroupMaxSize = 4 val DefaultRebalanceTimeout = 500 val DefaultSessionTimeout = 500 val GroupInitialRebalanceDelay = 50 @@ -146,7 +146,7 @@ class GroupCoordinatorTest { // SyncGroup var syncGroupResponse: Option[Errors] = None groupCoordinator.handleSyncGroup(otherGroupId, 1, memberId, None, Map.empty[String, Array[Byte]], - (_, error)=> syncGroupResponse = Some(error)) + syncGroupResult => syncGroupResponse = Some(syncGroupResult.error)) assertEquals(Some(Errors.REBALANCE_IN_PROGRESS), syncGroupResponse) // OffsetCommit @@ -439,6 +439,151 @@ class GroupCoordinatorTest { } @Test + def staticMemberFenceDuplicateRejoinedFollower() { + val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId) + + EasyMock.reset(replicaManager) + // A third member joins will trigger rebalance. 
+ sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols) + timer.advanceClock(1) + assertTrue(getGroup(groupId).is(PreparingRebalance)) + + EasyMock.reset(replicaManager) + timer.advanceClock(1) + // Old follower rejoins group will be matching current member.id. + val oldFollowerJoinGroupFuture = + sendJoinGroup(groupId, rebalanceResult.followerId, protocolType, protocols, groupInstanceId = followerInstanceId) + + EasyMock.reset(replicaManager) + timer.advanceClock(1) + // Duplicate follower joins group with unknown member id will trigger member.id replacement. + val duplicateFollowerJoinFuture = + sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols, groupInstanceId = followerInstanceId) + + timer.advanceClock(1) + // Old member shall be fenced immediately upon duplicate follower joins. + val oldFollowerJoinGroupResult = Await.result(oldFollowerJoinGroupFuture, Duration(1, TimeUnit.MILLISECONDS)) + checkJoinGroupResult(oldFollowerJoinGroupResult, + Errors.FENCED_INSTANCE_ID, + -1, + Set.empty, + groupId, + PreparingRebalance) + verifyDelayedTaskNotCompleted(duplicateFollowerJoinFuture) + } + + @Test + def staticMemberFenceDuplicateSyncingFollowerAfterMemberIdChanged() { + val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId) + + EasyMock.reset(replicaManager) + // Known leader rejoins will trigger rebalance. + val leaderJoinGroupFuture = + sendJoinGroup(groupId, rebalanceResult.leaderId, protocolType, protocols, groupInstanceId = leaderInstanceId) + timer.advanceClock(1) + assertTrue(getGroup(groupId).is(PreparingRebalance)) + + EasyMock.reset(replicaManager) + timer.advanceClock(1) + // Old follower rejoins group will match current member.id. 
+ val oldFollowerJoinGroupFuture = + sendJoinGroup(groupId, rebalanceResult.followerId, protocolType, protocols, groupInstanceId = followerInstanceId) + + timer.advanceClock(1) + val leaderJoinGroupResult = Await.result(leaderJoinGroupFuture, Duration(1, TimeUnit.MILLISECONDS)) + checkJoinGroupResult(leaderJoinGroupResult, + Errors.NONE, + rebalanceResult.generation + 1, + Set(leaderInstanceId, followerInstanceId), + groupId, + CompletingRebalance) + assertEquals(leaderJoinGroupResult.leaderId, leaderJoinGroupResult.memberId) + assertEquals(rebalanceResult.leaderId, leaderJoinGroupResult.leaderId) + + // Old member shall be getting a successful join group response. + val oldFollowerJoinGroupResult = Await.result(oldFollowerJoinGroupFuture, Duration(1, TimeUnit.MILLISECONDS)) + checkJoinGroupResult(oldFollowerJoinGroupResult, + Errors.NONE, + rebalanceResult.generation + 1, + Set.empty, + groupId, + CompletingRebalance, + expectedLeaderId = leaderJoinGroupResult.memberId) + + EasyMock.reset(replicaManager) + val oldFollowerSyncGroupFuture = sendSyncGroupFollower(groupId, oldFollowerJoinGroupResult.generationId, + oldFollowerJoinGroupResult.memberId, followerInstanceId) + + // Duplicate follower joins group with unknown member id will trigger member.id replacement. + EasyMock.reset(replicaManager) + val duplicateFollowerJoinFuture = + sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols, groupInstanceId = followerInstanceId) + timer.advanceClock(1) + + // Old follower sync callback will return fenced exception while broker replaces the member identity. + val oldFollowerSyncGroupResult = Await.result(oldFollowerSyncGroupFuture, Duration(1, TimeUnit.MILLISECONDS)) + assertEquals(oldFollowerSyncGroupResult._2, Errors.FENCED_INSTANCE_ID) + + // Duplicate follower will get the same response as old follower. 
+ val duplicateFollowerJoinGroupResult = Await.result(duplicateFollowerJoinFuture, Duration(1, TimeUnit.MILLISECONDS)) + checkJoinGroupResult(duplicateFollowerJoinGroupResult, + Errors.NONE, + rebalanceResult.generation + 1, + Set.empty, + groupId, + CompletingRebalance, + expectedLeaderId = leaderJoinGroupResult.memberId) + } + + @Test + def staticMemberFenceDuplicateRejoiningFollowerAfterMemberIdChanged() { + val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId) + + EasyMock.reset(replicaManager) + // Known leader rejoins will trigger rebalance. + val leaderJoinGroupFuture = + sendJoinGroup(groupId, rebalanceResult.leaderId, protocolType, protocols, groupInstanceId = leaderInstanceId) + timer.advanceClock(1) + assertTrue(getGroup(groupId).is(PreparingRebalance)) + + EasyMock.reset(replicaManager) + // Duplicate follower joins group will trigger member.id replacement. + val duplicateFollowerJoinGroupFuture = + sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols, groupInstanceId = followerInstanceId) + + EasyMock.reset(replicaManager) + timer.advanceClock(1) + // Old follower rejoins group will fail because member.id already updated. 
+ val oldFollowerJoinGroupFuture = + sendJoinGroup(groupId, rebalanceResult.followerId, protocolType, protocols, groupInstanceId = followerInstanceId) + + val leaderRejoinGroupResult = Await.result(leaderJoinGroupFuture, Duration(1, TimeUnit.MILLISECONDS)) + checkJoinGroupResult(leaderRejoinGroupResult, + Errors.NONE, + rebalanceResult.generation + 1, + Set(leaderInstanceId, followerInstanceId), + groupId, + CompletingRebalance) + + val duplicateFollowerJoinGroupResult = Await.result(duplicateFollowerJoinGroupFuture, Duration(1, TimeUnit.MILLISECONDS)) + checkJoinGroupResult(duplicateFollowerJoinGroupResult, + Errors.NONE, + rebalanceResult.generation + 1, + Set.empty, + groupId, + CompletingRebalance) + assertNotEquals(rebalanceResult.followerId, duplicateFollowerJoinGroupResult.memberId) + + val oldFollowerJoinGroupResult = Await.result(oldFollowerJoinGroupFuture, Duration(1, TimeUnit.MILLISECONDS)) + checkJoinGroupResult(oldFollowerJoinGroupResult, + Errors.FENCED_INSTANCE_ID, + -1, + Set.empty, + groupId, + CompletingRebalance) + } + + @Test def staticMemberRejoinWithKnownMemberId() { var joinGroupResult = staticJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, groupInstanceId, protocolType, protocols) assertEquals(Errors.NONE, joinGroupResult.error) @@ -464,31 +609,31 @@ class GroupCoordinatorTest { def staticMemberRejoinWithLeaderIdAndUnknownMemberId() { val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, followerInstanceId) - // A static leader rejoin with unknown id will not trigger rebalance. + // A static leader rejoin with unknown id will not trigger rebalance, and no assignment will be returned. 
EasyMock.reset(replicaManager) val joinGroupResult = staticJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, leaderInstanceId, protocolType, protocolSuperset, clockAdvance = 1) checkJoinGroupResult(joinGroupResult, Errors.NONE, rebalanceResult.generation, // The group should be at the same generation - Set(leaderInstanceId, followerInstanceId), + Set.empty, groupId, - Stable) + Stable, + rebalanceResult.leaderId) EasyMock.reset(replicaManager) val oldLeaderJoinGroupResult = staticJoinGroup(groupId, rebalanceResult.leaderId, leaderInstanceId, protocolType, protocolSuperset, clockAdvance = 1) assertEquals(Errors.FENCED_INSTANCE_ID, oldLeaderJoinGroupResult.error) EasyMock.reset(replicaManager) - assertNotEquals(rebalanceResult.leaderId, joinGroupResult.leaderId) // Old leader will get fenced. val oldLeaderSyncGroupResult = syncGroupLeader(groupId, rebalanceResult.generation, rebalanceResult.leaderId, Map.empty, leaderInstanceId) assertEquals(Errors.FENCED_INSTANCE_ID, oldLeaderSyncGroupResult._2) + // Calling sync on old leader.id will fail because that leader.id is no longer valid and replaced. EasyMock.reset(replicaManager) val newLeaderSyncGroupResult = syncGroupLeader(groupId, rebalanceResult.generation, joinGroupResult.leaderId, Map.empty) - assertEquals(Errors.NONE, newLeaderSyncGroupResult._2) - assertEquals(rebalanceResult.leaderAssignment, newLeaderSyncGroupResult._1) + assertEquals(Errors.UNKNOWN_MEMBER_ID, newLeaderSyncGroupResult._2) } @Test @@ -599,7 +744,7 @@ class GroupCoordinatorTest { val leaderRejoinGroupFuture = sendJoinGroup(groupId, rebalanceResult.leaderId, protocolType, protocolSuperset, leaderInstanceId) // Rebalance complete immediately after follower rejoin. 
EasyMock.reset(replicaManager) - val followerRejoinWithFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocolSuperset, followerInstanceId) + val followerRejoinWithFuture = sendJoinGroup(groupId, rebalanceResult.followerId, protocolType, protocolSuperset, followerInstanceId) timer.advanceClock(1) @@ -656,6 +801,8 @@ class GroupCoordinatorTest { groupId, Stable) + assertNotEquals(rebalanceResult.followerId, joinGroupResult.memberId) + EasyMock.reset(replicaManager) val syncGroupResult = syncGroupFollower(groupId, rebalanceResult.generation, joinGroupResult.memberId) assertEquals(Errors.NONE, syncGroupResult._2) @@ -1000,7 +1147,7 @@ class GroupCoordinatorTest { expectedGroupState: GroupState, expectedLeaderId: String = JoinGroupRequest.UNKNOWN_MEMBER_ID, expectedMemberId: String = JoinGroupRequest.UNKNOWN_MEMBER_ID) { - assertEquals(Errors.NONE, joinGroupResult.error) + assertEquals(expectedError, joinGroupResult.error) assertEquals(expectedGeneration, joinGroupResult.generationId) assertEquals(expectedGroupInstanceIds.size, joinGroupResult.members.size) val resultedGroupInstanceIds = joinGroupResult.members.map(member => Some(member.groupInstanceId())).toSet @@ -2547,8 +2694,8 @@ class GroupCoordinatorTest { private def setupSyncGroupCallback: (Future[SyncGroupCallbackParams], SyncGroupCallback) = { val responsePromise = Promise[SyncGroupCallbackParams] val responseFuture = responsePromise.future - val responseCallback: SyncGroupCallback = (assignment, error) => - responsePromise.success((assignment, error)) + val responseCallback: SyncGroupCallback = syncGroupResult => + responsePromise.success(syncGroupResult.memberAssignment, syncGroupResult.error) (responseFuture, responseCallback) } diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala index a3a9008..177bef7 100644 --- 
a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala @@ -19,6 +19,7 @@ package kafka.coordinator.group import kafka.common.OffsetAndMetadata import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.utils.Time import org.junit.Assert._ import org.junit.{Before, Test} @@ -30,16 +31,20 @@ class GroupMetadataTest { private val protocolType = "consumer" private val groupId = "groupId" private val groupInstanceId = Some("groupInstanceId") + private val memberId = "memberId" private val clientId = "clientId" private val clientHost = "clientHost" private val rebalanceTimeoutMs = 60000 private val sessionTimeoutMs = 10000 private var group: GroupMetadata = null + private var member: MemberMetadata = null @Before def setUp() { group = new GroupMetadata("groupId", Empty, Time.SYSTEM) + member = new MemberMetadata(memberId, groupId, groupInstanceId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, + protocolType, List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte]))) } @Test @@ -240,10 +245,6 @@ class GroupMetadataTest { // by default, the group supports everything assertTrue(group.supportsProtocols(protocolType, Set("roundrobin", "range"))) - val memberId = "memberId" - val member = new MemberMetadata(memberId, groupId, groupInstanceId, clientId, clientHost, rebalanceTimeoutMs, - sessionTimeoutMs, protocolType, List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte]))) - group.add(member) group.transitionTo(PreparingRebalance) assertTrue(group.supportsProtocols(protocolType, Set("roundrobin", "foo"))) @@ -263,9 +264,7 @@ class GroupMetadataTest { @Test def testInitNextGeneration() { - val memberId = "memberId" - val member = new MemberMetadata(memberId, groupId, groupInstanceId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, - protocolType, 
List(("roundrobin", Array.empty[Byte]))) + member.supportedProtocols = List(("roundrobin", Array.empty[Byte])) group.transitionTo(PreparingRebalance) group.add(member, _ => ()) @@ -465,6 +464,88 @@ class GroupMetadataTest { assertFalse(group.hasPendingOffsetCommitsFromProducer(producerId)) } + @Test(expected = classOf[IllegalArgumentException]) + def testReplaceGroupInstanceWithEmptyGroupInstanceId(): Unit = { + group.add(member) + group.addStaticMember(groupInstanceId, memberId) + assertTrue(group.isLeader(memberId)) + assertEquals(memberId, group.getStaticMemberId(groupInstanceId)) + + val newMemberId = "newMemberId" + group.replaceGroupInstance(memberId, newMemberId, Option.empty) + } + + @Test(expected = classOf[IllegalArgumentException]) + def testReplaceGroupInstanceWithNonExistingMember(): Unit = { + val newMemberId = "newMemberId" + group.replaceGroupInstance(memberId, newMemberId, groupInstanceId) + } + + @Test + def testReplaceGroupInstance(): Unit = { + var joinAwaitingMemberFenced = false + group.add(member, joinGroupResult => { + joinAwaitingMemberFenced = joinGroupResult.error == Errors.FENCED_INSTANCE_ID + }) + var syncAwaitingMemberFenced = false + member.awaitingSyncCallback = syncGroupResult => { + syncAwaitingMemberFenced = syncGroupResult.error == Errors.FENCED_INSTANCE_ID + } + group.addStaticMember(groupInstanceId, memberId) + assertTrue(group.isLeader(memberId)) + assertEquals(memberId, group.getStaticMemberId(groupInstanceId)) + + val newMemberId = "newMemberId" + group.replaceGroupInstance(memberId, newMemberId, groupInstanceId) + assertTrue(group.isLeader(newMemberId)) + assertEquals(newMemberId, group.getStaticMemberId(groupInstanceId)) + assertTrue(joinAwaitingMemberFenced) + assertTrue(syncAwaitingMemberFenced) + assertFalse(member.isAwaitingJoin) + assertFalse(member.isAwaitingSync) + } + + @Test + def testInvokeJoinCallback(): Unit = { + var invoked = false + group.add(member, _ => { + invoked = true + }) + + 
assertTrue(group.hasAllMembersJoined) + group.maybeInvokeJoinCallback(member, GroupCoordinator.joinError(member.memberId, Errors.NONE)) + assertTrue(invoked) + assertFalse(member.isAwaitingJoin) + } + + @Test + def testNotInvokeJoinCallback(): Unit = { + group.add(member) + + assertFalse(member.isAwaitingJoin) + group.maybeInvokeJoinCallback(member, GroupCoordinator.joinError(member.memberId, Errors.NONE)) + assertFalse(member.isAwaitingJoin) + } + + @Test + def testInvokeSyncCallback(): Unit = { + group.add(member) + member.awaitingSyncCallback = _ => {} + + val invoked = group.maybeInvokeSyncCallback(member, SyncGroupResult(Array.empty, Errors.NONE)) + assertTrue(invoked) + assertFalse(member.isAwaitingSync) + } + + @Test + def testNotInvokeSyncCallback(): Unit = { + group.add(member) + + val invoked = group.maybeInvokeSyncCallback(member, SyncGroupResult(Array.empty, Errors.NONE)) + assertFalse(invoked) + assertFalse(member.isAwaitingSync) + } + private def assertState(group: GroupMetadata, targetState: GroupState) { val states: Set[GroupState] = Set(Stable, PreparingRebalance, CompletingRebalance, Dead) val otherStates = states - targetState diff --git a/tests/kafkatest/tests/client/consumer_test.py b/tests/kafkatest/tests/client/consumer_test.py index be15e6a..131123f 100644 --- a/tests/kafkatest/tests/client/consumer_test.py +++ b/tests/kafkatest/tests/client/consumer_test.py @@ -265,6 +265,12 @@ class OffsetValidationTest(VerifiableConsumerTest): "normal consumer group and %d from conflict consumer group" % \ (len(consumer.nodes), len(consumer.joined_nodes()), len(conflict_consumer.joined_nodes())) ) + wait_until(lambda: len(consumer.dead_nodes()) + len(conflict_consumer.dead_nodes()) == len(conflict_consumer.nodes), + timeout_sec=self.session_timeout_sec, + err_msg="Timed out waiting for fenced consumers to die, expected total %d dead, but only see %d dead in" + "normal consumer group and %d dead in conflict consumer group" % \ + 
(len(conflict_consumer.nodes), len(consumer.dead_nodes()), len(conflict_consumer.dead_nodes())) + ) @cluster(num_nodes=7) @matrix(clean_shutdown=[True], enable_autocommit=[True, False])