dongjoon-hyun commented on a change in pull request #35509:
URL: https://github.com/apache/spark/pull/35509#discussion_r806535077



##########
File path: 
resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
##########
@@ -17,13 +17,21 @@
 
 package org.apache.spark.deploy.k8s
 
+import java.io.File
+import java.nio.charset.Charset
+import java.util.concurrent.TimeUnit
+
 import scala.collection.JavaConverters._
 
 import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder}
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.scalatest.PrivateMethodTester
 
 import org.apache.spark.SparkFunSuite

Review comment:
       ```scala
   -import org.apache.spark.SparkFunSuite
   +import org.apache.spark.{SparkException, SparkFunSuite}
   ```

##########
File path: 
resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
##########
@@ -65,4 +73,49 @@ class KubernetesUtilsSuite extends SparkFunSuite {
     assert(sparkPodWithNoContainerName.pod.getSpec.getHostname == HOST)
     assert(sparkPodWithNoContainerName.container.getName == null)
   }
+
+  test("SPARK-38201: check uploadFileToHadoopCompatibleFS with different 
delSrc and overwrite") {
+    val uploadFileToHadoopCompatibleFSMethod =
+      PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))

Review comment:
       ```scala
   -    val uploadFileToHadoopCompatibleFSMethod =
   -      PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))
   +    val upload = 
PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))
   ```

##########
File path: 
resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
##########
@@ -65,4 +73,49 @@ class KubernetesUtilsSuite extends SparkFunSuite {
     assert(sparkPodWithNoContainerName.pod.getSpec.getHostname == HOST)
     assert(sparkPodWithNoContainerName.container.getName == null)
   }
+
+  test("SPARK-38201: check uploadFileToHadoopCompatibleFS with different 
delSrc and overwrite") {
+    val uploadFileToHadoopCompatibleFSMethod =
+      PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))
+    withTempDir { srcDir =>
+      val fileName = "test.txt"
+      val srcFile = new File(srcDir, fileName)
+      FileUtils.write(srcFile, "test", Charset.defaultCharset())
+      withTempDir { destDir =>
+        import org.apache.spark.SparkException

Review comment:
       ```
   -        import org.apache.spark.SparkException
   ```

##########
File path: 
resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
##########
@@ -65,4 +73,49 @@ class KubernetesUtilsSuite extends SparkFunSuite {
     assert(sparkPodWithNoContainerName.pod.getSpec.getHostname == HOST)
     assert(sparkPodWithNoContainerName.container.getName == null)
   }
+
+  test("SPARK-38201: check uploadFileToHadoopCompatibleFS with different 
delSrc and overwrite") {
+    val uploadFileToHadoopCompatibleFSMethod =
+      PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))
+    withTempDir { srcDir =>
+      val fileName = "test.txt"
+      val srcFile = new File(srcDir, fileName)
+      FileUtils.write(srcFile, "test", Charset.defaultCharset())
+      withTempDir { destDir =>
+        import org.apache.spark.SparkException
+        val src = new Path(srcFile.getAbsolutePath)
+        val dest = new Path(destDir.getAbsolutePath, fileName)
+        val fs = src.getFileSystem(new Configuration())
+        // Scenario 1: delSrc = false and overwrite = true, upload successful
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, 
false, true))

Review comment:
       ```scala
   -        KubernetesUtils
   -          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, 
fs, false, true))
   +        KubernetesUtils.invokePrivate(upload(src, dest, fs, false, true))
   ```

##########
File path: 
resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
##########
@@ -65,4 +73,49 @@ class KubernetesUtilsSuite extends SparkFunSuite {
     assert(sparkPodWithNoContainerName.pod.getSpec.getHostname == HOST)
     assert(sparkPodWithNoContainerName.container.getName == null)
   }
+
+  test("SPARK-38201: check uploadFileToHadoopCompatibleFS with different 
delSrc and overwrite") {
+    val uploadFileToHadoopCompatibleFSMethod =
+      PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))
+    withTempDir { srcDir =>
+      val fileName = "test.txt"
+      val srcFile = new File(srcDir, fileName)
+      FileUtils.write(srcFile, "test", Charset.defaultCharset())
+      withTempDir { destDir =>
+        import org.apache.spark.SparkException

Review comment:
       ```scala
   -        import org.apache.spark.SparkException
   ```

##########
File path: 
resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
##########
@@ -65,4 +73,49 @@ class KubernetesUtilsSuite extends SparkFunSuite {
     assert(sparkPodWithNoContainerName.pod.getSpec.getHostname == HOST)
     assert(sparkPodWithNoContainerName.container.getName == null)
   }
+
+  test("SPARK-38201: check uploadFileToHadoopCompatibleFS with different 
delSrc and overwrite") {
+    val uploadFileToHadoopCompatibleFSMethod =
+      PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))
+    withTempDir { srcDir =>
+      val fileName = "test.txt"
+      val srcFile = new File(srcDir, fileName)
+      FileUtils.write(srcFile, "test", Charset.defaultCharset())
+      withTempDir { destDir =>
+        import org.apache.spark.SparkException
+        val src = new Path(srcFile.getAbsolutePath)
+        val dest = new Path(destDir.getAbsolutePath, fileName)
+        val fs = src.getFileSystem(new Configuration())
+        // Scenario 1: delSrc = false and overwrite = true, upload successful
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, 
false, true))
+        val firstUploadTime = fs.getFileStatus(dest).getModificationTime
+        // sleep 1s to ensure that the `ModificationTime` changes.
+        TimeUnit.SECONDS.sleep(1)
+        // Scenario 2: delSrc = false and overwrite = true,
+        // upload succeeded but `ModificationTime` changed
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, 
false, true))

