HeartSaVioR commented on code in PR #41022:
URL: https://github.com/apache/spark/pull/41022#discussion_r1186982627


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala:
##########
@@ -474,47 +474,64 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     }
   }
 
-  test("SPARK-21996 read from text files -- file name has space") {
-    withTempDirs { case (src, tmp) =>
-      val textStream = createFileStream("text", src.getCanonicalPath)
-      val filtered = textStream.filter($"value" contains "keep")
+  test("SPARK-21996/SPARK-43343 read from text files -- file name has special 
chars") {
+    Seq(" ", "[", "[123]").foreach { special_str =>
+      withTempDirs { case (src, tmp) =>
+        val textStream = createFileStream("text", src.getCanonicalPath)
+        val filtered = textStream.filter($"value" contains "keep")
 
-      testStream(filtered)(
-        AddTextFileData("drop1\nkeep2\nkeep3", src, tmp, "text text"),
-        CheckAnswer("keep2", "keep3")
-      )
+        testStream(filtered)(
+          AddTextFileData("drop1\nkeep2\nkeep3", src, tmp, s"text${special_str}text"),
+          CheckAnswer("keep2", "keep3")
+        )
+      }
     }
   }
 
-  test("SPARK-21996 read from text files generated by file sink -- file name 
has space") {
+  test(
+    "SPARK-21996/SPARK-43343 read from text files generated by file sink --" +
+    "file name has special chars") {
     val testTableName = "FileStreamSourceTest"
-    withTable(testTableName) {
-      withTempDirs { case (src, checkpoint) =>
-        val output = new File(src, "text text")
-        val inputData = MemoryStream[String]
-        val ds = inputData.toDS()
-
-        val query = ds.writeStream
-          .option("checkpointLocation", checkpoint.getCanonicalPath)
-          .format("text")
-          .start(output.getCanonicalPath)
+    Seq(" ", "[", "[123]").foreach{ special_str =>
+      withTable(testTableName) {
+        withTempDirs { case (src, checkpoint) =>
+          val output = new File(src, s"text${special_str}text")
+          val inputData = MemoryStream[String]
+          val ds = inputData.toDS()
+
+          val query = ds.writeStream
+            .option("checkpointLocation", checkpoint.getCanonicalPath)
+            .format("text")
+            .start(output.getCanonicalPath)
 
-        try {
-          inputData.addData("foo")
-          failAfter(streamingTimeout) {
-            query.processAllAvailable()
+          try {
+            inputData.addData("foo")
+            failAfter(streamingTimeout) {
+              query.processAllAvailable()
+            }
+          } finally {
+            query.stop()
           }
-        } finally {
-          query.stop()
-        }
 
-        val df2 = spark.readStream.format("text").load(output.getCanonicalPath)
-        val query2 = df2.writeStream.format("memory").queryName(testTableName).start()
-        try {
-          query2.processAllAvailable()
-          checkDatasetUnorderly(spark.table(testTableName).as[String], "foo")
-        } finally {
-          query2.stop()
+          // We cannot put "[" as part of the input path as it will be interpreted as a glob pattern,
+          // so we replace it with "*".
+          val input_path =
+            if (special_str == " ") {
+              output.getCanonicalPath
+            } else {
+              output

Review Comment:
   Shall we give it a try with `\`, which escapes the next character? We can leave it as is if that does not work.
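
   A minimal sketch of what I mean (untested; whether Hadoop's glob parser honors `\`-escaping here is an assumption, and `output`/`special_str` are the names from the diff above):

   ```scala
   // Hypothetical alternative to replacing "[" with "*": escape each glob
   // metacharacter with a backslash so it is matched literally.
   val input_path =
     if (special_str == " ") {
       output.getCanonicalPath
     } else {
       // Prefix every "[" and "]" with "\" (regex replacement "\\$1").
       output.getCanonicalPath.replaceAll("([\\[\\]])", "\\\\$1")
     }
   ```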



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala:
##########
@@ -69,12 +69,15 @@ class FileStreamSource(
   private val sourceCleaner: Option[FileStreamSourceCleaner] = FileStreamSourceCleaner(
     fs, qualifiedBasePath, sourceOptions, hadoopConf)
 
-  private val optionsWithPartitionBasePath = sourceOptions.optionMapWithoutPath ++ {
-    if (!SparkHadoopUtil.get.isGlobPath(new Path(path)) && options.contains("path")) {
-      Map("basePath" -> path)
-    } else {
-      Map()
-    }}
+  private val optionsForInnerDataSource = sourceOptions.optionMapWithoutPath ++ {
+    val pathOption =
+      if (!SparkHadoopUtil.get.isGlobPath(new Path(path)) && options.contains("path")) {
+        Map("basePath" -> path)
+      } else {
+        Map()
+      }
+    pathOption ++ Map (DataSource.GLOB_PATHS_KEY -> "false")

Review Comment:
   nit: no space needed between `Map` and `(` (super nit, but for consistency with other code like the above).
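
   The corrected line from the diff above would then read:

   ```scala
   pathOption ++ Map(DataSource.GLOB_PATHS_KEY -> "false")
   ```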


