chia7712 commented on code in PR #19535: URL: https://github.com/apache/kafka/pull/19535#discussion_r2061429213
########## core/src/main/scala/kafka/server/AbstractFetcherManager.scala: ########## @@ -43,7 +43,7 @@ abstract class AbstractFetcherManager[T <: AbstractFetcherThread](val name: Stri metricsGroup.newGauge("MaxLag", () => { // current max lag across all fetchers/topics/partitions fetcherThreadMap.values.foldLeft(0L) { (curMaxLagAll, fetcherThread) => - val maxLagThread = fetcherThread.fetcherLagStats.stats.values.foldLeft(0L)((curMaxLagThread, lagMetrics) => + val maxLagThread = fetcherThread.fetcherLagStats.stats.values.asScala.foldLeft(0L)((curMaxLagThread, lagMetrics) => Review Comment: ```scala val maxLagThread = fetcherThread.fetcherLagStats.stats.values.stream().mapToLong(v => v.lag).max().orElse(0L) ``` ########## core/src/main/scala/kafka/cluster/Partition.scala: ########## @@ -983,12 +983,12 @@ class Partition(val topicPartition: TopicPartition, ): Unit = { if (isLeader) { val followers = replicas.filter(_ != localBrokerId) - val removedReplicas = remoteReplicasMap.keys.filterNot(followers.contains(_)) + val removedReplicas = remoteReplicasMap.keySet.asScala.filterNot(followers.contains(_)) // Due to code paths accessing remoteReplicasMap without a lock, // first add the new replicas and then remove the old ones. 
- followers.foreach(id => remoteReplicasMap.getAndMaybePut(id, new Replica(id, topicPartition, metadataCache))) - remoteReplicasMap.removeAll(removedReplicas) + followers.foreach(id => remoteReplicasMap.computeIfAbsent(id, k => new Replica(id, topicPartition, metadataCache))) + remoteReplicasMap.keySet.removeAll(removedReplicas.asJavaCollection) Review Comment: ```scala remoteReplicasMap.keySet.removeIf(replica => !followers.contains(replica)) ``` ########## core/src/main/scala/kafka/server/ReplicaManager.scala: ########## @@ -2512,7 +2510,7 @@ class ReplicaManager(val config: KafkaConfig, trace("Evaluating ISR list of partitions to see which replicas can be removed from the ISR") // Shrink ISRs for non offline partitions - allPartitions.keys.foreach { topicPartition => + allPartitions.keys.asScala.foreach { topicPartition => Review Comment: ```scala allPartitions.forEach { (topicPartition, _) => onlinePartition(topicPartition).foreach(_.maybeShrinkIsr()) } ``` ########## core/src/main/scala/kafka/server/AbstractFetcherThread.scala: ########## @@ -916,7 +915,7 @@ class FetcherLagStats(metricId: ClientIdAndBroker) { } def unregister(): Unit = { - stats.keys.toBuffer.foreach { key: TopicPartition => + stats.keys.asScala.toBuffer.foreach { key: TopicPartition => Review Comment: ```scala stats.forEach((key, _) => unregister(key)) ``` ########## core/src/main/scala/kafka/cluster/Partition.scala: ########## @@ -983,12 +983,12 @@ class Partition(val topicPartition: TopicPartition, ): Unit = { if (isLeader) { val followers = replicas.filter(_ != localBrokerId) - val removedReplicas = remoteReplicasMap.keys.filterNot(followers.contains(_)) + val removedReplicas = remoteReplicasMap.keySet.asScala.filterNot(followers.contains(_)) // Due to code paths accessing remoteReplicasMap without a lock, // first add the new replicas and then remove the old ones. 
- followers.foreach(id => remoteReplicasMap.getAndMaybePut(id, new Replica(id, topicPartition, metadataCache))) - remoteReplicasMap.removeAll(removedReplicas) + followers.foreach(id => remoteReplicasMap.computeIfAbsent(id, k => new Replica(id, topicPartition, metadataCache))) Review Comment: The lambda parameter `k` is unused; replace it with `_`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org