gaborgsomogyi commented on a change in pull request #23733: [SPARK-26824][SS]Fix the checkpoint location and _spark_metadata when it contains special chars
URL: https://github.com/apache/spark/pull/23733#discussion_r255912907
 
 

 ##########
 File path: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
 ##########
 @@ -915,6 +918,189 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
     )
   }
 
+  test("special characters in checkpoint path") {
+    withTempDir { tempDir =>
+      val checkpointDir = new File(tempDir, "chk @#chk")
+      val inputData = MemoryStream[Int]
+      inputData.addData(1)
+      val q = inputData.toDF()
+        .writeStream
+        .format("noop")
+        .option("checkpointLocation", checkpointDir.getCanonicalPath)
+        .start()
+      try {
+        q.processAllAvailable()
+        assert(checkpointDir.listFiles().toList.nonEmpty)
+      } finally {
+        q.stop()
+      }
+    }
+  }
+
+  /**
+   * Copy the checkpoint generated by Spark 2.4.0 from test resource to `dir` to set up a legacy
+   * streaming checkpoint.
+   */
+  private def setUp2dot4dot0Checkpoint(dir: File): Unit = {
+    val input = getClass.getResource("/structured-streaming/escaped-path-2.4.0")
+    assert(input != null, "cannot find test resource '/structured-streaming/escaped-path-2.4.0'")
+    val inputDir = new File(input.toURI)
+
+    // Copy test files to tempDir so that we won't modify the original data.
+    FileUtils.copyDirectory(inputDir, dir)
+
+    // Spark 2.4 and earlier escaped the _spark_metadata path once
+    val legacySparkMetadataDir = new File(
+      dir,
+      new Path("output %@#output/_spark_metadata").toUri.toString)
+
+    // Migrate from legacy _spark_metadata directory to the new _spark_metadata directory.
+    // Ideally we should copy "_spark_metadata" directly like what the user is supposed to do to
+    // migrate to new version. However, in our test, "tempDir" will be different in each run and
+    // we need to fix the absolute path in the metadata to match "tempDir".
+    val sparkMetadata = FileUtils.readFileToString(new File(legacySparkMetadataDir, "0"), "UTF-8")
+    FileUtils.write(
+      new File(legacySparkMetadataDir, "0"),
+      sparkMetadata.replaceAll("TEMPDIR", dir.getCanonicalPath),
+      "UTF-8")
+  }
+
+  test("detect escaped path and report the migration guide") {
+    // Assert that the error message contains the migration conf, path and the legacy path.
+    def assertMigrationError(errorMessage: String, path: File, legacyPath: File): Unit = {
+      Seq(SQLConf.STREAMING_CHECKPOINT_ESCAPED_PATH_CHECK_ENABLED.key,
+          path.getCanonicalPath,
+          legacyPath.getCanonicalPath).foreach { msg =>
+        assert(errorMessage.contains(msg))
+      }
+    }
+
+    withTempDir { tempDir =>
 
 Review comment:
   Wouldn't it be better to make `withSQLConf(SQLConf.STREAMING_CHECKPOINT_ESCAPED_PATH_CHECK_ENABLED.key -> "true")` explicit (I know that's the default now)?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
