HeartSaVioR commented on a change in pull request #35676:
URL: https://github.com/apache/spark/pull/35676#discussion_r818270346



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
##########
@@ -510,4 +510,58 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
       )
     }
   }
+
+  metadataColumnsTest("file metadata in streaming", schema) { (df, _, _) =>
+    withTempDir { dir =>
+      df.coalesce(1).write.format("json").save(dir.getCanonicalPath + "/source/new-streaming-data")
+
+      val stream = spark.readStream.format("json")
+        .schema(schema)
+        .load(dir.getCanonicalPath + "/source/new-streaming-data")
+        .select("*", "_metadata")
+        .writeStream.format("json")
+        .option("checkpointLocation", dir.getCanonicalPath + "/target/checkpoint")
+        .start(dir.getCanonicalPath + "/target/new-streaming-data")
+
+      stream.processAllAvailable()
+      stream.stop()
+
+      val newDF = spark.read.format("json")
+        .load(dir.getCanonicalPath + "/target/new-streaming-data")
+
+      val sourceFile = new File(dir, "/source/new-streaming-data").listFiles()
+        .filter(_.getName.endsWith(".json")).head
+      val sourceFileMetadata = Map(
+        METADATA_FILE_PATH -> sourceFile.toURI.toString,
+        METADATA_FILE_NAME -> sourceFile.getName,
+        METADATA_FILE_SIZE -> sourceFile.length(),
+        METADATA_FILE_MODIFICATION_TIME -> new Timestamp(sourceFile.lastModified())
+      )
+
+      // SELECT * will have: name, age, info, _metadata of /source/new-streaming-data
+      assert(newDF.select("*").columns.toSet == Set("name", "age", "info", "_metadata"))

Review comment:
       @Yaohua628
   
   Sorry for the late review; I didn't have time to review this earlier.
   
   Just to be clear, `select("*").show()` should not expose a hidden column,
   right? Since "_metadata" is included in the expected list of columns here,
   I would like to double-check that it is not user-facing.
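
   For illustration only (not part of the PR), the question above could be checked on a batch read with a minimal sketch like the following. It assumes a Spark version that ships the `_metadata` column and a local master; the object name `HiddenColumnCheck` and the sample rows are hypothetical.

```scala
import java.io.File
import java.nio.file.Files

import org.apache.spark.sql.SparkSession

// Hypothetical helper, not part of the PR under review.
object HiddenColumnCheck {
  // Returns (columns of select("*"), columns of select("*", "_metadata")).
  def run(): (Seq[String], Seq[String]) = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("hidden-column-check")
      .getOrCreate()
    import spark.implicits._

    try {
      // Write a tiny JSON data set to a fresh temporary directory.
      val dir = Files.createTempDirectory("hidden-column-check").toFile
      val path = new File(dir, "data").getCanonicalPath
      Seq(("jack", 24), ("lily", 31)).toDF("name", "age")
        .write.format("json").save(path)

      val df = spark.read.format("json").load(path)

      // If "_metadata" is truly hidden, it should only show up
      // when it is requested explicitly.
      val star = df.select("*").columns.toSeq
      val explicit = df.select("*", "_metadata").columns.toSeq
      (star, explicit)
    } finally {
      spark.stop()
    }
  }

  def main(args: Array[String]): Unit = {
    val (star, explicit) = run()
    println(s"""select("*"):              ${star.mkString(", ")}""")
    println(s"""select("*", "_metadata"): ${explicit.mkString(", ")}""")
  }
}
```

   Comparing the two printed column lists shows whether `*` alone pulls in the hidden column.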
   
   And if the new column is included, `dropDuplicates` without explicitly
   specified columns in a streaming query would be broken. The state schema
   would somehow include the hidden column, whereas the state schema in a
   checkpoint from an older version does not include it (as the column didn't
   exist).
   
   We should test it, and if it falls into this case, we should mention it in
   the migration guide, or make this configurable and turned off by default.
   (We did this when adding Kafka headers - #22282)
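
   As a starting point for such a test, here is a rough sketch (an assumption about how one might probe this, not the PR's test). It runs a streaming `dropDuplicates()` with no explicit columns over a JSON file source and reports which columns the deduplication sees. The object name `DropDuplicatesSketch`, the sample rows, and the directory layout are hypothetical. A real compatibility test would additionally need a checkpoint written by an older Spark version, which this sketch does not cover.

```scala
import java.io.File
import java.nio.file.Files

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

// Hypothetical helper, not part of the PR under review.
object DropDuplicatesSketch {
  // Returns (columns seen by dropDuplicates(), number of rows written by the sink).
  def run(): (Seq[String], Long) = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("drop-duplicates-sketch")
      .getOrCreate()
    import spark.implicits._

    try {
      val dir = Files.createTempDirectory("drop-duplicates-sketch").toFile
      val source = new File(dir, "source").getCanonicalPath
      val target = new File(dir, "target").getCanonicalPath
      val checkpoint = new File(dir, "checkpoint").getCanonicalPath

      // One source file containing a duplicated row.
      Seq(("jack", 24), ("jack", 24), ("lily", 31)).toDF("name", "age")
        .coalesce(1).write.format("json").save(source)

      val schema = new StructType()
        .add("name", StringType)
        .add("age", IntegerType)

      // dropDuplicates() without arguments deduplicates on all columns of the
      // Dataset, so these columns determine what the state store keys contain.
      val deduped = spark.readStream.format("json")
        .schema(schema)
        .load(source)
        .dropDuplicates()
      val dedupColumns = deduped.columns.toSeq

      val query = deduped.writeStream.format("json")
        .option("checkpointLocation", checkpoint)
        .start(target)
      query.processAllAvailable()
      query.stop()

      val written = spark.read.format("json").load(target).count()
      (dedupColumns, written)
    } finally {
      spark.stop()
    }
  }

  def main(args: Array[String]): Unit = {
    val (dedupColumns, written) = run()
    println(s"dropDuplicates() columns: ${dedupColumns.mkString(", ")}")
    println(s"rows written: $written")
  }
}
```

   If `_metadata` appeared in the printed column list, the state schema of such a query would differ from one checkpointed before the column existed, which is the incompatibility described above.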
   
   cc. @cloud-fan 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


