zsxwing commented on a change in pull request #23733: [SPARK-26824][SS]Fix the checkpoint location and _spark_metadata when it contains special chars
URL: https://github.com/apache/spark/pull/23733#discussion_r256108908
 
 

 ##########
 File path: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
 ##########
 @@ -915,6 +918,189 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
     )
   }
 
+  test("special characters in checkpoint path") {
+    withTempDir { tempDir =>
+      val checkpointDir = new File(tempDir, "chk @#chk")
+      val inputData = MemoryStream[Int]
+      inputData.addData(1)
+      val q = inputData.toDF()
+        .writeStream
+        .format("noop")
+        .option("checkpointLocation", checkpointDir.getCanonicalPath)
+        .start()
+      try {
+        q.processAllAvailable()
+        assert(checkpointDir.listFiles().toList.nonEmpty)
+      } finally {
+        q.stop()
+      }
+    }
+  }
+
+  /**
+   * Copy the checkpoint generated by Spark 2.4.0 from test resource to `dir` to set up a legacy
+   * streaming checkpoint.
+   */
+  private def setUp2dot4dot0Checkpoint(dir: File): Unit = {
+    val input = 
getClass.getResource("/structured-streaming/escaped-path-2.4.0")
+    assert(input != null, "cannot find test resource 
'/structured-streaming/escaped-path-2.4.0'")
+    val inputDir = new File(input.toURI)
+
+    // Copy test files to tempDir so that we won't modify the original data.
+    FileUtils.copyDirectory(inputDir, dir)
+
+    // Spark 2.4 and earlier escaped the _spark_metadata path once
+    val legacySparkMetadataDir = new File(
+      dir,
+      new Path("output %@#output/_spark_metadata").toUri.toString)
+
+    // Migrate from the legacy _spark_metadata directory to the new _spark_metadata directory.
+    // Ideally we would copy "_spark_metadata" directly, as a user migrating to the new version
+    // is supposed to do. However, in this test "tempDir" is different in each run, so we need to
+    // fix the absolute path in the metadata to match "tempDir".
+    val sparkMetadata = FileUtils.readFileToString(new File(legacySparkMetadataDir, "0"), "UTF-8")
+    FileUtils.write(
+      new File(legacySparkMetadataDir, "0"),
+      sparkMetadata.replaceAll("TEMPDIR", dir.getCanonicalPath),
+      "UTF-8")
+  }
+
+  test("detect escaped path and report the migration guide") {
+    // Assert that the error message contains the migration conf, the path, and the legacy path.
+    def assertMigrationError(errorMessage: String, path: File, legacyPath: File): Unit = {
+      Seq(SQLConf.STREAMING_CHECKPOINT_ESCAPED_PATH_CHECK_ENABLED.key,
+          path.getCanonicalPath,
+          legacyPath.getCanonicalPath).foreach { msg =>
+        assert(errorMessage.contains(msg))
+      }
+    }
+
+    withTempDir { tempDir =>
 
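For context on the escaping in `setUp2dot4dot0Checkpoint` above: Hadoop's `Path.toUri` percent-encodes characters that are not legal in a URI, which is how Spark 2.4 and earlier derived the legacy on-disk `_spark_metadata` directory name. A minimal sketch of that behavior (illustrative only, not part of the diff; the expected output is my reading of `org.apache.hadoop.fs.Path`):

    import org.apache.hadoop.fs.Path

    // Converting a Path to a URI escapes the raw path once:
    // ' ' -> %20, '%' -> %25, '#' -> %23 ('@' is legal in a URI path).
    val escaped = new Path("output %@#output/_spark_metadata").toUri.toString
    // expected: "output%20%25@%23output/_spark_metadata"
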
 Review comment:
  Explicitly setting a config in a test is usually done because the default value may change in the future and we don't want existing tests to break.
  
  However, for this particular config I prefer not to set it in the test, so that we verify the default behavior is correct. If we ever decide to turn it off by default, we will probably just remove this check entirely. I don't see a reason to turn it off by default and then ask the user to turn it on in some cases.
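  
  For contrast, the pattern being argued against would look roughly like the sketch below (hypothetical; it assumes the suite mixes in the usual `withSQLConf` helper from Spark's SQLTestUtils):
  
      // Hypothetical alternative: pinning the config in the test.
      // The comment above argues against this, because the test should
      // exercise the default behavior of the escaped-path check.
      withSQLConf(SQLConf.STREAMING_CHECKPOINT_ESCAPED_PATH_CHECK_ENABLED.key -> "true") {
        withTempDir { tempDir =>
          // ... same test body as in the diff above ...
        }
      }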
