FrankYang0529 commented on code in PR #19579:
URL: https://github.com/apache/kafka/pull/19579#discussion_r2063791648


##########
core/src/main/scala/kafka/server/BrokerServer.scala:
##########
@@ -709,7 +709,7 @@ class BrokerServer(
         None
       }
 
-      val rlm = new RemoteLogManager(config.remoteLogManagerConfig, 
config.brokerId, config.logDirs.head, clusterId, time,
+      val rlm = new RemoteLogManager(config.remoteLogManagerConfig, 
config.brokerId, config.logDirs.asScala.head, clusterId, time,

Review Comment:
   It looks like we can use `getFirst` to avoid redundant conversion.
   
   ```suggestion
         val rlm = new RemoteLogManager(config.remoteLogManagerConfig, 
config.brokerId, config.logDirs.getFirst, clusterId, time,
   ```



##########
core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala:
##########
@@ -1551,7 +1551,7 @@ class KRaftClusterTest {
         // Copy foo-0 to targetParentDir
         // This is so that we can rename the main replica to a future down 
below
         val parentDir = log.parentDir
-        val targetParentDir = broker0.config.logDirs.filter(_ != 
parentDir).head
+        val targetParentDir = broker0.config.logDirs.asScala.filter(_ != 
parentDir).head
         val targetDirFile = new File(targetParentDir, log.dir.getName)

Review Comment:
   ```suggestion
           val targetParentDir = broker0.config.logDirs.stream().filter(l => 
!l.equals(parentDir)).findFirst()
           assertTrue(targetParentDir.isPresent)
           val targetDirFile = new File(targetParentDir.get(), log.dir.getName)
   ```



##########
core/src/main/scala/kafka/server/KafkaConfig.scala:
##########
@@ -241,7 +241,7 @@ class KafkaConfig private(doLog: Boolean, val props: 
util.Map[_, _])
   def metadataLogDir: String = {
     Option(getString(MetadataLogConfig.METADATA_LOG_DIR_CONFIG)) match {
       case Some(dir) => dir
-      case None => logDirs.head
+      case None => logDirs.asScala.head

Review Comment:
   ditto



##########
core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala:
##########
@@ -91,10 +91,10 @@ class AlterReplicaLogDirsRequestTest extends 
BaseRequestTest {
   @ParameterizedTest
   @ValueSource(strings = Array("kraft"))
   def testAlterReplicaLogDirsRequestErrorCode(quorum: String): Unit = {
-    val offlineDir = new 
File(brokers.head.config.logDirs.tail.head).getAbsolutePath
-    val validDir1 = new File(brokers.head.config.logDirs(1)).getAbsolutePath
-    val validDir2 = new File(brokers.head.config.logDirs(2)).getAbsolutePath
-    val validDir3 = new File(brokers.head.config.logDirs(3)).getAbsolutePath
+    val offlineDir = new 
File(brokers.head.config.logDirs.asScala.tail.head).getAbsolutePath

Review Comment:
   ```suggestion
       val offlineDir = new 
File(brokers.head.config.logDirs.getLast).getAbsolutePath
   ```



##########
core/src/test/scala/unit/kafka/server/DescribeLogDirsRequestTest.scala:
##########
@@ -42,8 +42,8 @@ class DescribeLogDirsRequestTest extends BaseRequestTest {
   @ParameterizedTest
   @ValueSource(strings = Array("kraft"))
   def testDescribeLogDirsRequest(quorum: String): Unit = {
-    val onlineDir = new File(brokers.head.config.logDirs.head).getAbsolutePath
-    val offlineDir = new 
File(brokers.head.config.logDirs.tail.head).getAbsolutePath
+    val onlineDir = new 
File(brokers.head.config.logDirs.asScala.head).getAbsolutePath
+    val offlineDir = new 
File(brokers.head.config.logDirs.asScala.tail.head).getAbsolutePath

Review Comment:
   ```suggestion
       val onlineDir = new 
File(brokers.head.config.logDirs.getFirst).getAbsolutePath
       val offlineDir = new 
File(brokers.head.config.logDirs.getLast).getAbsolutePath
   ```



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -1237,7 +1237,7 @@ class ReplicaManager(val config: KafkaConfig,
   def describeLogDirs(partitions: Set[TopicPartition]): 
List[DescribeLogDirsResponseData.DescribeLogDirsResult] = {
     val logsByDir = logManager.allLogs.groupBy(log => log.parentDir)
 
-    config.logDirs.toSet.map { logDir: String =>
+    config.logDirs.asScala.toSet.map { logDir: String =>

Review Comment:
   Can we change `describeLogDirs` to return `java.util.List`? So we don't need 
to convert `logDirs` to Scala and convert result back to Java again in 
`KafkaApis`.



##########
core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala:
##########
@@ -1500,7 +1500,7 @@ class KafkaConfigTest {
 
     val config = KafkaConfig.fromProps(props)
     assertEquals(metadataDir, config.metadataLogDir)
-    assertEquals(Seq(dataDir), config.logDirs)
+    assertEquals(Seq(dataDir), config.logDirs.asScala)

Review Comment:
   ```suggestion
       assertEquals(util.List.of(dataDir), config.logDirs)
   ```



##########
core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala:
##########
@@ -1165,7 +1165,7 @@ class KafkaConfigTest {
     assertEquals(1, config.brokerId)
     assertEquals(Seq("PLAINTEXT://127.0.0.1:1122"), 
config.effectiveAdvertisedBrokerListeners.map(JTestUtils.endpointToString))
     assertEquals(Map("127.0.0.1" -> 2, "127.0.0.2" -> 3), 
config.maxConnectionsPerIpOverrides)
-    assertEquals(List("/tmp1", "/tmp2"), config.logDirs)
+    assertEquals(List("/tmp1", "/tmp2"), config.logDirs.asScala.toList)

Review Comment:
   ```suggestion
       assertEquals(util.List.of("/tmp1", "/tmp2"), config.logDirs)
   ```



##########
core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala:
##########
@@ -1518,7 +1518,7 @@ class KafkaConfigTest {
 
     val config = KafkaConfig.fromProps(props)
     assertEquals(dataDir1, config.metadataLogDir)
-    assertEquals(Seq(dataDir1, dataDir2), config.logDirs)
+    assertEquals(Seq(dataDir1, dataDir2), config.logDirs.asScala)

Review Comment:
   ```suggestion
       assertEquals(util.List.of(dataDir1, dataDir2), config.logDirs)
   ```



##########
core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala:
##########
@@ -62,8 +64,8 @@ class LogRecoveryTest extends QuorumTestHarness {
 
   var admin: Admin = _
   var producer: KafkaProducer[Integer, String] = _
-  def hwFile1 = new OffsetCheckpointFile(new File(configProps1.logDirs.head, 
ReplicaManager.HighWatermarkFilename), null)
-  def hwFile2 = new OffsetCheckpointFile(new File(configProps2.logDirs.head, 
ReplicaManager.HighWatermarkFilename), null)
+  def hwFile1 = new OffsetCheckpointFile(new 
File(configProps1.logDirs.asScala.head, ReplicaManager.HighWatermarkFilename), 
null)
+  def hwFile2 = new OffsetCheckpointFile(new 
File(configProps2.logDirs.asScala.head, ReplicaManager.HighWatermarkFilename), 
null)

Review Comment:
   ```suggestion
     def hwFile1 = new OffsetCheckpointFile(new 
File(configProps1.logDirs.getFirst, ReplicaManager.HighWatermarkFilename), null)
     def hwFile2 = new OffsetCheckpointFile(new 
File(configProps2.logDirs.getFirst, ReplicaManager.HighWatermarkFilename), null)
   ```



##########
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java:
##########
@@ -115,8 +115,7 @@ public void setup() {
         this.time = Time.SYSTEM;
         this.failureChannel = new 
LogDirFailureChannel(brokerProperties.logDirs().size());
         final BrokerTopicStats brokerTopicStats = new BrokerTopicStats(false);
-        final List<File> files =
-                
CollectionConverters.asJava(brokerProperties.logDirs()).stream().map(File::new).collect(Collectors.toList());
+        final List<File> files = 
brokerProperties.logDirs().stream().map(File::new).collect(Collectors.toList());

Review Comment:
   ```suggestion
           final List<File> files = 
brokerProperties.logDirs().stream().map(File::new).toList();
   ```



##########
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java:
##########
@@ -108,8 +108,7 @@ public void setup() {
         this.metrics = new Metrics();
         this.time = new MockTime();
         this.failureChannel = new 
LogDirFailureChannel(brokerProperties.logDirs().size());
-        final List<File> files =
-            
CollectionConverters.asJava(brokerProperties.logDirs()).stream().map(File::new).collect(Collectors.toList());
+        final List<File> files = 
brokerProperties.logDirs().stream().map(File::new).collect(Collectors.toList());

Review Comment:
   ```suggestion
           final List<File> files = 
brokerProperties.logDirs().stream().map(File::new).toList();
   ```



##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -2771,7 +2771,7 @@ class ReplicaManagerTest {
     props.asScala ++= extraProps.asScala
     val config = KafkaConfig.fromProps(props)
     val logConfig = new LogConfig(new Properties)
-    val logDir = new File(new File(config.logDirs.head), 
s"$topic-$topicPartition")
+    val logDir = new File(new File(config.logDirs.asScala.head), 
s"$topic-$topicPartition")

Review Comment:
   ```suggestion
       val logDir = new File(new File(config.logDirs.getFirst), 
s"$topic-$topicPartition")
   ```



##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -214,7 +214,7 @@ class ReplicaManagerTest {
       partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
         new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints.asJava), None)
       rm.checkpointHighWatermarks()
-      config.logDirs.map(s => Paths.get(s, 
ReplicaManager.HighWatermarkFilename))
+      config.logDirs.asScala.map(s => Paths.get(s, 
ReplicaManager.HighWatermarkFilename))
         .foreach(checkpointFile => assertTrue(Files.exists(checkpointFile),

Review Comment:
   ```suggestion
         config.logDirs.stream().map(s => Paths.get(s, 
ReplicaManager.HighWatermarkFilename))
           .forEach(checkpointFile => assertTrue(Files.exists(checkpointFile),
   ```



##########
core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala:
##########
@@ -195,7 +197,7 @@ class HighwatermarkPersistenceTest {
   }
 
   private def hwmFor(replicaManager: ReplicaManager, topic: String, partition: 
Int): Long = {
-    replicaManager.highWatermarkCheckpoints(new 
File(replicaManager.config.logDirs.head).getAbsolutePath).read().getOrDefault(
+    replicaManager.highWatermarkCheckpoints(new 
File(replicaManager.config.logDirs.asScala.head).getAbsolutePath).read().getOrDefault(

Review Comment:
   ```suggestion
       replicaManager.highWatermarkCheckpoints(new 
File(replicaManager.config.logDirs.getFirst).getAbsolutePath).read().getOrDefault(
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to