FrankYang0529 commented on code in PR #19579: URL: https://github.com/apache/kafka/pull/19579#discussion_r2063791648
########## core/src/main/scala/kafka/server/BrokerServer.scala: ########## @@ -709,7 +709,7 @@ class BrokerServer( None } - val rlm = new RemoteLogManager(config.remoteLogManagerConfig, config.brokerId, config.logDirs.head, clusterId, time, + val rlm = new RemoteLogManager(config.remoteLogManagerConfig, config.brokerId, config.logDirs.asScala.head, clusterId, time, Review Comment: It looks like we can use `getFirst` to avoid redundant conversion. ```suggestion val rlm = new RemoteLogManager(config.remoteLogManagerConfig, config.brokerId, config.logDirs.getFirst, clusterId, time, ``` ########## core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala: ########## @@ -1551,7 +1551,7 @@ class KRaftClusterTest { // Copy foo-0 to targetParentDir // This is so that we can rename the main replica to a future down below val parentDir = log.parentDir - val targetParentDir = broker0.config.logDirs.filter(_ != parentDir).head + val targetParentDir = broker0.config.logDirs.asScala.filter(_ != parentDir).head val targetDirFile = new File(targetParentDir, log.dir.getName) Review Comment: ```suggestion val targetParentDir = broker0.config.logDirs.stream().filter(l => !l.equals(parentDir)).findFirst() assertTrue(targetParentDir.isPresent) val targetDirFile = new File(targetParentDir.get(), log.dir.getName) ``` ########## core/src/main/scala/kafka/server/KafkaConfig.scala: ########## @@ -241,7 +241,7 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) def metadataLogDir: String = { Option(getString(MetadataLogConfig.METADATA_LOG_DIR_CONFIG)) match { case Some(dir) => dir - case None => logDirs.head + case None => logDirs.asScala.head Review Comment: ditto ########## core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala: ########## @@ -91,10 +91,10 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest { @ParameterizedTest @ValueSource(strings = Array("kraft")) def testAlterReplicaLogDirsRequestErrorCode(quorum: String): Unit = { - val offlineDir = new File(brokers.head.config.logDirs.tail.head).getAbsolutePath - val validDir1 = new File(brokers.head.config.logDirs(1)).getAbsolutePath - val validDir2 = new File(brokers.head.config.logDirs(2)).getAbsolutePath - val validDir3 = new File(brokers.head.config.logDirs(3)).getAbsolutePath + val offlineDir = new File(brokers.head.config.logDirs.asScala.tail.head).getAbsolutePath Review Comment: ```suggestion val offlineDir = new File(brokers.head.config.logDirs.getLast).getAbsolutePath ``` ########## core/src/test/scala/unit/kafka/server/DescribeLogDirsRequestTest.scala: ########## @@ -42,8 +42,8 @@ class DescribeLogDirsRequestTest extends BaseRequestTest { @ParameterizedTest @ValueSource(strings = Array("kraft")) def testDescribeLogDirsRequest(quorum: String): Unit = { - val onlineDir = new File(brokers.head.config.logDirs.head).getAbsolutePath - val offlineDir = new File(brokers.head.config.logDirs.tail.head).getAbsolutePath + val onlineDir = new File(brokers.head.config.logDirs.asScala.head).getAbsolutePath + val offlineDir = new File(brokers.head.config.logDirs.asScala.tail.head).getAbsolutePath Review Comment: ```suggestion val onlineDir = new File(brokers.head.config.logDirs.getFirst).getAbsolutePath val offlineDir = new File(brokers.head.config.logDirs.getLast).getAbsolutePath ``` ########## core/src/main/scala/kafka/server/ReplicaManager.scala: ########## @@ -1237,7 +1237,7 @@ class ReplicaManager(val config: KafkaConfig, def describeLogDirs(partitions: Set[TopicPartition]): List[DescribeLogDirsResponseData.DescribeLogDirsResult] = { val logsByDir = logManager.allLogs.groupBy(log => log.parentDir) - config.logDirs.toSet.map { logDir: String => + config.logDirs.asScala.toSet.map { logDir: String => Review Comment: Can we change `describeLogDirs` to return `java.util.List`? So we don't need to convert `logDirs` to Scala and convert result back to Java again in `KafkaApis`. ########## core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala: ########## @@ -1500,7 +1500,7 @@ class KafkaConfigTest { val config = KafkaConfig.fromProps(props) assertEquals(metadataDir, config.metadataLogDir) - assertEquals(Seq(dataDir), config.logDirs) + assertEquals(Seq(dataDir), config.logDirs.asScala) Review Comment: ```suggestion assertEquals(util.List.of(dataDir), config.logDirs) ``` ########## core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala: ########## @@ -1165,7 +1165,7 @@ class KafkaConfigTest { assertEquals(1, config.brokerId) assertEquals(Seq("PLAINTEXT://127.0.0.1:1122"), config.effectiveAdvertisedBrokerListeners.map(JTestUtils.endpointToString)) assertEquals(Map("127.0.0.1" -> 2, "127.0.0.2" -> 3), config.maxConnectionsPerIpOverrides) - assertEquals(List("/tmp1", "/tmp2"), config.logDirs) + assertEquals(List("/tmp1", "/tmp2"), config.logDirs.asScala.toList) Review Comment: ```suggestion assertEquals(util.List.of("/tmp1", "/tmp2"), config.logDirs) ``` ########## core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala: ########## @@ -1518,7 +1518,7 @@ class KafkaConfigTest { val config = KafkaConfig.fromProps(props) assertEquals(dataDir1, config.metadataLogDir) - assertEquals(Seq(dataDir1, dataDir2), config.logDirs) + assertEquals(Seq(dataDir1, dataDir2), config.logDirs.asScala) Review Comment: ```suggestion assertEquals(util.List.of(dataDir1, dataDir2), config.logDirs) ``` ########## core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala: ########## @@ -62,8 +64,8 @@ class LogRecoveryTest extends QuorumTestHarness { var admin: Admin = _ var producer: KafkaProducer[Integer, String] = _ - def hwFile1 = new OffsetCheckpointFile(new File(configProps1.logDirs.head, ReplicaManager.HighWatermarkFilename), null) - def hwFile2 = new OffsetCheckpointFile(new File(configProps2.logDirs.head, ReplicaManager.HighWatermarkFilename), null) + def hwFile1 = new OffsetCheckpointFile(new File(configProps1.logDirs.asScala.head, ReplicaManager.HighWatermarkFilename), null) + def hwFile2 = new OffsetCheckpointFile(new File(configProps2.logDirs.asScala.head, ReplicaManager.HighWatermarkFilename), null) Review Comment: ```suggestion def hwFile1 = new OffsetCheckpointFile(new File(configProps1.logDirs.getFirst, ReplicaManager.HighWatermarkFilename), null) def hwFile2 = new OffsetCheckpointFile(new File(configProps2.logDirs.getFirst, ReplicaManager.HighWatermarkFilename), null) ``` ########## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java: ########## @@ -115,8 +115,7 @@ public void setup() { this.time = Time.SYSTEM; this.failureChannel = new LogDirFailureChannel(brokerProperties.logDirs().size()); final BrokerTopicStats brokerTopicStats = new BrokerTopicStats(false); - final List<File> files = - CollectionConverters.asJava(brokerProperties.logDirs()).stream().map(File::new).collect(Collectors.toList()); + final List<File> files = brokerProperties.logDirs().stream().map(File::new).collect(Collectors.toList()); Review Comment: ```suggestion final List<File> files = brokerProperties.logDirs().stream().map(File::new).toList(); ``` ########## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java: ########## @@ -108,8 +108,7 @@ public void setup() { this.metrics = new Metrics(); this.time = new MockTime(); this.failureChannel = new LogDirFailureChannel(brokerProperties.logDirs().size()); - final List<File> files = - CollectionConverters.asJava(brokerProperties.logDirs()).stream().map(File::new).collect(Collectors.toList()); + final List<File> files = brokerProperties.logDirs().stream().map(File::new).collect(Collectors.toList()); Review Comment: ```suggestion final List<File> files = brokerProperties.logDirs().stream().map(File::new).toList(); ``` ########## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: ########## @@ -2771,7 +2771,7 @@ class ReplicaManagerTest { props.asScala ++= extraProps.asScala val config = KafkaConfig.fromProps(props) val logConfig = new LogConfig(new Properties) - val logDir = new File(new File(config.logDirs.head), s"$topic-$topicPartition") + val logDir = new File(new File(config.logDirs.asScala.head), s"$topic-$topicPartition") Review Comment: ```suggestion val logDir = new File(new File(config.logDirs.getFirst), s"$topic-$topicPartition") ``` ########## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: ########## @@ -214,7 +214,7 @@ class ReplicaManagerTest { partition.createLogIfNotExists(isNew = false, isFutureReplica = false, new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints.asJava), None) rm.checkpointHighWatermarks() - config.logDirs.map(s => Paths.get(s, ReplicaManager.HighWatermarkFilename)) + config.logDirs.asScala.map(s => Paths.get(s, ReplicaManager.HighWatermarkFilename)) .foreach(checkpointFile => assertTrue(Files.exists(checkpointFile), Review Comment: ```suggestion config.logDirs.stream().map(s => Paths.get(s, ReplicaManager.HighWatermarkFilename)) .forEach(checkpointFile => assertTrue(Files.exists(checkpointFile), ``` ########## core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala: ########## @@ -195,7 +197,7 @@ class HighwatermarkPersistenceTest { } private def hwmFor(replicaManager: ReplicaManager, topic: String, partition: Int): Long = { - replicaManager.highWatermarkCheckpoints(new File(replicaManager.config.logDirs.head).getAbsolutePath).read().getOrDefault( + replicaManager.highWatermarkCheckpoints(new File(replicaManager.config.logDirs.asScala.head).getAbsolutePath).read().getOrDefault( Review Comment: ```suggestion replicaManager.highWatermarkCheckpoints(new File(replicaManager.config.logDirs.getFirst).getAbsolutePath).read().getOrDefault( ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org