HeartSaVioR commented on code in PR #41022:
URL: https://github.com/apache/spark/pull/41022#discussion_r1186982627
##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala:
##########
@@ -474,47 +474,64 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     }
   }
 
-  test("SPARK-21996 read from text files -- file name has space") {
-    withTempDirs { case (src, tmp) =>
-      val textStream = createFileStream("text", src.getCanonicalPath)
-      val filtered = textStream.filter($"value" contains "keep")
+  test("SPARK-21996/SPARK-43343 read from text files -- file name has special chars") {
+    Seq(" ", "[", "[123]").foreach { special_str =>
+      withTempDirs { case (src, tmp) =>
+        val textStream = createFileStream("text", src.getCanonicalPath)
+        val filtered = textStream.filter($"value" contains "keep")
 
-      testStream(filtered)(
-        AddTextFileData("drop1\nkeep2\nkeep3", src, tmp, "text text"),
-        CheckAnswer("keep2", "keep3")
-      )
+        testStream(filtered)(
+          AddTextFileData("drop1\nkeep2\nkeep3", src, tmp, s"text${special_str}text"),
+          CheckAnswer("keep2", "keep3")
+        )
+      }
     }
   }
- test("SPARK-21996 read from text files generated by file sink -- file name
has space") {
+ test(
+ "SPARK-21996/SPARK-43343 read from text files generated by file sink --" +
+ "file name has special chars") {
val testTableName = "FileStreamSourceTest"
- withTable(testTableName) {
- withTempDirs { case (src, checkpoint) =>
- val output = new File(src, "text text")
- val inputData = MemoryStream[String]
- val ds = inputData.toDS()
-
- val query = ds.writeStream
- .option("checkpointLocation", checkpoint.getCanonicalPath)
- .format("text")
- .start(output.getCanonicalPath)
+ Seq(" ", "[", "[123]").foreach{ special_str =>
+ withTable(testTableName) {
+ withTempDirs { case (src, checkpoint) =>
+ val output = new File(src, s"text${special_str}text")
+ val inputData = MemoryStream[String]
+ val ds = inputData.toDS()
+
+ val query = ds.writeStream
+ .option("checkpointLocation", checkpoint.getCanonicalPath)
+ .format("text")
+ .start(output.getCanonicalPath)
- try {
- inputData.addData("foo")
- failAfter(streamingTimeout) {
- query.processAllAvailable()
+ try {
+ inputData.addData("foo")
+ failAfter(streamingTimeout) {
+ query.processAllAvailable()
+ }
+ } finally {
+ query.stop()
}
- } finally {
- query.stop()
- }
- val df2 = spark.readStream.format("text").load(output.getCanonicalPath)
- val query2 =
df2.writeStream.format("memory").queryName(testTableName).start()
- try {
- query2.processAllAvailable()
- checkDatasetUnorderly(spark.table(testTableName).as[String], "foo")
- } finally {
- query2.stop()
+ // We cannot put "[" as part of the input path as it will be
intepreted as glob pattern,
+ // so use replace it with "*".
+ val input_path =
+ if (special_str == " ") {
+ output.getCanonicalPath
+ } else {
+ output
Review Comment:
Shall we give it a try with `\`, which escapes the next character? We can leave
it as it is if that does not work.
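For reference, a minimal sketch of what that escaping could look like (the
`escapeGlob` helper and the metacharacter set are illustrative assumptions,
not code from this PR):

    // Hypothetical helper: prefix Hadoop glob metacharacters with a backslash
    // so a literal path such as "text[123]text" is not treated as a pattern.
    object GlobEscape {
      private val globChars = Set('*', '?', '[', ']', '{', '}', '\\')

      def escapeGlob(path: String): String =
        path.flatMap(c => if (globChars(c)) s"\\$c" else c.toString)
    }

    // e.g. GlobEscape.escapeGlob("/tmp/text[123]text") == "/tmp/text\\[123\\]text"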
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala:
##########
@@ -69,12 +69,15 @@ class FileStreamSource(
   private val sourceCleaner: Option[FileStreamSourceCleaner] = FileStreamSourceCleaner(
     fs, qualifiedBasePath, sourceOptions, hadoopConf)
-  private val optionsWithPartitionBasePath = sourceOptions.optionMapWithoutPath ++ {
-    if (!SparkHadoopUtil.get.isGlobPath(new Path(path)) && options.contains("path")) {
-      Map("basePath" -> path)
-    } else {
-      Map()
-    }}
+  private val optionsForInnerDataSource = sourceOptions.optionMapWithoutPath ++ {
+    val pathOption =
+      if (!SparkHadoopUtil.get.isGlobPath(new Path(path)) && options.contains("path")) {
+        Map("basePath" -> path)
+      } else {
+        Map()
+      }
+    pathOption ++ Map (DataSource.GLOB_PATHS_KEY -> "false")
Review Comment:
nit: no space needed between `Map` and `(`, i.e.
`Map(DataSource.GLOB_PATHS_KEY -> "false")` (super nit, but it keeps
consistency with the surrounding code).
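FWIW, for anyone following along: the point of
`DataSource.GLOB_PATHS_KEY -> "false"` is to tell the inner data source to
take the path literally instead of glob-expanding it. A rough sketch of that
consuming side, with simplified names (the option key literal and the
resolution logic here are assumptions for illustration, not the exact Spark
internals):

    import org.apache.hadoop.fs.{FileSystem, Path}

    // Simplified sketch: skip glob expansion when the flag is "false".
    def resolvePaths(fs: FileSystem, path: Path, options: Map[String, String]): Seq[Path] = {
      val globEnabled = options.getOrElse("__globPaths__", "true").toBoolean
      if (globEnabled) {
        // Expand patterns such as "text*text" into the matching files.
        Option(fs.globStatus(path)).toSeq.flatten.map(_.getPath)
      } else {
        // Treat the path literally, e.g. "text[123]text".
        Seq(path)
      }
    }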
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]