Review comment:
       ```scala
   -        KubernetesUtils
   -          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, 
fs, false, true))
   +        KubernetesUtils.invokePrivate(upload(src, dest, fs, false, true))
   ```

##########
File path: 
resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
##########
@@ -65,4 +73,49 @@ class KubernetesUtilsSuite extends SparkFunSuite {
     assert(sparkPodWithNoContainerName.pod.getSpec.getHostname == HOST)
     assert(sparkPodWithNoContainerName.container.getName == null)
   }
+
+  test("SPARK-38201: check uploadFileToHadoopCompatibleFS with different 
delSrc and overwrite") {
+    val uploadFileToHadoopCompatibleFSMethod =
+      PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))
+    withTempDir { srcDir =>
+      val fileName = "test.txt"
+      val srcFile = new File(srcDir, fileName)
+      FileUtils.write(srcFile, "test", Charset.defaultCharset())
+      withTempDir { destDir =>
+        import org.apache.spark.SparkException
+        val src = new Path(srcFile.getAbsolutePath)
+        val dest = new Path(destDir.getAbsolutePath, fileName)
+        val fs = src.getFileSystem(new Configuration())
+        // Scenario 1: delSrc = false and overwrite = true, upload successful
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, 
false, true))
+        val firstUploadTime = fs.getFileStatus(dest).getModificationTime
+        // sleep 1s to ensure that the `ModificationTime` changes.
+        TimeUnit.SECONDS.sleep(1)
+        // Scenario 2: delSrc = false and overwrite = true,
+        // upload succeeded but `ModificationTime` changed
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, 
false, true))
+        val secondUploadTime = fs.getFileStatus(dest).getModificationTime
+        assert(firstUploadTime != secondUploadTime)
+
+        // Scenario 3: delSrc = false and overwrite = false,
+        // upload failed because dest exists
+        val message = intercept[SparkException] {
+          KubernetesUtils
+            .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, 
false, false))

Review comment:
       ```scala
   -          KubernetesUtils
   -            .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, 
fs, false, false))
   +          KubernetesUtils.invokePrivate(upload(src, dest, fs, false, false))
   ```

