HeartSaVioR commented on code in PR #52375:
URL: https://github.com/apache/spark/pull/52375#discussion_r2357545319


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala:
##########
@@ -1133,4 +1133,98 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
       assert(selectSingleRowDf.count() === 1)
     }
   }
+
+  Seq("true", "false").foreach { sideCharPadding =>
+    test(s"SPARK-53625: file metadata in streaming with char type, " +
+      s"sideCharPadding=$sideCharPadding") {
+      withSQLConf(SQLConf.READ_SIDE_CHAR_PADDING.key -> sideCharPadding) {
+        withTempDir { dir =>
+          import scala.jdk.CollectionConverters._
+
+          val metadata = new MetadataBuilder()
+            .putString("__CHAR_VARCHAR_TYPE_STRING", "char(1)")
+            .build()
+          val charSchemaStruct = new StructType()
+            .add(StructField("char_col", StringType, metadata = metadata))
+
+          val data: Seq[Row] = Seq(Row("A"), Row("B"))
+          val df = spark.createDataFrame(data.asJava, charSchemaStruct)
+          df.coalesce(1).write.format("json")
+            .save(dir.getCanonicalPath + "/source/new-streaming-data")
+
+          val streamDf = spark.readStream.format("json")
+            .schema(charSchemaStruct)
+            .load(dir.getCanonicalPath + "/source/new-streaming-data")
+            .select("*", "_metadata")
+
+          val streamQuery0 = streamDf
+            .writeStream.format("json")
+            .option("checkpointLocation", dir.getCanonicalPath + "/target/checkpoint")
+            .trigger(Trigger.AvailableNow())
+            .start(dir.getCanonicalPath + "/target/new-streaming-data")
+
+          streamQuery0.awaitTermination()
+          assert(streamQuery0.lastProgress.numInputRows == 2L)
+
+          val newDF = spark.read.format("json")
+            .load(dir.getCanonicalPath + "/target/new-streaming-data")
+
+          val sourceFile = new File(dir, "/source/new-streaming-data").listFiles()
+            .filter(_.getName.endsWith(".json")).head
+          val sourceFileMetadata = Map(
+            METADATA_FILE_PATH -> sourceFile.toURI.toString,
+            METADATA_FILE_NAME -> sourceFile.getName,
+            METADATA_FILE_SIZE -> sourceFile.length(),
+            METADATA_FILE_BLOCK_START -> 0,
+            METADATA_FILE_BLOCK_LENGTH -> sourceFile.length(),
+            METADATA_FILE_MODIFICATION_TIME -> new Timestamp(sourceFile.lastModified())
+          )
+
+          // SELECT * will have: char_col, _metadata of /source/new-streaming-data
+          assert(newDF.select("*").columns.toSet == Set("char_col", "_metadata"))
+          // Verify the data is expected
+          checkAnswer(
+            newDF.select(col("char_col"),
+              col(METADATA_FILE_PATH), col(METADATA_FILE_NAME),
+              col(METADATA_FILE_SIZE), col(METADATA_FILE_BLOCK_START),
+              col(METADATA_FILE_BLOCK_LENGTH),
+              // since we are writing _metadata to a json file,
+              // we should explicitly cast the column to timestamp type
+              to_timestamp(col(METADATA_FILE_MODIFICATION_TIME))),
+            Seq(
+              Row(
+                "A",
+                sourceFileMetadata(METADATA_FILE_PATH),
+                sourceFileMetadata(METADATA_FILE_NAME),
+                sourceFileMetadata(METADATA_FILE_SIZE),
+                sourceFileMetadata(METADATA_FILE_BLOCK_START),
+                sourceFileMetadata(METADATA_FILE_BLOCK_LENGTH),
+                sourceFileMetadata(METADATA_FILE_MODIFICATION_TIME)),
+              Row(
+                "B",
+                sourceFileMetadata(METADATA_FILE_PATH),
+                sourceFileMetadata(METADATA_FILE_NAME),
+                sourceFileMetadata(METADATA_FILE_SIZE),
+                sourceFileMetadata(METADATA_FILE_BLOCK_START),
+                sourceFileMetadata(METADATA_FILE_BLOCK_LENGTH),
+                sourceFileMetadata(METADATA_FILE_MODIFICATION_TIME))
+            )
+          )
+
+          checkAnswer(
+            newDF.where(s"$METADATA_FILE_SIZE > 0").select(METADATA_FILE_SIZE),

Review Comment:
   `.where(s"$METADATA_FILE_SIZE > 0")`
   
   I guess this is just a sanity check, right? Is it ever possible for a row to be mapped to a file whose size is 0?
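   
   If it is indeed just a sanity check, one alternative (a rough sketch reusing `sourceFile` from the test above) would be to assert the exact size, since both rows come from the single coalesced source file:
   
   ```scala
   // Sketch only: assert the exact file size rather than a `> 0` bound.
   checkAnswer(
     newDF.select(col(METADATA_FILE_SIZE)),
     Seq(Row(sourceFile.length()), Row(sourceFile.length()))
   )
   ```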



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -6322,6 +6322,14 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
+  val STREAMING_PROJECT_METADATA_COLS_ENABLED =

Review Comment:
   This is too general and could be misleading about the config's actual impact. We'd need to mention DSv1 and getBatch (or the microbatch plan for the source) in the config name to scope it correctly.
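   
   For instance (name, doc string, and version below are purely illustrative, just to show the scoping):
   
   ```scala
   // Sketch only: scope the config name to the DSv1 getBatch path.
   val STREAMING_DSV1_GET_BATCH_PROJECT_METADATA_COLS_ENABLED =
     buildConf("spark.sql.streaming.dsv1.projectMetadataColsInGetBatch.enabled")
       .internal()
       .doc("When true, the microbatch plan returned from the DSv1 source's " +
         "getBatch projects file metadata columns.")
       .version("4.1.0") // illustrative
       .booleanConf
       .createWithDefault(true)
   ```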



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala:
##########
@@ -1133,4 +1133,98 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
       assert(selectSingleRowDf.count() === 1)
     }
   }
+
+  Seq("true", "false").foreach { sideCharPadding =>
+    test(s"SPARK-53625: file metadata in streaming with char type, " +
+      s"sideCharPadding=$sideCharPadding") {
+      withSQLConf(SQLConf.READ_SIDE_CHAR_PADDING.key -> sideCharPadding) {
+        withTempDir { dir =>
+          import scala.jdk.CollectionConverters._
+
+          val metadata = new MetadataBuilder()
+            .putString("__CHAR_VARCHAR_TYPE_STRING", "char(1)")
+            .build()
+          val charSchemaStruct = new StructType()
+            .add(StructField("char_col", StringType, metadata = metadata))
+
+          val data: Seq[Row] = Seq(Row("A"), Row("B"))

Review Comment:
   nit: is explicit typing necessary?
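   
   Something like this should work, since the element type is already inferred as `Row`:
   
   ```scala
   // Inferred as Seq[Row]; `data.asJava` still yields java.util.List[Row].
   val data = Seq(Row("A"), Row("B"))
   ```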



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

