lbradstreet commented on a change in pull request #8467:
URL: https://github.com/apache/kafka/pull/8467#discussion_r414190233



##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -1003,9 +1003,17 @@ class LogManager(logDirs: Seq[File],
   /**
    * Map of log dir to logs by topic and partitions in that dir
    */
-  private def logsByDir: Map[String, Map[TopicPartition, Log]] = {
-    (this.currentLogs.toList ++ this.futureLogs.toList).toMap
-      .groupBy { case (_, log) => log.parentDir }
+  def logsByDir: Map[String, Map[TopicPartition, Log]] = {
+    // This code is called often by checkpoint processes and is written in a way that reduces
+    // allocations and CPU with many topic partitions.
+    // When changing this code please measure the changes with org.apache.kafka.jmh.server.CheckpointBench
+    val byDir = new mutable.AnyRefMap[String, mutable.AnyRefMap[TopicPartition, Log]]()
+    def addToDir(tp: TopicPartition, log: Log): Unit = {
+      byDir.getOrElseUpdate(log.parentDir, new mutable.AnyRefMap[TopicPartition, Log]()).put(tp, log)
+    }
+    currentLogs.foreach { case (tp, log) => addToDir(tp, log) }
+    futureLogs.foreach { case (tp, log) => addToDir(tp, log) }

Review comment:
   AFAICT, it's easy enough to do something like this:
   ```
   pool.forEach((k,v) => f(k,v))
   ```
   But then we appear to be back to allocating tuples.
   
   It doesn't appear easy to do something like:
   ```
   pool.forEach(kv => f(kv.getKey, kv.getValue))
   ```
   since the compiler is not able to create a BiConsumer for you.
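
   For reference, a minimal sketch of the first variant with the BiConsumer written out by hand, assuming `pool` wraps a `java.util.concurrent.ConcurrentHashMap` (an assumption, not stated in this thread). `foreachEntry` and `groupByDir` are hypothetical names, and plain strings stand in for `TopicPartition` and `Log`. Whether this actually avoids the tuple allocations would still need to be measured with CheckpointBench.

   ```scala
   import java.util.concurrent.ConcurrentHashMap
   import java.util.function.BiConsumer
   import scala.collection.mutable

   object ForEachSketch {
     // Hypothetical helper (not part of this PR): hands key and value to f as
     // two separate arguments via an explicit BiConsumer, so this code itself
     // never builds a Tuple2 per entry.
     def foreachEntry[K, V](map: ConcurrentHashMap[K, V])(f: (K, V) => Unit): Unit =
       map.forEach(new BiConsumer[K, V] {
         override def accept(k: K, v: V): Unit = f(k, v)
       })

     // Stand-in for logsByDir: the map goes from partition name to parent dir,
     // and the result groups partition names by that parent dir.
     def groupByDir(logs: ConcurrentHashMap[String, String]): mutable.Map[String, mutable.Map[String, String]] = {
       val byDir = mutable.HashMap.empty[String, mutable.Map[String, String]]
       foreachEntry(logs) { (tp, dir) =>
         byDir.getOrElseUpdate(dir, mutable.HashMap.empty[String, String]).put(tp, dir)
       }
       byDir
     }
   }
   ```

   With something like this, the two `foreach { case (tp, log) => ... }` calls in the diff could become `foreachEntry` calls on the underlying maps, if the pool exposes them.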