##########
File path: 
resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
##########
@@ -65,4 +73,49 @@ class KubernetesUtilsSuite extends SparkFunSuite {
     assert(sparkPodWithNoContainerName.pod.getSpec.getHostname == HOST)
     assert(sparkPodWithNoContainerName.container.getName == null)
   }
+
+  test("SPARK-38201: check uploadFileToHadoopCompatibleFS with different 
delSrc and overwrite") {
+    val uploadFileToHadoopCompatibleFSMethod =
+      PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))
+    withTempDir { srcDir =>
+      val fileName = "test.txt"
+      val srcFile = new File(srcDir, fileName)
+      FileUtils.write(srcFile, "test", Charset.defaultCharset())
+      withTempDir { destDir =>
+        import org.apache.spark.SparkException
+        val src = new Path(srcFile.getAbsolutePath)
+        val dest = new Path(destDir.getAbsolutePath, fileName)
+        val fs = src.getFileSystem(new Configuration())
+        // Scenario 1: delSrc = false and overwrite = true, upload successful
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, 
false, true))
+        val firstUploadTime = fs.getFileStatus(dest).getModificationTime
+        // sleep 1s to ensure that the `ModificationTime` changes.
+        TimeUnit.SECONDS.sleep(1)
+        // Scenario 2: delSrc = false and overwrite = true,
+        // upload succeeded but `ModificationTime` changed
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, 
false, true))
+        val secondUploadTime = fs.getFileStatus(dest).getModificationTime
+        assert(firstUploadTime != secondUploadTime)
+
+        // Scenario 3: delSrc = false and overwrite = false,
+        // upload failed because dest exists
+        val message = intercept[SparkException] {
+          KubernetesUtils
+            .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, 
false, false))
+        }.getMessage
+        assert(message.contains("Error uploading file"))
+
+        TimeUnit.SECONDS.sleep(1)
+        // Scenario 4: delSrc = true and overwrite = true,
+        // upload succeeded, `ModificationTime` changed and src not exists.
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, 
true, true))

Review comment:
       ```scala
   -        KubernetesUtils
   -          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, 
fs, true, true))
   +        KubernetesUtils.invokePrivate(upload(src, dest, fs, true, true))
   ```

##########
File path: 
resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
##########
@@ -65,4 +73,49 @@ class KubernetesUtilsSuite extends SparkFunSuite {
     assert(sparkPodWithNoContainerName.pod.getSpec.getHostname == HOST)
     assert(sparkPodWithNoContainerName.container.getName == null)
   }
+
+  test("SPARK-38201: check uploadFileToHadoopCompatibleFS with different 
delSrc and overwrite") {
+    val uploadFileToHadoopCompatibleFSMethod =
+      PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))
+    withTempDir { srcDir =>
+      val fileName = "test.txt"
+      val srcFile = new File(srcDir, fileName)
+      FileUtils.write(srcFile, "test", Charset.defaultCharset())
+      withTempDir { destDir =>
+        import org.apache.spark.SparkException
+        val src = new Path(srcFile.getAbsolutePath)
+        val dest = new Path(destDir.getAbsolutePath, fileName)
+        val fs = src.getFileSystem(new Configuration())
+        // Scenario 1: delSrc = false and overwrite = true, upload successful
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, 
false, true))
+        val firstUploadTime = fs.getFileStatus(dest).getModificationTime
+        // sleep 1s to ensure that the `ModificationTime` changes.
+        TimeUnit.SECONDS.sleep(1)

Review comment:
       `1s` is too high. Shall we minimize this delay?

##########
File path: 
resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
##########
@@ -65,4 +73,49 @@ class KubernetesUtilsSuite extends SparkFunSuite {
     assert(sparkPodWithNoContainerName.pod.getSpec.getHostname == HOST)
     assert(sparkPodWithNoContainerName.container.getName == null)
   }
