chia7712 commented on code in PR #19535:
URL: https://github.com/apache/kafka/pull/19535#discussion_r2061429213


##########
core/src/main/scala/kafka/server/AbstractFetcherManager.scala:
##########
@@ -43,7 +43,7 @@ abstract class AbstractFetcherManager[T <: 
AbstractFetcherThread](val name: Stri
   metricsGroup.newGauge("MaxLag", () => {
     // current max lag across all fetchers/topics/partitions
     fetcherThreadMap.values.foldLeft(0L) { (curMaxLagAll, fetcherThread) =>
-      val maxLagThread = 
fetcherThread.fetcherLagStats.stats.values.foldLeft(0L)((curMaxLagThread, 
lagMetrics) =>
+      val maxLagThread = 
fetcherThread.fetcherLagStats.stats.values.asScala.foldLeft(0L)((curMaxLagThread,
 lagMetrics) =>

Review Comment:
   ```scala
   val maxLagThread = fetcherThread.fetcherLagStats.stats.values.stream().mapToLong(v => v.lag).max().orElse(0L)
   ```



##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -983,12 +983,12 @@ class Partition(val topicPartition: TopicPartition,
   ): Unit = {
     if (isLeader) {
       val followers = replicas.filter(_ != localBrokerId)
-      val removedReplicas = 
remoteReplicasMap.keys.filterNot(followers.contains(_))
+      val removedReplicas = 
remoteReplicasMap.keySet.asScala.filterNot(followers.contains(_))
 
       // Due to code paths accessing remoteReplicasMap without a lock,
       // first add the new replicas and then remove the old ones.
-      followers.foreach(id => remoteReplicasMap.getAndMaybePut(id, new 
Replica(id, topicPartition, metadataCache)))
-      remoteReplicasMap.removeAll(removedReplicas)
+      followers.foreach(id => remoteReplicasMap.computeIfAbsent(id, k => new 
Replica(id, topicPartition, metadataCache)))
+      remoteReplicasMap.keySet.removeAll(removedReplicas.asJavaCollection)

Review Comment:
   ```scala
   remoteReplicasMap.keySet.removeIf(replica => !followers.contains(replica))
   ```



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -2512,7 +2510,7 @@ class ReplicaManager(val config: KafkaConfig,
     trace("Evaluating ISR list of partitions to see which replicas can be 
removed from the ISR")
 
     // Shrink ISRs for non offline partitions
-    allPartitions.keys.foreach { topicPartition =>
+    allPartitions.keys.asScala.foreach { topicPartition =>

Review Comment:
   ```scala
       allPartitions.forEach { (topicPartition, _) =>
         onlinePartition(topicPartition).foreach(_.maybeShrinkIsr())
       }
   ```



##########
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##########
@@ -916,7 +915,7 @@ class FetcherLagStats(metricId: ClientIdAndBroker) {
   }
 
   def unregister(): Unit = {
-    stats.keys.toBuffer.foreach { key: TopicPartition =>
+    stats.keys.asScala.toBuffer.foreach { key: TopicPartition =>

Review Comment:
   ```scala
   stats.forEach((key, _) => unregister(key))
   ```



##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -983,12 +983,12 @@ class Partition(val topicPartition: TopicPartition,
   ): Unit = {
     if (isLeader) {
       val followers = replicas.filter(_ != localBrokerId)
-      val removedReplicas = 
remoteReplicasMap.keys.filterNot(followers.contains(_))
+      val removedReplicas = 
remoteReplicasMap.keySet.asScala.filterNot(followers.contains(_))
 
       // Due to code paths accessing remoteReplicasMap without a lock,
       // first add the new replicas and then remove the old ones.
-      followers.foreach(id => remoteReplicasMap.getAndMaybePut(id, new 
Replica(id, topicPartition, metadataCache)))
-      remoteReplicasMap.removeAll(removedReplicas)
+      followers.foreach(id => remoteReplicasMap.computeIfAbsent(id, k => new 
Replica(id, topicPartition, metadataCache)))

Review Comment:
   Nit: the lambda parameter `k` is unused here, so replace it with `_` (i.e. `k` -> `_`).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to