zsxwing commented on a change in pull request #23733: [SPARK-26824][SS]Fix the checkpoint location and _spark_metadata when it contains special chars
URL: https://github.com/apache/spark/pull/23733#discussion_r258631645
 
 

 ##########
 File path: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
 ##########
 @@ -915,6 +918,189 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
     )
   }
 
+  test("special characters in checkpoint path") {
+    withTempDir { tempDir =>
+      val checkpointDir = new File(tempDir, "chk @#chk")
+      val inputData = MemoryStream[Int]
+      inputData.addData(1)
+      val q = inputData.toDF()
+        .writeStream
+        .format("noop")
+        .option("checkpointLocation", checkpointDir.getCanonicalPath)
+        .start()
+      try {
+        q.processAllAvailable()
+        assert(checkpointDir.listFiles().toList.nonEmpty)
+      } finally {
+        q.stop()
+      }
+    }
+  }
+
+  /**
+   * Copy the checkpoint generated by Spark 2.4.0 from test resource to `dir` to set up a legacy
+   * streaming checkpoint.
+   */
+  private def setUp2dot4dot0Checkpoint(dir: File): Unit = {
+    val input = getClass.getResource("/structured-streaming/escaped-path-2.4.0")
+    assert(input != null, "cannot find test resource '/structured-streaming/escaped-path-2.4.0'")
+    val inputDir = new File(input.toURI)
+
+    // Copy test files to tempDir so that we won't modify the original data.
+    FileUtils.copyDirectory(inputDir, dir)
+
+    // Spark 2.4 and earlier escaped the _spark_metadata path once
+    val legacySparkMetadataDir = new File(
+      dir,
+      new Path("output %@#output/_spark_metadata").toUri.toString)
+
+    // Migrate from the legacy _spark_metadata directory to the new _spark_metadata directory.
+    // Ideally we would copy "_spark_metadata" directly, as a user migrating to the new version
+    // is supposed to do. However, "tempDir" is different in each test run, so we need to fix
+    // the absolute path in the metadata to match "tempDir".
+    val sparkMetadata = FileUtils.readFileToString(new File(legacySparkMetadataDir, "0"), "UTF-8")
+    FileUtils.write(
+      new File(legacySparkMetadataDir, "0"),
+      sparkMetadata.replaceAll("TEMPDIR", dir.getCanonicalPath),
+      "UTF-8")
+  }
+
+  test("detect escaped path and report the migration guide") {
+    // Assert that the error message contains the migration conf, path and the legacy path.
+    def assertMigrationError(errorMessage: String, path: File, legacyPath: File): Unit = {
+      Seq(SQLConf.STREAMING_CHECKPOINT_ESCAPED_PATH_CHECK_ENABLED.key,
+          path.getCanonicalPath,
+          legacyPath.getCanonicalPath).foreach { msg =>
+        assert(errorMessage.contains(msg))
+      }
+    }
+
+    withTempDir { tempDir =>
+      setUp2dot4dot0Checkpoint(tempDir)
+
+      // Here are the paths we will use to create the query
+      val outputDir = new File(tempDir, "output %@#output")
+      val checkpointDir = new File(tempDir, "chk %@#chk")
+      val sparkMetadataDir = new File(tempDir, "output %@#output/_spark_metadata")
+
+      // The escaped paths used by Spark 2.4 and earlier.
+      // Spark 2.4 and earlier escaped the checkpoint path three times
+      val legacyCheckpointDir = new File(
+        tempDir,
+        new Path(new Path(new Path("chk %@#chk").toUri.toString).toUri.toString).toUri.toString)
+      // Spark 2.4 and earlier escaped the _spark_metadata path once
+      val legacySparkMetadataDir = new File(
+        tempDir,
+        new Path("output %@#output/_spark_metadata").toUri.toString)
+
+      // Reading a file sink output in a batch query should detect the legacy _spark_metadata
+      // directory and throw an error
+      val e = intercept[SparkException] {
+        spark.read.load(outputDir.getCanonicalPath).as[Int]
+      }
+      assertMigrationError(e.getMessage, sparkMetadataDir, legacySparkMetadataDir)
+
+      // Restarting the streaming query should detect the legacy _spark_metadata directory
+      // and throw an error
+      val inputData = MemoryStream[Int]
+      val e2 = intercept[SparkException] {
+        inputData.toDF()
+          .writeStream
+          .format("parquet")
+          .option("checkpointLocation", checkpointDir.getCanonicalPath)
+          .start(outputDir.getCanonicalPath)
+      }
+      assertMigrationError(e2.getMessage, sparkMetadataDir, legacySparkMetadataDir)
+
+      // Move "_spark_metadata" to fix the file sink and test the checkpoint path.
+      FileUtils.moveDirectory(legacySparkMetadataDir, sparkMetadataDir)
+
+      // Restarting the streaming query should detect the legacy checkpoint path and throw
+      // an error
+      val e3 = intercept[SparkException] {
+        inputData.toDF()
+          .writeStream
+          .format("parquet")
+          .option("checkpointLocation", checkpointDir.getCanonicalPath)
+          .start(outputDir.getCanonicalPath)
+      }
+      assertMigrationError(e3.getMessage, checkpointDir, legacyCheckpointDir)
+
+      // Fix the checkpoint path and verify that the user can complete the migration by
+      // moving files.
+      FileUtils.moveDirectory(legacyCheckpointDir, checkpointDir)
+
+      val q = inputData.toDF()
+        .writeStream
+        .format("parquet")
+        .option("checkpointLocation", checkpointDir.getCanonicalPath)
+        .start(outputDir.getCanonicalPath)
+      try {
+        q.processAllAvailable()
+        // Check the query id to make sure the query reused the migrated checkpoint
+        assert(q.id.toString == "09be7fb3-49d8-48a6-840d-e9c2ad92a898")
+
+        // Verify that the batch query can read "_spark_metadata" correctly after migration.
+        val df = spark.read.load(outputDir.getCanonicalPath)
+        assert(df.queryExecution.executedPlan.toString contains "MetadataLogFileIndex")
+        checkDatasetUnorderly(df.as[Int], 1, 2, 3)
+      } finally {
+        q.stop()
+      }
+    }
+  }
+
+  test("ignore the escaped path check when the flag is off") {
+    withTempDir { tempDir =>
+      setUp2dot4dot0Checkpoint(tempDir)
+      val outputDir = new File(tempDir, "output %@#output")
+      val checkpointDir = new File(tempDir, "chk %@#chk")
+
+      withSQLConf(SQLConf.STREAMING_CHECKPOINT_ESCAPED_PATH_CHECK_ENABLED.key -> "false") {
+        // Verify that the batch query ignores the legacy "_spark_metadata"
+        val df = spark.read.load(outputDir.getCanonicalPath)
+        assert(!(df.queryExecution.executedPlan.toString contains "MetadataLogFileIndex"))
+        checkDatasetUnorderly(df.as[Int], 1, 2, 3)
+
+        val inputData = MemoryStream[Int]
+        val q = inputData.toDF()
+          .writeStream
+          .format("parquet")
+          .option("checkpointLocation", checkpointDir.getCanonicalPath)
+          .start(outputDir.getCanonicalPath)
+        try {
+          q.processAllAvailable()
+          // Check the query id to make sure it ignores the legacy checkpoint
+          assert(q.id.toString != "09be7fb3-49d8-48a6-840d-e9c2ad92a898")
+        } finally {
+          q.stop()
+        }
+      }
+    }
+  }
+
+  test("containsSpecialCharsInPath") {
+    Seq("foo/b ar",
+        "/foo/b ar",
+        "file:/foo/b ar",
+        "file://foo/b ar",
+        "file:///foo/b ar",
+        "file://foo:bar@bar/foo/b ar").foreach { p =>
+      assert(StreamExecution.containsSpecialCharsInPath(new Path(p)), s"failed to check $p")
+    }
+    Seq("foo/bar",
+        "/foo/bar",
+        "file:/foo/bar",
+        "file://foo/bar",
+        "file:///foo/bar",
+        "file://foo:bar@bar/foo/bar",
+        // Special chars outside the path component should not be flagged, since such URLs
+        // won't hit the escaped path issue.
+        "file://foo:b ar@bar/foo/bar",
+        "file://foo:bar@b ar/foo/bar",
+        "file://f oo:bar@bar/foo/bar").foreach { p =>
 
 Review comment:
   @brkyvz The APIs called in `containsSpecialCharsInPath` don't involve any `FileSystem`; the scheme `file` here can be replaced with an arbitrary string.
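   
   To make this concrete, here is a minimal sketch of that kind of check: pure Hadoop `Path`/`java.net.URI` manipulation, with no `FileSystem` lookup. The method name mirrors `StreamExecution.containsSpecialCharsInPath`, but the body below is an illustrative assumption about the approach, not a quote of the PR's implementation:
   
   ```scala
   import org.apache.hadoop.fs.Path
   
   object EscapedPathCheck {
     // A path contains "special" chars iff escaping it one more time changes
     // its URI path component. Nothing here resolves a FileSystem, so the
     // scheme ("file", "s3a", or any made-up string) is irrelevant.
     def containsSpecialCharsInPath(path: Path): Boolean = {
       path.toUri.getPath != new Path(path.toUri.toString).toUri.getPath
     }
   
     def main(args: Array[String]): Unit = {
       assert(containsSpecialCharsInPath(new Path("/foo/b ar")))   // space in the path
       assert(!containsSpecialCharsInPath(new Path("/foo/bar")))   // clean path
       // Special chars only in the user info or host should not trip the check.
       assert(!containsSpecialCharsInPath(new Path("file://f oo:bar@bar/foo/bar")))
     }
   }
   ```
   
   One round of escaping turns `chk %@#chk` into `chk%20%25@%23chk`, and every further round re-escapes the `%` signs, which is why the test reconstructs the legacy checkpoint directory by applying `new Path(...).toUri.toString` three times.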

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