+
+  test("SPARK-38201: check uploadFileToHadoopCompatibleFS with different 
delSrc and overwrite") {
+    val uploadFileToHadoopCompatibleFSMethod =
+      PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))
+    withTempDir { srcDir =>
+      val fileName = "test.txt"
+      val srcFile = new File(srcDir, fileName)
+      FileUtils.write(srcFile, "test", Charset.defaultCharset())
+      withTempDir { destDir =>
+        import org.apache.spark.SparkException
+        val src = new Path(srcFile.getAbsolutePath)
+        val dest = new Path(destDir.getAbsolutePath, fileName)
+        val fs = src.getFileSystem(new Configuration())
+        // Scenario 1: delSrc = false and overwrite = true, upload successful
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, 
false, true))
+        val firstUploadTime = fs.getFileStatus(dest).getModificationTime
+        // sleep 1s to ensure that the `ModificationTime` changes.
+        TimeUnit.SECONDS.sleep(1)
+        // Scenario 2: delSrc = false and overwrite = true,
+        // upload succeeded but `ModificationTime` changed
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, 
false, true))
+        val secondUploadTime = fs.getFileStatus(dest).getModificationTime
+        assert(firstUploadTime != secondUploadTime)
+
+        // Scenario 3: delSrc = false and overwrite = false,
+        // upload failed because dest exists
+        val message = intercept[SparkException] {
+          KubernetesUtils
+            .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, 
false, false))
+        }.getMessage
+        assert(message.contains("Error uploading file"))
+
+        TimeUnit.SECONDS.sleep(1)

Review comment:
       Ditto. This is too high.

##########
File path: 
resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
##########
@@ -65,4 +73,49 @@ class KubernetesUtilsSuite extends SparkFunSuite {
     assert(sparkPodWithNoContainerName.pod.getSpec.getHostname == HOST)
     assert(sparkPodWithNoContainerName.container.getName == null)
   }
+
+  test("SPARK-38201: check uploadFileToHadoopCompatibleFS with different 
delSrc and overwrite") {
+    val uploadFileToHadoopCompatibleFSMethod =
+      PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))
+    withTempDir { srcDir =>
+      val fileName = "test.txt"
+      val srcFile = new File(srcDir, fileName)
+      FileUtils.write(srcFile, "test", Charset.defaultCharset())
+      withTempDir { destDir =>
+        import org.apache.spark.SparkException
+        val src = new Path(srcFile.getAbsolutePath)
+        val dest = new Path(destDir.getAbsolutePath, fileName)
+        val fs = src.getFileSystem(new Configuration())
+        // Scenario 1: delSrc = false and overwrite = true, upload successful
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, 
false, true))
+        val firstUploadTime = fs.getFileStatus(dest).getModificationTime
+        // sleep 1s to ensure that the `ModificationTime` changes.
+        TimeUnit.SECONDS.sleep(1)
+        // Scenario 2: delSrc = false and overwrite = true,
+        // upload succeeded but `ModificationTime` changed
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, 
false, true))
+        val secondUploadTime = fs.getFileStatus(dest).getModificationTime
+        assert(firstUploadTime != secondUploadTime)

Review comment:
       Shall we use a stronger `>` condition instead of weaker condition `!=`?

##########
File path: 
resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
##########
@@ -65,4 +73,49 @@ class KubernetesUtilsSuite extends SparkFunSuite {
     assert(sparkPodWithNoContainerName.pod.getSpec.getHostname == HOST)
     assert(sparkPodWithNoContainerName.container.getName == null)
   }
