HeartSaVioR commented on code in PR #52375: URL: https://github.com/apache/spark/pull/52375#discussion_r2357545319
########## sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala: ########## @@ -1133,4 +1133,98 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession { assert(selectSingleRowDf.count() === 1) } } + + Seq("true", "false").foreach { sideCharPadding => + test(s"SPARK-53625: file metadata in streaming with char type, " + + s"sideCharPadding=$sideCharPadding") { + withSQLConf(SQLConf.READ_SIDE_CHAR_PADDING.key -> sideCharPadding) { + withTempDir { dir => + import scala.jdk.CollectionConverters._ + + val metadata = new MetadataBuilder() + .putString("__CHAR_VARCHAR_TYPE_STRING", "char(1)") + .build() + val charSchemaStruct = new StructType() + .add(StructField("char_col", StringType, metadata = metadata)) + + val data: Seq[Row] = Seq(Row("A"), Row("B")) + val df = spark.createDataFrame(data.asJava, charSchemaStruct) + df.coalesce(1).write.format("json") + .save(dir.getCanonicalPath + "/source/new-streaming-data") + + val streamDf = spark.readStream.format("json") + .schema(charSchemaStruct) + .load(dir.getCanonicalPath + "/source/new-streaming-data") + .select("*", "_metadata") + + val streamQuery0 = streamDf + .writeStream.format("json") + .option("checkpointLocation", dir.getCanonicalPath + "/target/checkpoint") + .trigger(Trigger.AvailableNow()) + .start(dir.getCanonicalPath + "/target/new-streaming-data") + + streamQuery0.awaitTermination() + assert(streamQuery0.lastProgress.numInputRows == 2L) + + val newDF = spark.read.format("json") + .load(dir.getCanonicalPath + "/target/new-streaming-data") + + val sourceFile = new File(dir, "/source/new-streaming-data").listFiles() + .filter(_.getName.endsWith(".json")).head + val sourceFileMetadata = Map( + METADATA_FILE_PATH -> sourceFile.toURI.toString, + METADATA_FILE_NAME -> sourceFile.getName, + METADATA_FILE_SIZE -> sourceFile.length(), + METADATA_FILE_BLOCK_START -> 0, + METADATA_FILE_BLOCK_LENGTH -> sourceFile.length(), + METADATA_FILE_MODIFICATION_TIME -> new Timestamp(sourceFile.lastModified()) + ) + + // SELECT * will have: char_col, _metadata of /source/new-streaming-data + assert(newDF.select("*").columns.toSet == Set("char_col", "_metadata")) + // Verify the data is expected + checkAnswer( + newDF.select(col("char_col"), + col(METADATA_FILE_PATH), col(METADATA_FILE_NAME), + col(METADATA_FILE_SIZE), col(METADATA_FILE_BLOCK_START), + col(METADATA_FILE_BLOCK_LENGTH), + // since we are writing _metadata to a json file, + // we should explicitly cast the column to timestamp type + to_timestamp(col(METADATA_FILE_MODIFICATION_TIME))), + Seq( + Row( + "A", + sourceFileMetadata(METADATA_FILE_PATH), + sourceFileMetadata(METADATA_FILE_NAME), + sourceFileMetadata(METADATA_FILE_SIZE), + sourceFileMetadata(METADATA_FILE_BLOCK_START), + sourceFileMetadata(METADATA_FILE_BLOCK_LENGTH), + sourceFileMetadata(METADATA_FILE_MODIFICATION_TIME)), + Row( + "B", + sourceFileMetadata(METADATA_FILE_PATH), + sourceFileMetadata(METADATA_FILE_NAME), + sourceFileMetadata(METADATA_FILE_SIZE), + sourceFileMetadata(METADATA_FILE_BLOCK_START), + sourceFileMetadata(METADATA_FILE_BLOCK_LENGTH), + sourceFileMetadata(METADATA_FILE_MODIFICATION_TIME)) + ) + ) + + checkAnswer( + newDF.where(s"$METADATA_FILE_SIZE > 0").select(METADATA_FILE_SIZE), Review Comment: `.where(s"$METADATA_FILE_SIZE > 0")` I guess this is just a sanity check, right? Is this ever possible where a row is mapped to some file while the file has size 0? ########## sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala: ########## @@ -6322,6 +6322,14 @@ object SQLConf { .booleanConf .createWithDefault(true) + val STREAMING_PROJECT_METADATA_COLS_ENABLED = Review Comment: This is too general and could mislead the impact of the config. We'd need to mention DSv1 and getBatch (or microbatch plan for the source) in the config name to scope it correctly. ########## sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala: ########## @@ -1133,4 +1133,98 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession { assert(selectSingleRowDf.count() === 1) } } + + Seq("true", "false").foreach { sideCharPadding => + test(s"SPARK-53625: file metadata in streaming with char type, " + + s"sideCharPadding=$sideCharPadding") { + withSQLConf(SQLConf.READ_SIDE_CHAR_PADDING.key -> sideCharPadding) { + withTempDir { dir => + import scala.jdk.CollectionConverters._ + + val metadata = new MetadataBuilder() + .putString("__CHAR_VARCHAR_TYPE_STRING", "char(1)") + .build() + val charSchemaStruct = new StructType() + .add(StructField("char_col", StringType, metadata = metadata)) + + val data: Seq[Row] = Seq(Row("A"), Row("B")) Review Comment: nit: is explicit typing necessary? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org