Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12616#discussion_r61498520
  
    --- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
 ---
    @@ -439,6 +442,105 @@ class FileStreamSourceSuite extends 
FileStreamSourceTest with SharedSQLContext {
         }
       }
     
    +  test("read new files in nested directories with globbing") {
    +    withTempDirs { case (src, tmp) =>
    +
    +      val fileStream = createFileStream("text", 
s"${src.getCanonicalPath}/*/*")
    +      val filtered = fileStream.filter($"value" contains "keep")
    +      val srcSubDir = new File(src, "newSubDir")
    +      val srcSubSubDir = new File(srcSubDir, "newSubSubDir")
    +
    +      testStream(filtered)(
    +        // Create new dir/subdir and write to it, should read
    +        AddTextFileData("drop1\nkeep2", srcSubDir, tmp),
    +        CheckAnswer("keep2"),
    +
    +        // Append to dir/subdir, should read
    +        AddTextFileData("keep3", srcSubDir, tmp),
    +        CheckAnswer("keep2", "keep3"),
    +
    +        // Create new dir/subDir/subsubdir and write to it, should read
    +        AddTextFileData("keep4", srcSubSubDir, tmp),
    +        CheckAnswer("keep2", "keep3", "keep4"),
    +
    +        // Append to src dir, should not read as globbing src/*/* does not 
capture files in dir,
    +        // only captures files in dir/subdirs/
    +        AddTextFileData("keep5", src, tmp),
    +
    +        // Append to dir/subDir/subsubdir/, should not read
    +        AddTextFileData("keep6", srcSubSubDir, tmp),
    +        AddTextFileData("keep7", srcSubDir, tmp), // needed to make query 
detect new data
    +        CheckAnswer("keep2", "keep3", "keep4", "keep7")
    +      )
    +    }
    +  }
    +
    +  test("read new parquet files in partitioned directories with globbing") {
    +    withTempDirs { case (src, tmp) =>
    +      val partition1SubDir = new File(src, "partition=1")
    +      val partition2SubDir = new File(src, "partition=2")
    +
    +      val schema = new StructType().add("value", 
StringType).add("partition", IntegerType)
    +      val fileStream = createFileStream("parquet", 
s"${src.getCanonicalPath}/*/*", Some(schema))
    +      val filtered = fileStream.filter($"value" contains "keep")
    +
    +      testStream(filtered)(
    +        // Create new partition=1 sub dir and write to it, should read
    +        AddParquetFileData(Seq("drop1", "keep2"), partition1SubDir, tmp),
    +        CheckAnswer(("keep2", 1)),
    +
    +        // Append to same partition=1 sub dir, should read
    +        AddParquetFileData(Seq("keep3"), partition1SubDir, tmp),
    +        CheckAnswer(("keep2", 1), ("keep3", 1)),
    +
    +        // Create new partition sub dir and write to it, should read
    +        AddParquetFileData(Seq("keep4"), partition2SubDir, tmp),
    +        CheckAnswer(("keep2", 1), ("keep3", 1), ("keep4", 2)),
    +
    +        // Append to same partition=2 sub dir, should read
    +        AddParquetFileData(Seq("keep5"), partition2SubDir, tmp),
    +        CheckAnswer(("keep2", 1), ("keep3", 1), ("keep4", 2), ("keep5", 2))
    +      )
    +    }
    +  }
    +
    +  test("read new json files in multi-level partitioned dirs with complex 
globbing") {
    +    withTempDirs { case (src, tmp) =>
    +      val level1Dir = new File(src, "level1=xyz")
    +      val level2Dir = new File(level1Dir, "level2=00")
    +
    +      val schema =
    +        new StructType()
    +          .add("value", StringType)
    +          .add("level2", IntegerType)
    +
    +      // Stream from src/level1=xyz/level2=0*/* , which should not pick up 
data from column level1
    +      val fileStream = createFileStream(
    +        "json", s"${level1Dir.getCanonicalPath}/level2=0*/*", Some(schema))
    +
    +      val filtered = fileStream.filter($"value" contains "keep")
    +      val nullStr = null.asInstanceOf[String]
    +      testStream(filtered)(
    +        // Create new src/level1=xyz/level2=0/ and write to it, should 
read value for level2 data
    +        // but not level1 as it is not in scope of the provided glob 
pattern src/level1=xyz/*/*
    +        AddTextFileData("{'value': 'drop1'}\n{'value': 'keep2'}", 
level2Dir, tmp),
    +        CheckAnswer(("keep2", 0)),
    +
    +        AddTextFileData("{'value': 'keep3'}", level2Dir, tmp),
    +        CheckAnswer(("keep2", 0), ("keep3", 0)),
    +
    +        // Create new dir/subDir/subsubdir and write to it, should read
    --- End diff --
    
    Update comments.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to