+
+  test("SPARK-38201: check uploadFileToHadoopCompatibleFS with different 
delSrc and overwrite") {
+    val uploadFileToHadoopCompatibleFSMethod =
+      PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))
+    withTempDir { srcDir =>
+      val fileName = "test.txt"
+      val srcFile = new File(srcDir, fileName)
+      FileUtils.write(srcFile, "test", Charset.defaultCharset())
+      withTempDir { destDir =>
+        import org.apache.spark.SparkException
+        val src = new Path(srcFile.getAbsolutePath)
+        val dest = new Path(destDir.getAbsolutePath, fileName)
+        val fs = src.getFileSystem(new Configuration())
+        // Scenario 1: delSrc = false and overwrite = true, upload successful
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, 
false, true))
+        val firstUploadTime = fs.getFileStatus(dest).getModificationTime
+        // sleep 1s to ensure that the `ModificationTime` changes.
+        TimeUnit.SECONDS.sleep(1)
+        // Scenario 2: delSrc = false and overwrite = true,
+        // upload succeeded but `ModificationTime` changed
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, 
false, true))
+        val secondUploadTime = fs.getFileStatus(dest).getModificationTime
+        assert(firstUploadTime != secondUploadTime)
+
+        // Scenario 3: delSrc = false and overwrite = false,
+        // upload failed because dest exists
+        val message = intercept[SparkException] {
+          KubernetesUtils
+            .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, 
false, false))
+        }.getMessage
+        assert(message.contains("Error uploading file"))
+
+        TimeUnit.SECONDS.sleep(1)
+        // Scenario 4: delSrc = true and overwrite = true,
+        // upload succeeded, `ModificationTime` changed and src not exists.
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, 
true, true))
+        val thirdUploadTime = fs.getFileStatus(dest).getModificationTime
+        assert(secondUploadTime != thirdUploadTime)

Review comment:
       ditto. `<` instead of `!=`.

##########
File path: 
resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
##########
@@ -65,4 +73,49 @@ class KubernetesUtilsSuite extends SparkFunSuite {
     assert(sparkPodWithNoContainerName.pod.getSpec.getHostname == HOST)
     assert(sparkPodWithNoContainerName.container.getName == null)
   }
+
+  test("SPARK-38201: check uploadFileToHadoopCompatibleFS with different 
delSrc and overwrite") {
+    val uploadFileToHadoopCompatibleFSMethod =
+      PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))
+    withTempDir { srcDir =>
+      val fileName = "test.txt"
+      val srcFile = new File(srcDir, fileName)
+      FileUtils.write(srcFile, "test", Charset.defaultCharset())
+      withTempDir { destDir =>
+        import org.apache.spark.SparkException
+        val src = new Path(srcFile.getAbsolutePath)
+        val dest = new Path(destDir.getAbsolutePath, fileName)
+        val fs = src.getFileSystem(new Configuration())
+        // Scenario 1: delSrc = false and overwrite = true, upload successful
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, 
false, true))
+        val firstUploadTime = fs.getFileStatus(dest).getModificationTime
+        // sleep 1s to ensure that the `ModificationTime` changes.
+        TimeUnit.SECONDS.sleep(1)
+        // Scenario 2: delSrc = false and overwrite = true,
+        // upload succeeded but `ModificationTime` changed
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, 
false, true))
+        val secondUploadTime = fs.getFileStatus(dest).getModificationTime
+        assert(firstUploadTime != secondUploadTime)

Review comment:
       Shall we use a stronger `<` condition instead of weaker condition `!=`?

##########
File path: 
resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
##########
@@ -65,4 +73,49 @@ class KubernetesUtilsSuite extends SparkFunSuite {
     assert(sparkPodWithNoContainerName.pod.getSpec.getHostname == HOST)
     assert(sparkPodWithNoContainerName.container.getName == null)
   }
