cloud-fan commented on a change in pull request #34575:
URL: https://github.com/apache/spark/pull/34575#discussion_r772214752



##########
File path: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
##########
@@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.io.File
+import java.text.SimpleDateFormat
+
+import org.apache.spark.sql.{AnalysisException, Column, DataFrame, QueryTest, 
Row}
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{IntegerType, LongType, StringType, 
StructField, StructType}
+
+class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
+
+  val data0: Seq[Row] = Seq(Row("jack", 24, Row(12345L, "uom")))
+
+  val data1: Seq[Row] = Seq(Row("lily", 31, Row(54321L, "ucb")))
+
+  val schema: StructType = new StructType()
+    .add(StructField("name", StringType))
+    .add(StructField("age", IntegerType))
+    .add(StructField("info", new StructType()
+      .add(StructField("id", LongType))
+      .add(StructField("university", StringType))))
+
+  val schemaWithNameConflicts: StructType = new StructType()
+    .add(StructField("name", StringType))
+    .add(StructField("age", IntegerType))
+    .add(StructField("_METADATA", new StructType()
+      .add(StructField("id", LongType))
+      .add(StructField("university", StringType))))
+
+  private val METADATA_FILE_PATH = "_metadata.file_path"
+
+  private val METADATA_FILE_NAME = "_metadata.file_name"
+
+  private val METADATA_FILE_SIZE = "_metadata.file_size"
+
+  private val METADATA_FILE_MODIFICATION_TIME = 
"_metadata.file_modification_time"
+
+  /**
+   * This test wrapper will test for both row-based and column-based file 
formats:
+   * (json and parquet) with nested schema:
+   * 1. create df0 and df1 and save them as testFileFormat under /data/f0 and 
/data/f1
+   * 2. read the path /data, return the df for further testing
+   * 3. create actual metadata maps for both files under /data/f0 and /data/f1 
for further testing
+   *
+   * The final df will have data:
+   * jack | 24 | {12345, uom}
+   * lily | 31 | {54321, ucb}
+   *
+   * The schema of the df will be the `fileSchema` provided to this method
+   *
+   * This test wrapper will provide a `df` and actual metadata map `f0`, `f1`
+   */
+  private def metadataColumnsTest(
+      testName: String, fileSchema: StructType)
+    (f: (DataFrame, Map[String, Any], Map[String, Any]) => Unit): Unit = {
+    Seq("json", "parquet").foreach { testFileFormat =>
+      test(s"metadata struct ($testFileFormat): " + testName) {
+        withTempDir { dir =>
+          import scala.collection.JavaConverters._
+
+          // 1. create df0 and df1 and save under /data/f0 and /data/f1
+          val df0 = spark.createDataFrame(data0.asJava, fileSchema)
+          val f0 = new File(dir, "data/f0").getCanonicalPath
+          df0.coalesce(1).write.format(testFileFormat).save(f0)
+
+          val df1 = spark.createDataFrame(data1.asJava, fileSchema)
+          val f1 = new File(dir, "data/f1").getCanonicalPath
+          df1.coalesce(1).write.format(testFileFormat).save(f1)
+
+          // 2. read both f0 and f1
+          val df = spark.read.format(testFileFormat).schema(fileSchema)
+            .load(new File(dir, "data").getCanonicalPath + "/*")
+
+          val realF0 = new File(dir, "data/f0").listFiles()
+            .filter(_.getName.endsWith(s".$testFileFormat")).head
+
+          val realF1 = new File(dir, "data/f1").listFiles()
+            .filter(_.getName.endsWith(s".$testFileFormat")).head
+
+          // 3. create f0 and f1 metadata data
+          val f0Metadata = Map(
+            METADATA_FILE_PATH -> realF0.toURI.toString,
+            METADATA_FILE_NAME -> realF0.getName,
+            METADATA_FILE_SIZE -> realF0.length(),
+            METADATA_FILE_MODIFICATION_TIME -> realF0.lastModified()
+          )
+          val f1Metadata = Map(
+            METADATA_FILE_PATH -> realF1.toURI.toString,
+            METADATA_FILE_NAME -> realF1.getName,
+            METADATA_FILE_SIZE -> realF1.length(),
+            METADATA_FILE_MODIFICATION_TIME -> realF1.lastModified()
+          )
+
+          f(df, f0Metadata, f1Metadata)
+        }
+      }
+    }
+  }
+
+  metadataColumnsTest("read partial/all metadata struct fields", schema) { 
(df, f0, f1) =>
+    // read all available metadata struct fields
+    checkAnswer(
+      df.select("name", "age", "info",
+        METADATA_FILE_NAME, METADATA_FILE_PATH,
+        METADATA_FILE_SIZE, METADATA_FILE_MODIFICATION_TIME),
+      Seq(
+        Row("jack", 24, Row(12345L, "uom"),
+          f0(METADATA_FILE_NAME), f0(METADATA_FILE_PATH),
+          f0(METADATA_FILE_SIZE), f0(METADATA_FILE_MODIFICATION_TIME)),
+        Row("lily", 31, Row(54321L, "ucb"),
+          f1(METADATA_FILE_NAME), f1(METADATA_FILE_PATH),
+          f1(METADATA_FILE_SIZE), f1(METADATA_FILE_MODIFICATION_TIME))
+      )
+    )
+
+    // read a part of metadata struct fields
+    checkAnswer(
+      df.select("name", "info.university", METADATA_FILE_NAME, 
METADATA_FILE_SIZE),
+      Seq(
+        Row("jack", "uom", f0(METADATA_FILE_NAME), f0(METADATA_FILE_SIZE)),
+        Row("lily", "ucb", f1(METADATA_FILE_NAME), f1(METADATA_FILE_SIZE))
+      )
+    )
+  }
+
+  metadataColumnsTest("read metadata struct fields with random ordering", 
schema) { (df, f0, f1) =>
+    // read a part of metadata struct fields with random ordering
+    checkAnswer(
+      df.select(METADATA_FILE_NAME, "name", METADATA_FILE_SIZE, 
"info.university"),
+      Seq(
+        Row(f0(METADATA_FILE_NAME), "jack", f0(METADATA_FILE_SIZE), "uom"),
+        Row(f1(METADATA_FILE_NAME), "lily", f1(METADATA_FILE_SIZE), "ucb")
+      )
+    )
+  }
+
+  metadataColumnsTest("read metadata struct fields with expressions", schema) 
{ (df, f0, f1) =>
+    checkAnswer(
+      df.select(
+        // substring of file name
+        substring(col(METADATA_FILE_NAME), 1, 3),
+        // convert timestamp in millis to unixtime and to date format
+        from_unixtime(col(METADATA_FILE_MODIFICATION_TIME).divide(lit(1000)), 
"yyyy-MM")
+          .as("_file_modification_date"),
+        // convert to kb
+        col(METADATA_FILE_SIZE).divide(lit(1024)).as("_file_size_kb"),
+        // get the file format
+        substring_index(col(METADATA_FILE_PATH), ".", -1).as("_file_format")
+      ),
+      Seq(
+        Row(
+          f0(METADATA_FILE_NAME).toString.substring(0, 3), // sql substring vs 
scala substring
+          new 
SimpleDateFormat("yyyy-MM").format(f0(METADATA_FILE_MODIFICATION_TIME)),
+          f0(METADATA_FILE_SIZE).asInstanceOf[Long] / 1024.toDouble,
+          f0(METADATA_FILE_PATH).toString.split("\\.").takeRight(1).head
+        ),
+        Row(
+          f1(METADATA_FILE_NAME).toString.substring(0, 3), // sql substring vs 
scala substring
+          new 
SimpleDateFormat("yyyy-MM").format(f1(METADATA_FILE_MODIFICATION_TIME)),
+          f1(METADATA_FILE_SIZE).asInstanceOf[Long] / 1024.toDouble,
+          f1(METADATA_FILE_PATH).toString.split("\\.").takeRight(1).head
+        )
+      )
+    )
+  }
+
+  metadataColumnsTest("select all will not select metadata struct fields", 
schema) { (df, _, _) =>
+    checkAnswer(
+      df.select("*"),
+      Seq(
+        Row("jack", 24, Row(12345L, "uom")),
+        Row("lily", 31, Row(54321L, "ucb"))
+      )
+    )
+  }
+
+  metadataColumnsTest("metadata will not overwrite user data",
+    schemaWithNameConflicts) { (df, _, _) =>
+    // the user data has the schema: name, age, _metadata.id, 
_metadata.university
+
+    // select user data
+    checkAnswer(
+      df.select("name", "age", "_METADATA", "_metadata"),
+      Seq(
+        Row("jack", 24, Row(12345L, "uom"), Row(12345L, "uom")),
+        Row("lily", 31, Row(54321L, "ucb"), Row(54321L, "ucb"))
+      )
+    )
+
+    // select metadata will fail when analysis
+    val ex = intercept[AnalysisException] {
+      df.select("name", METADATA_FILE_NAME).collect()
+    }
+    assert(ex.getMessage.contains("No such struct field file_name in id, 
university"))
+  }
+
+  metadataColumnsTest("select only metadata", schema) { (df, f0, f1) =>
+    checkAnswer(
+      df.select(METADATA_FILE_NAME, METADATA_FILE_PATH,
+        METADATA_FILE_SIZE, METADATA_FILE_MODIFICATION_TIME),
+      Seq(
+        Row(f0(METADATA_FILE_NAME), f0(METADATA_FILE_PATH),
+          f0(METADATA_FILE_SIZE), f0(METADATA_FILE_MODIFICATION_TIME)),
+        Row(f1(METADATA_FILE_NAME), f1(METADATA_FILE_PATH),
+          f1(METADATA_FILE_SIZE), f1(METADATA_FILE_MODIFICATION_TIME))
+      )
+    )
+    checkAnswer(
+      df.select("_metadata"),
+      Seq(
+        Row(Row(f0(METADATA_FILE_PATH), f0(METADATA_FILE_NAME),
+          f0(METADATA_FILE_SIZE), f0(METADATA_FILE_MODIFICATION_TIME))),
+        Row(Row(f1(METADATA_FILE_PATH), f1(METADATA_FILE_NAME),
+          f1(METADATA_FILE_SIZE), f1(METADATA_FILE_MODIFICATION_TIME)))
+      )
+    )
+  }
+
+  metadataColumnsTest("select and re-select", schema) { (df, f0, f1) =>
+    checkAnswer(
+      df.select("name", "age", "info",
+        METADATA_FILE_NAME, METADATA_FILE_PATH,
+        METADATA_FILE_SIZE, METADATA_FILE_MODIFICATION_TIME)
+        .select("name", "file_path"), // cast _metadata.file_path as file_path
+      Seq(
+        Row("jack", f0(METADATA_FILE_PATH)),
+        Row("lily", f1(METADATA_FILE_PATH))
+      )
+    )
+  }
+
+  metadataColumnsTest("alias", schema) { (df, f0, f1) =>
+
+    val aliasDF = df.select(
+      Column("name").as("myName"),
+      Column("age").as("myAge"),
+      Column(METADATA_FILE_NAME).as("myFileName"),
+      Column(METADATA_FILE_SIZE).as("myFileSize")
+    )
+
+    // check schema
+    val expectedSchema = new StructType()
+      .add(StructField("myName", StringType))
+      .add(StructField("myAge", IntegerType))
+      .add(StructField("myFileName", StringType))
+      .add(StructField("myFileSize", LongType))
+
+    assert(aliasDF.schema.fields.toSet == expectedSchema.fields.toSet)
+
+    // check data
+    checkAnswer(
+      aliasDF,
+      Seq(
+        Row("jack", 24, f0(METADATA_FILE_NAME), f0(METADATA_FILE_SIZE)),
+        Row("lily", 31, f1(METADATA_FILE_NAME), f1(METADATA_FILE_SIZE))
+      )
+    )
+  }
+
+  metadataColumnsTest("filter", schema) { (df, f0, _) =>
+    checkAnswer(
+      df.select("name", "age", METADATA_FILE_NAME)
+        .where(Column(METADATA_FILE_NAME) === f0(METADATA_FILE_NAME)),
+      Seq(
+        // _file_name == f0's name, so we will only have 1 row
+        Row("jack", 24, f0(METADATA_FILE_NAME))
+      )
+    )
+  }
+
+  Seq(true, false).foreach { caseSensitive =>
+    metadataColumnsTest(s"upper/lower case when case " +
+      s"sensitive is $caseSensitive", schemaWithNameConflicts) { (df, f0, f1) 
=>
+      withSQLConf("spark.sql.caseSensitive" -> caseSensitive.toString) {

Review comment:
       nit: use `SQLConf.CASE_SENSITIVE.key`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to