hachikuji commented on a change in pull request #9441:
URL: https://github.com/apache/kafka/pull/9441#discussion_r624131473



##########
File path: 
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
##########
@@ -526,34 +529,42 @@ class GroupMetadataManager(brokerId: Int,
   /**
    * Asynchronously read the partition from the offsets topic and populate the 
cache
    */
-  def scheduleLoadGroupAndOffsets(offsetsPartition: Int, onGroupLoaded: 
GroupMetadata => Unit): Unit = {
+  def scheduleLoadGroupAndOffsets(offsetsPartition: Int, coordinatorEpoch: 
Int, onGroupLoaded: GroupMetadata => Unit): Unit = {
     val topicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 
offsetsPartition)
-    if (addLoadingPartition(offsetsPartition)) {
-      info(s"Scheduling loading of offsets and group metadata from 
$topicPartition")
-      val startTimeMs = time.milliseconds()
-      scheduler.schedule(topicPartition.toString, () => 
loadGroupsAndOffsets(topicPartition, onGroupLoaded, startTimeMs))
-    } else {
-      info(s"Already loading offsets and group metadata from $topicPartition")
-    }
+    info(s"Scheduling loading of offsets and group metadata from 
$topicPartition for epoch $coordinatorEpoch")
+    val startTimeMs = time.milliseconds()
+    scheduler.schedule(topicPartition.toString, () => 
loadGroupsAndOffsets(topicPartition, coordinatorEpoch, onGroupLoaded, 
startTimeMs))
   }
 
-  private[group] def loadGroupsAndOffsets(topicPartition: TopicPartition, 
onGroupLoaded: GroupMetadata => Unit, startTimeMs: java.lang.Long): Unit = {
-    try {
-      val schedulerTimeMs = time.milliseconds() - startTimeMs
-      debug(s"Started loading offsets and group metadata from $topicPartition")
-      doLoadGroupsAndOffsets(topicPartition, onGroupLoaded)
-      val endTimeMs = time.milliseconds()
-      val totalLoadingTimeMs = endTimeMs - startTimeMs
-      partitionLoadSensor.record(totalLoadingTimeMs.toDouble, endTimeMs, false)
-      info(s"Finished loading offsets and group metadata from $topicPartition "
-        + s"in $totalLoadingTimeMs milliseconds, of which $schedulerTimeMs 
milliseconds"
-        + s" was spent in the scheduler.")
-    } catch {
-      case t: Throwable => error(s"Error loading offsets from 
$topicPartition", t)
-    } finally {
-      inLock(partitionLock) {
-        ownedPartitions.add(topicPartition.partition)
-        loadingPartitions.remove(topicPartition.partition)
+  private[group] def loadGroupsAndOffsets(
+    topicPartition: TopicPartition,
+    coordinatorEpoch: Int,
+    onGroupLoaded: GroupMetadata => Unit,
+    startTimeMs: java.lang.Long
+  ): Unit = {
+    if (!maybeUpdateCoordinatorEpoch(topicPartition.partition, 
Some(coordinatorEpoch))) {
+      info(s"Not loading offsets and group metadata for $topicPartition " +
+        s"in epoch $coordinatorEpoch since current epoch is 
${epochForPartitionId.get(topicPartition.partition)}")
+    } else if (!addLoadingPartition(topicPartition.partition)) {

Review comment:
       One minor improvement here is to change `addLoadingPartition` so that it 
checks whether the partition is already contained in `ownedPartitions`. If so, 
we can return false.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to