+
+  test("SPARK-38201: check uploadFileToHadoopCompatibleFS with different 
delSrc and overwrite") {
+    val uploadFileToHadoopCompatibleFSMethod =
+      PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))
+    withTempDir { srcDir =>
+      val fileName = "test.txt"
+      val srcFile = new File(srcDir, fileName)
+      FileUtils.write(srcFile, "test", Charset.defaultCharset())
+      withTempDir { destDir =>
+        import org.apache.spark.SparkException
+        val src = new Path(srcFile.getAbsolutePath)
+        val dest = new Path(destDir.getAbsolutePath, fileName)
+        val fs = src.getFileSystem(new Configuration())
+        // Scenario 1: delSrc = false and overwrite = true, upload successful
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, 
false, true))
+        val firstUploadTime = fs.getFileStatus(dest).getModificationTime
+        // sleep 1s to ensure that the `ModificationTime` changes.
+        TimeUnit.SECONDS.sleep(1)
+        // Scenario 2: delSrc = false and overwrite = true,
+        // upload succeeded but `ModificationTime` changed
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, 
false, true))
+        val secondUploadTime = fs.getFileStatus(dest).getModificationTime
+        assert(firstUploadTime != secondUploadTime)
+
+        // Scenario 3: delSrc = false and overwrite = false,
+        // upload failed because dest exists
+        val message = intercept[SparkException] {
+          KubernetesUtils
+            .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, 
false, false))
+        }.getMessage
+        assert(message.contains("Error uploading file"))
+
+        TimeUnit.SECONDS.sleep(1)
+        // Scenario 4: delSrc = true and overwrite = true,
+        // upload succeeded, `ModificationTime` changed and src not exists.
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, 
true, true))
+        val thirdUploadTime = fs.getFileStatus(dest).getModificationTime
+        assert(secondUploadTime != thirdUploadTime)
+        assert(!fs.exists(src))
+      }

Review comment:
       So, we don't test `delSrc=true` and `overwrite=false`?

##########
File path: 
resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
##########
@@ -65,4 +73,43 @@ class KubernetesUtilsSuite extends SparkFunSuite {
     assert(sparkPodWithNoContainerName.pod.getSpec.getHostname == HOST)
     assert(sparkPodWithNoContainerName.container.getName == null)
   }
+
+  test("SPARK-38201: check uploadFileToHadoopCompatibleFS with different 
delSrc and overwrite") {
+    val upload = PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))
+    withTempDir { srcDir =>
+      val fileName = "test.txt"
+      val srcFile = new File(srcDir, fileName)
+      FileUtils.write(srcFile, "test", Charset.defaultCharset())

Review comment:
       If possible, shall we use `StandardCharsets.UTF_8` instead of 
`Charset.defaultCharset()`?

##########
File path: 
resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
##########
@@ -65,4 +73,49 @@ class KubernetesUtilsSuite extends SparkFunSuite {
     assert(sparkPodWithNoContainerName.pod.getSpec.getHostname == HOST)
     assert(sparkPodWithNoContainerName.container.getName == null)
   }
+
+  test("SPARK-38201: check uploadFileToHadoopCompatibleFS with different 
delSrc and overwrite") {
+    val uploadFileToHadoopCompatibleFSMethod =
+      PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))
+    withTempDir { srcDir =>
+      val fileName = "test.txt"
+      val srcFile = new File(srcDir, fileName)
+      FileUtils.write(srcFile, "test", Charset.defaultCharset())
+      withTempDir { destDir =>
+        import org.apache.spark.SparkException
+        val src = new Path(srcFile.getAbsolutePath)
+        val dest = new Path(destDir.getAbsolutePath, fileName)
+        val fs = src.getFileSystem(new Configuration())
+        // Scenario 1: delSrc = false and overwrite = true, upload successful
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, 
false, true))
+        val firstUploadTime = fs.getFileStatus(dest).getModificationTime
+        // sleep 1s to ensure that the `ModificationTime` changes.
+        TimeUnit.SECONDS.sleep(1)

Review comment:
       Since the unit of `getModificationTime` is a millisecond , can we try 
`TimeUnit.MILLISECONDS.sleep(100)`?

