hachikuji commented on a change in pull request #9441:
URL: https://github.com/apache/kafka/pull/9441#discussion_r618025026
##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -905,19 +908,32 @@ class GroupCoordinator(val brokerId: Int,
    *
    * @param offsetTopicPartitionId The partition we are now leading
    */
-  def onElection(offsetTopicPartitionId: Int): Unit = {
-    info(s"Elected as the group coordinator for partition $offsetTopicPartitionId")
-    groupManager.scheduleLoadGroupAndOffsets(offsetTopicPartitionId, onGroupLoaded)
+  def onElection(offsetTopicPartitionId: Int, coordinatorEpoch: Int): Unit = {
+    val currentEpoch = Option(epochForPartitionId.get(offsetTopicPartitionId))
+    if (currentEpoch.forall(currentEpoch => coordinatorEpoch > currentEpoch)) {
+      info(s"Elected as the group coordinator for partition $offsetTopicPartitionId in epoch $coordinatorEpoch")
+      groupManager.scheduleLoadGroupAndOffsets(offsetTopicPartitionId, onGroupLoaded)
+      epochForPartitionId.put(offsetTopicPartitionId, coordinatorEpoch)
+    } else {
+      warn(s"Ignored election as group coordinator for partition $offsetTopicPartitionId " +
+        s"in epoch $coordinatorEpoch since current epoch is $currentEpoch")
+    }
   }
 
   /**
    * Unload cached state for the given partition and stop handling requests for groups which map to it.
    *
    * @param offsetTopicPartitionId The partition we are no longer leading
    */
-  def onResignation(offsetTopicPartitionId: Int): Unit = {
-    info(s"Resigned as the group coordinator for partition $offsetTopicPartitionId")
-    groupManager.removeGroupsForPartition(offsetTopicPartitionId, onGroupUnloaded)
+  def onResignation(offsetTopicPartitionId: Int, coordinatorEpoch: Option[Int]): Unit = {
+    val currentEpoch = Option(epochForPartitionId.get(offsetTopicPartitionId))
+    if (currentEpoch.forall(currentEpoch => currentEpoch <= coordinatorEpoch.getOrElse(Int.MaxValue))) {

Review comment:
       Similarly, we should update `epochForPartitionId` here with a CAS operation.

##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -905,19 +908,32 @@ class GroupCoordinator(val brokerId: Int,
    *
    * @param offsetTopicPartitionId The partition we are now leading
    */
-  def onElection(offsetTopicPartitionId: Int): Unit = {
-    info(s"Elected as the group coordinator for partition $offsetTopicPartitionId")
-    groupManager.scheduleLoadGroupAndOffsets(offsetTopicPartitionId, onGroupLoaded)
+  def onElection(offsetTopicPartitionId: Int, coordinatorEpoch: Int): Unit = {
+    val currentEpoch = Option(epochForPartitionId.get(offsetTopicPartitionId))
+    if (currentEpoch.forall(currentEpoch => coordinatorEpoch > currentEpoch)) {

Review comment:
       Can we do a CAS update? Otherwise I don't think this is safe.
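       For illustration only (not code from this PR), the kind of atomic update being asked for could use `ConcurrentHashMap.compute`, which runs the read-compare-write for a key atomically. The object and method names below (`EpochCasSketch`, `maybeAdvanceEpoch`) are hypothetical, and it assumes the epoch map is a `ConcurrentHashMap` with a nullable `java.lang.Integer` value so "no epoch recorded" is representable:

       ```scala
       import java.util.concurrent.ConcurrentHashMap

       // Hypothetical, self-contained sketch of a CAS-style epoch bump; not the PR's code.
       object EpochCasSketch {
         // Stand-in for the coordinator's epoch-per-partition map. java.lang.Integer is
         // used as the value type so a missing entry surfaces as null inside compute().
         private val epochForPartitionId = new ConcurrentHashMap[Int, java.lang.Integer]()

         // compute() applies the remapping function atomically per key, so the epoch
         // check and the update cannot interleave with a concurrent election or
         // resignation for the same partition.
         def maybeAdvanceEpoch(offsetTopicPartitionId: Int, coordinatorEpoch: Int): Boolean = {
           var accepted = false
           epochForPartitionId.compute(offsetTopicPartitionId, (_, current) => {
             if (current == null || coordinatorEpoch > current) {
               accepted = true
               coordinatorEpoch // newer epoch wins
             } else {
               current // stale election: keep the existing epoch
             }
           })
           accepted
         }

         def main(args: Array[String]): Unit = {
           println(maybeAdvanceEpoch(0, 5)) // true: no epoch recorded yet
           println(maybeAdvanceEpoch(0, 4)) // false: 4 is not newer than 5
           println(maybeAdvanceEpoch(0, 6)) // true: 6 advances the epoch
         }
       }
       ```

       The same idea could also be written as an explicit `putIfAbsent`/`replace` retry loop; `compute` just keeps the check and the write in a single atomic step per key.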
##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -279,30 +279,33 @@ class KafkaApis(val requestChannel: RequestChannel,
         new StopReplicaResponseData().setErrorCode(Errors.STALE_BROKER_EPOCH.code)))
     } else {
       val partitionStates = stopReplicaRequest.partitionStates().asScala
-      val (result, error) = replicaManager.stopReplicas(
-        request.context.correlationId,
-        stopReplicaRequest.controllerId,
-        stopReplicaRequest.controllerEpoch,
-        stopReplicaRequest.brokerEpoch,
-        partitionStates)
-      // Clear the coordinator caches in case we were the leader. In the case of a reassignment, we
-      // cannot rely on the LeaderAndIsr API for this since it is only sent to active replicas.
-      result.forKeyValue { (topicPartition, error) =>
-        if (error == Errors.NONE) {
-          if (topicPartition.topic == GROUP_METADATA_TOPIC_NAME
-            && partitionStates(topicPartition).deletePartition) {
-            groupCoordinator.onResignation(topicPartition.partition)
-          } else if (topicPartition.topic == TRANSACTION_STATE_TOPIC_NAME
-            && partitionStates(topicPartition).deletePartition) {
+      def onStopReplicas(error: Errors, partitions: Map[TopicPartition, Errors]): Unit = {
+        // Clear the coordinator caches in case we were the leader. In the case of a reassignment, we
+        // cannot rely on the LeaderAndIsr API for this since it is only sent to active replicas.
+        partitions.forKeyValue { (topicPartition, partitionError) =>
+          if (partitionError == Errors.NONE) {
             val partitionState = partitionStates(topicPartition)
             val leaderEpoch = if (partitionState.leaderEpoch >= 0)
-              Some(partitionState.leaderEpoch)
+              Some(partitionState.leaderEpoch)
             else
               None
-            txnCoordinator.onResignation(topicPartition.partition, coordinatorEpoch = leaderEpoch)
+            if (topicPartition.topic == GROUP_METADATA_TOPIC_NAME
+              && partitionState.deletePartition) {
+              groupCoordinator.onResignation(topicPartition.partition, leaderEpoch)
+            } else if (topicPartition.topic == TRANSACTION_STATE_TOPIC_NAME
+              && partitionState.deletePartition) {
+              txnCoordinator.onResignation(topicPartition.partition, coordinatorEpoch = leaderEpoch)
+            }
          }
        }
      }
+      val (result, error) = replicaManager.stopReplicas(
+        request.context.correlationId,
+        stopReplicaRequest.controllerId,
+        stopReplicaRequest.controllerEpoch,
+        stopReplicaRequest.brokerEpoch,
+        partitionStates,
+        onStopReplicas)

Review comment:
       It's not clear to me why we moved this into `ReplicaManager`. Is there some reason we need the `replicaStateChangeLock` lock?

##########
File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
##########
@@ -2891,17 +2898,29 @@ class KafkaApisTest {
       EasyMock.eq(controllerId),
       EasyMock.eq(controllerEpoch),
       EasyMock.eq(brokerEpochInRequest),
-      EasyMock.eq(stopReplicaRequest.partitionStates().asScala)
-    )).andStubReturn(
-      (mutable.Map(
+      EasyMock.eq(stopReplicaRequest.partitionStates().asScala),
+      EasyMock.anyObject()
+    )).andStubAnswer {() =>
+      val result = (mutable.Map(
         fooPartition -> Errors.NONE
       ), Errors.NONE)
-    )
+//<<<<<<< HEAD

Review comment:
       Can you fix this leftover merge-conflict marker?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org