hachikuji commented on a change in pull request #9441:
URL: https://github.com/apache/kafka/pull/9441#discussion_r620695517



##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -905,19 +908,35 @@ class GroupCoordinator(val brokerId: Int,
    *
    * @param offsetTopicPartitionId The partition we are now leading
    */
-  def onElection(offsetTopicPartitionId: Int): Unit = {
-    info(s"Elected as the group coordinator for partition $offsetTopicPartitionId")
-    groupManager.scheduleLoadGroupAndOffsets(offsetTopicPartitionId, onGroupLoaded)
+  def onElection(offsetTopicPartitionId: Int, coordinatorEpoch: Int): Unit = {
+    epochForPartitionId.compute(offsetTopicPartitionId, (_, epoch) => {
+      val currentEpoch = Option(epoch)
+      if (currentEpoch.forall(currentEpoch => coordinatorEpoch > currentEpoch)) {

Review comment:
       One final thing I was considering is whether we should push this check 
into `GroupMetadataManager.loadGroupsAndOffsets`. That would give us some 
protection against any assumptions about ordering in `KafkaScheduler`.
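
       For what it's worth, here is a rough sketch of the idea, sitting in `GroupMetadataManager` (the `epochForPartitionId` map and the `doLoadGroupsAndOffsets` helper are illustrative names, assuming the map is a `ConcurrentHashMap[Int, java.lang.Integer]`, not necessarily what the PR ends up with):

```scala
def loadGroupsAndOffsets(offsetTopicPartitionId: Int,
                         coordinatorEpoch: Int,
                         onGroupLoaded: GroupMetadata => Unit): Unit = {
  epochForPartitionId.compute(offsetTopicPartitionId, (_, epoch) => {
    val currentEpoch = Option(epoch)
    if (currentEpoch.forall(currentEpoch => coordinatorEpoch > currentEpoch)) {
      // Newest epoch observed for this partition: safe to load even if
      // KafkaScheduler happened to run the queued tasks out of order.
      doLoadGroupsAndOffsets(offsetTopicPartitionId, onGroupLoaded)
      coordinatorEpoch
    } else {
      // A newer (or equal) epoch has already been recorded; skip the stale load.
      warn(s"Ignored loading of partition $offsetTopicPartitionId in epoch $coordinatorEpoch " +
        s"since current epoch is $currentEpoch")
      epoch
    }
  })
}
```

With that, `onElection` would only schedule the load, and the epoch check would happen at execution time rather than at scheduling time.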

##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -905,19 +908,32 @@ class GroupCoordinator(val brokerId: Int,
    *
    * @param offsetTopicPartitionId The partition we are now leading
    */
-  def onElection(offsetTopicPartitionId: Int): Unit = {
-    info(s"Elected as the group coordinator for partition $offsetTopicPartitionId")
-    groupManager.scheduleLoadGroupAndOffsets(offsetTopicPartitionId, onGroupLoaded)
+  def onElection(offsetTopicPartitionId: Int, coordinatorEpoch: Int): Unit = {
+    val currentEpoch = Option(epochForPartitionId.get(offsetTopicPartitionId))
+    if (currentEpoch.forall(currentEpoch => coordinatorEpoch > currentEpoch)) {
+      info(s"Elected as the group coordinator for partition $offsetTopicPartitionId in epoch $coordinatorEpoch")
+      groupManager.scheduleLoadGroupAndOffsets(offsetTopicPartitionId, onGroupLoaded)
+      epochForPartitionId.put(offsetTopicPartitionId, coordinatorEpoch)
+    } else {
+      warn(s"Ignored election as group coordinator for partition $offsetTopicPartitionId " +
+        s"in epoch $coordinatorEpoch since current epoch is $currentEpoch")
+    }
   }
 
   /**
   * Unload cached state for the given partition and stop handling requests for groups which map to it.
    *
    * @param offsetTopicPartitionId The partition we are no longer leading
    */
-  def onResignation(offsetTopicPartitionId: Int): Unit = {
-    info(s"Resigned as the group coordinator for partition $offsetTopicPartitionId")
-    groupManager.removeGroupsForPartition(offsetTopicPartitionId, onGroupUnloaded)
+  def onResignation(offsetTopicPartitionId: Int, coordinatorEpoch: Option[Int]): Unit = {
+    val currentEpoch = Option(epochForPartitionId.get(offsetTopicPartitionId))
+    if (currentEpoch.forall(currentEpoch => currentEpoch <= coordinatorEpoch.getOrElse(Int.MaxValue))) {

Review comment:
       I have probably not been doing a good job of being clear. It is useful 
to bump the epoch whenever we observe a larger value, whether it is in 
`onResignation` or `onElection`. This protects us from all potential 
reorderings.
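
       Concretely, I am thinking of something like the helper below (a rough sketch with illustrative names; it assumes `epochForPartitionId` is a `ConcurrentHashMap[Int, java.lang.Integer]`), which both `onElection` and `onResignation` could call before doing their work:

```scala
// Record the largest coordinator epoch observed for the partition, no matter
// whether it arrived via onElection or onResignation. Returns true if the
// observed epoch is not older than what we already have, i.e. the caller
// should proceed with the load/unload.
private def maybeUpdateCoordinatorEpoch(partitionId: Int,
                                        epochOpt: Option[Int]): Boolean = {
  val updatedEpoch = epochForPartitionId.compute(partitionId, (_, currentEpoch) => {
    if (currentEpoch == null)
      epochOpt.map(Int.box).orNull
    else
      epochOpt match {
        case Some(epoch) if epoch > currentEpoch => epoch // bump to the larger observed epoch
        case _ => currentEpoch                            // keep the epoch we already have
      }
  })
  epochOpt.forall(_ == updatedEpoch)
}
```

Since the map is always bumped to the largest epoch seen, any interleaving of election and resignation callbacks leaves us with a correct view, and a stale callback simply gets `false` back and can log a warning.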