##########
File path: 
resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
##########
@@ -65,4 +73,49 @@ class KubernetesUtilsSuite extends SparkFunSuite {
     assert(sparkPodWithNoContainerName.pod.getSpec.getHostname == HOST)
     assert(sparkPodWithNoContainerName.container.getName == null)
   }
+
+  test("SPARK-38201: check uploadFileToHadoopCompatibleFS with different 
delSrc and overwrite") {
+    val uploadFileToHadoopCompatibleFSMethod =
+      PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))
+    withTempDir { srcDir =>
+      val fileName = "test.txt"
+      val srcFile = new File(srcDir, fileName)
+      FileUtils.write(srcFile, "test", Charset.defaultCharset())
+      withTempDir { destDir =>
+        import org.apache.spark.SparkException
+        val src = new Path(srcFile.getAbsolutePath)
+        val dest = new Path(destDir.getAbsolutePath, fileName)
+        val fs = src.getFileSystem(new Configuration())
+        // Scenario 1: delSrc = false and overwrite = true, upload successful
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, 
false, true))
+        val firstUploadTime = fs.getFileStatus(dest).getModificationTime
+        // sleep 1s to ensure that the `ModificationTime` changes.
+        TimeUnit.SECONDS.sleep(1)

Review comment:
       Oh, got it.

##########
File path: 
resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
##########
@@ -65,4 +73,49 @@ class KubernetesUtilsSuite extends SparkFunSuite {
     assert(sparkPodWithNoContainerName.pod.getSpec.getHostname == HOST)
     assert(sparkPodWithNoContainerName.container.getName == null)
   }
+
+  test("SPARK-38201: check uploadFileToHadoopCompatibleFS with different 
delSrc and overwrite") {
+    val uploadFileToHadoopCompatibleFSMethod =
+      PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))
+    withTempDir { srcDir =>
+      val fileName = "test.txt"
+      val srcFile = new File(srcDir, fileName)
+      FileUtils.write(srcFile, "test", Charset.defaultCharset())
+      withTempDir { destDir =>
+        import org.apache.spark.SparkException
+        val src = new Path(srcFile.getAbsolutePath)
+        val dest = new Path(destDir.getAbsolutePath, fileName)
+        val fs = src.getFileSystem(new Configuration())
+        // Scenario 1: delSrc = false and overwrite = true, upload successful
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, 
false, true))
+        val firstUploadTime = fs.getFileStatus(dest).getModificationTime
+        // sleep 1s to ensure that the `ModificationTime` changes.
+        TimeUnit.SECONDS.sleep(1)

Review comment:
       In that case, we had better check the actual content of the file instead of
the modification time.

##########
File path: 
resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
##########
@@ -65,4 +73,49 @@ class KubernetesUtilsSuite extends SparkFunSuite {
     assert(sparkPodWithNoContainerName.pod.getSpec.getHostname == HOST)
     assert(sparkPodWithNoContainerName.container.getName == null)
   }
+
+  test("SPARK-38201: check uploadFileToHadoopCompatibleFS with different 
delSrc and overwrite") {
+    val uploadFileToHadoopCompatibleFSMethod =
+      PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))
+    withTempDir { srcDir =>
+      val fileName = "test.txt"
+      val srcFile = new File(srcDir, fileName)
+      FileUtils.write(srcFile, "test", Charset.defaultCharset())
+      withTempDir { destDir =>
+        import org.apache.spark.SparkException
+        val src = new Path(srcFile.getAbsolutePath)
+        val dest = new Path(destDir.getAbsolutePath, fileName)
+        val fs = src.getFileSystem(new Configuration())
+        // Scenario 1: delSrc = false and overwrite = true, upload successful
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, 
false, true))
+        val firstUploadTime = fs.getFileStatus(dest).getModificationTime
+        // sleep 1s to ensure that the `ModificationTime` changes.
+        TimeUnit.SECONDS.sleep(1)

