HeartSaVioR commented on a change in pull request #35676:
URL: https://github.com/apache/spark/pull/35676#discussion_r818270346
##########
File path:
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
##########
@@ -510,4 +510,58 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
)
}
}
+
+ metadataColumnsTest("file metadata in streaming", schema) { (df, _, _) =>
+ withTempDir { dir =>
+ df.coalesce(1).write.format("json").save(dir.getCanonicalPath + "/source/new-streaming-data")
+
+ val stream = spark.readStream.format("json")
+ .schema(schema)
+ .load(dir.getCanonicalPath + "/source/new-streaming-data")
+ .select("*", "_metadata")
+ .writeStream.format("json")
+ .option("checkpointLocation", dir.getCanonicalPath + "/target/checkpoint")
+ .start(dir.getCanonicalPath + "/target/new-streaming-data")
+
+ stream.processAllAvailable()
+ stream.stop()
+
+ val newDF = spark.read.format("json")
+ .load(dir.getCanonicalPath + "/target/new-streaming-data")
+
+ val sourceFile = new File(dir, "/source/new-streaming-data").listFiles()
+ .filter(_.getName.endsWith(".json")).head
+ val sourceFileMetadata = Map(
+ METADATA_FILE_PATH -> sourceFile.toURI.toString,
+ METADATA_FILE_NAME -> sourceFile.getName,
+ METADATA_FILE_SIZE -> sourceFile.length(),
+ METADATA_FILE_MODIFICATION_TIME -> new Timestamp(sourceFile.lastModified())
+ )
+
+ // SELECT * will have: name, age, info, _metadata of /source/new-streaming-data
+ assert(newDF.select("*").columns.toSet == Set("name", "age", "info", "_metadata"))
Review comment:
@Yaohua628
Sorry for the post-hoc review; I didn't get a chance to review this in time.
Just to be clear, select("*").show() should not expose a hidden column,
right? Since "_metadata" is included in the list of expected columns here, I
would like to double-check that it is not user-facing.
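
The question above can be checked directly against the file source. The following is a minimal sketch, assuming Spark 3.3 or later (where the `_metadata` column for file sources is available) and a local session; the object name `HiddenColumnCheck`, the method name, and the temporary path are hypothetical and not part of the PR. It reads a freshly written JSON directory and compares the columns returned by `select("*")` with those returned when `_metadata` is requested explicitly. In the test under review, `newDF` is read back from the sink output, where `_metadata` was written out as ordinary data, so that case differs from reading the source directly.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

object HiddenColumnCheck {
  /** Returns (columns of `SELECT *`, columns of `SELECT *, _metadata`) for a small JSON source. */
  def columnSets(spark: SparkSession): (Set[String], Set[String]) = {
    import spark.implicits._
    // Hypothetical scratch location; the "data" subdirectory must not exist yet.
    val path = Files.createTempDirectory("hidden-col").resolve("data").toString
    Seq(("alice", 30)).toDF("name", "age").write.format("json").save(path)

    val df = spark.read.format("json").load(path)
    (df.select("*").columns.toSet, df.select("*", "_metadata").columns.toSet)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("hidden-column-check")
      .getOrCreate()
    try {
      val (star, starPlusMetadata) = columnSets(spark)
      println(s"SELECT *            -> $star")
      println(s"SELECT *, _metadata -> $starPlusMetadata")
    } finally {
      spark.stop()
    }
  }
}
```

If the column is truly hidden, the first set should contain only the data columns and the second should additionally contain `_metadata`.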
Also, given that we add the new column, `dropDuplicates` without explicitly
specified columns in a streaming query would be broken: the state schema would
end up including the hidden column, whereas the state schema in a checkpoint
written by an older version does not include it (as the column didn't exist
then).
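
One way to probe this concern without an old checkpoint is to look at which columns a no-argument `dropDuplicates()` would key on, since it deduplicates over all columns of the Dataset. This is a rough sketch under the same assumptions as before (Spark 3.3+, local session); `DedupKeyColumns`, the schema, and the temporary path are illustrative names, not taken from the PR. It only inspects the streaming DataFrame's columns; a full compatibility test would still need to restart a query from a checkpoint written by an older Spark version.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

object DedupKeyColumns {
  /** Columns of a streaming file source after dropDuplicates() with no explicit columns. */
  def dedupColumns(spark: SparkSession): Seq[String] = {
    import spark.implicits._
    // Hypothetical scratch location for the streaming source directory.
    val path = Files.createTempDirectory("dedup-src").resolve("data").toString
    Seq(("alice", 30)).toDF("name", "age").write.format("json").save(path)

    val schema = new StructType().add("name", StringType).add("age", IntegerType)
    val deduped = spark.readStream.format("json")
      .schema(schema)
      .load(path)
      .dropDuplicates() // no explicit columns: uses every column of the Dataset

    deduped.columns.toSeq
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("dedup-key-columns")
      .getOrCreate()
    try {
      // If "_metadata" shows up here, the dedup state schema would change as well.
      println(dedupColumns(spark).mkString(", "))
    } finally {
      spark.stop()
    }
  }
}
```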
We should test this, and if it turns out to be the case, we should either
mention it in the migration guide or make it configurable and turned off by
default. (We did this when adding Kafka headers - #22282)
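
For reference, the Kafka precedent mentioned above is the `includeHeaders` source option, which is off by default so that existing queries keep their schema. The sketch below shows that opt-in behavior; it assumes the `spark-sql-kafka-0-10` connector is on the classpath, and the bootstrap server, topic name, and object name `KafkaHeadersOptIn` are placeholders. It only defines the stream and inspects its columns; nothing is started.

```scala
import org.apache.spark.sql.SparkSession

object KafkaHeadersOptIn {
  /** Columns of a Kafka streaming source with the `includeHeaders` option set as given. */
  def columns(spark: SparkSession, includeHeaders: Boolean): Seq[String] =
    spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // placeholder address
      .option("subscribe", "events")                       // placeholder topic
      .option("includeHeaders", includeHeaders.toString)
      .load()
      .columns.toSeq

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("kafka-headers-opt-in")
      .getOrCreate()
    try {
      println(s"default schema: ${columns(spark, includeHeaders = false).mkString(", ")}")
      println(s"with headers:   ${columns(spark, includeHeaders = true).mkString(", ")}")
    } finally {
      spark.stop()
    }
  }
}
```

A similar opt-in switch for the metadata column would keep the schema of existing streaming queries unchanged unless the user asks for it.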
cc. @cloud-fan
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]