Review comment:
       Please use `FileUtils.readFileToString`.

##########
File path: 
resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
##########
@@ -65,4 +73,49 @@ class KubernetesUtilsSuite extends SparkFunSuite {
     assert(sparkPodWithNoContainerName.pod.getSpec.getHostname == HOST)
     assert(sparkPodWithNoContainerName.container.getName == null)
   }
+
+  test("SPARK-38201: check uploadFileToHadoopCompatibleFS with different 
delSrc and overwrite") {
+    val uploadFileToHadoopCompatibleFSMethod =
+      PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))
+    withTempDir { srcDir =>
+      val fileName = "test.txt"
+      val srcFile = new File(srcDir, fileName)
+      FileUtils.write(srcFile, "test", Charset.defaultCharset())
+      withTempDir { destDir =>
+        import org.apache.spark.SparkException
+        val src = new Path(srcFile.getAbsolutePath)
+        val dest = new Path(destDir.getAbsolutePath, fileName)
+        val fs = src.getFileSystem(new Configuration())
+        // Scenario 1: delSrc = false and overwrite = true, upload successful
+        KubernetesUtils
+          .invokePrivate(uploadFileToHadoopCompatibleFSMethod(src, dest, fs, 
false, true))
+        val firstUploadTime = fs.getFileStatus(dest).getModificationTime
+        // sleep 1s to ensure that the `ModificationTime` changes.
+        TimeUnit.SECONDS.sleep(1)

Review comment:
       When it comes to the test time, the Apache Spark community is very careful 
and has been trying to reduce the test time again and again. `Sleep` is one of 
the typical patterns we want to avoid.

##########
File path: 
resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
##########
@@ -65,4 +72,61 @@ class KubernetesUtilsSuite extends SparkFunSuite {
     assert(sparkPodWithNoContainerName.pod.getSpec.getHostname == HOST)
     assert(sparkPodWithNoContainerName.container.getName == null)
   }
+
+  test("SPARK-38201: check uploadFileToHadoopCompatibleFS with different 
delSrc and overwrite") {
+
+    def checkUploadFailed(f: () => Unit): Unit = {
+      val message = intercept[SparkException] {
+        f
+      }.getMessage
+      assert(message.contains("Error uploading file"))
+    }
+
+    withTempDir { srcDir =>
+      withTempDir { destDir =>
+        val upload = 
PrivateMethod[Unit](Symbol("uploadFileToHadoopCompatibleFS"))
+        val fileName = "test.txt"
+        val srcFile = new File(srcDir, fileName)
+        val src = new Path(srcFile.getAbsolutePath)
+        val dest = new Path(destDir.getAbsolutePath, fileName)
+        val fs = src.getFileSystem(new Configuration())
+        // Write a new file, upload file with delSrc = false and overwrite = 
true.
+        // Upload successful and record the `fileLength`.
+        FileUtils.write(srcFile, "init-content", StandardCharsets.UTF_8)
+        KubernetesUtils.invokePrivate(upload(src, dest, fs, false, true))
+        val firstLength = fs.getFileStatus(dest).getLen
+
+        // Append the file, upload file with delSrc = false and overwrite = 
true.
+        // Upload succeeded but `fileLength` changed.
+        FileUtils.write(srcFile, "append-content", StandardCharsets.UTF_8, 
true)
+        KubernetesUtils.invokePrivate(upload(src, dest, fs, false, true))
+        val secondLength = fs.getFileStatus(dest).getLen
+        assert(firstLength < secondLength)
+
+        // Upload file with delSrc = false and overwrite = false.
+        // Upload failed because dest exists.
+        val message1 = intercept[SparkException] {
+          KubernetesUtils.invokePrivate(upload(src, dest, fs, false, false))
+        }.getMessage
+        assert(message1.contains("Error uploading file"))

Review comment:
       It seems that we need to check that the uploaded file is not changed 
here.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to