Github user wzhfy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/15360#discussion_r83369611
  
    --- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala ---
    @@ -358,50 +358,180 @@ class StatisticsSuite extends QueryTest with 
TestHiveSingleton with SQLTestUtils
         }
       }
     
    -  test("generate column-level statistics and load them from hive 
metastore") {
    +  test("test refreshing table stats of cached data source table by 
`ANALYZE TABLE` statement") {
    +    val tableName = "tbl"
    +    withTable(tableName) {
    +      val tableIndent = TableIdentifier(tableName, Some("default"))
    +      val catalog = 
spark.sessionState.catalog.asInstanceOf[HiveSessionCatalog]
    +      sql(s"CREATE TABLE $tableName (key int) USING PARQUET")
    +      sql(s"INSERT INTO $tableName SELECT 1")
    +      sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS")
    +      // Table lookup will make the table cached.
    +      catalog.lookupRelation(tableIndent)
    +      val stats1 = 
catalog.getCachedDataSourceTable(tableIndent).asInstanceOf[LogicalRelation]
    +        .catalogTable.get.stats.get
    +      assert(stats1.sizeInBytes > 0)
    +      assert(stats1.rowCount.contains(1))
    +
    +      sql(s"INSERT INTO $tableName SELECT 2")
    +      sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS")
    +      catalog.lookupRelation(tableIndent)
    +      val stats2 = 
catalog.getCachedDataSourceTable(tableIndent).asInstanceOf[LogicalRelation]
    +        .catalogTable.get.stats.get
    +      assert(stats2.sizeInBytes > stats1.sizeInBytes)
    +      assert(stats2.rowCount.contains(2))
    +    }
    +  }
    +
    +  private def dataAndColStats(): (DataFrame, Seq[(StructField, 
ColumnStat)]) = {
         import testImplicits._
     
         val intSeq = Seq(1, 2)
         val stringSeq = Seq("a", "bb")
    +    val binarySeq = Seq("a", "bb").map(_.getBytes)
         val booleanSeq = Seq(true, false)
    -
         val data = intSeq.indices.map { i =>
    -      (intSeq(i), stringSeq(i), booleanSeq(i))
    +      (intSeq(i), stringSeq(i), binarySeq(i), booleanSeq(i))
         }
    -    val tableName = "table"
    -    withTable(tableName) {
    -      val df = data.toDF("c1", "c2", "c3")
    -      df.write.format("parquet").saveAsTable(tableName)
    -      val expectedColStatsSeq = df.schema.map { f =>
    -        val colStat = f.dataType match {
    -          case IntegerType =>
    -            ColumnStat(InternalRow(0L, intSeq.max, intSeq.min, 
intSeq.distinct.length.toLong))
    -          case StringType =>
    -            ColumnStat(InternalRow(0L, stringSeq.map(_.length).sum / 
stringSeq.length.toDouble,
    -              stringSeq.map(_.length).max.toLong, 
stringSeq.distinct.length.toLong))
    -          case BooleanType =>
    -            ColumnStat(InternalRow(0L, 
booleanSeq.count(_.equals(true)).toLong,
    -              booleanSeq.count(_.equals(false)).toLong))
    -        }
    -        (f, colStat)
    +    val df = data.toDF("c1", "c2", "c3", "c4")
    +    val expectedColStatsSeq = df.schema.map { f =>
    +      val colStat = f.dataType match {
    +        case IntegerType =>
    +          ColumnStat(InternalRow(0L, intSeq.max, intSeq.min, 
intSeq.distinct.length.toLong))
    +        case StringType =>
    +          ColumnStat(InternalRow(0L, stringSeq.map(_.length).sum / 
stringSeq.length.toDouble,
    +            stringSeq.map(_.length).max.toLong, 
stringSeq.distinct.length.toLong))
    +        case BinaryType =>
    +          ColumnStat(InternalRow(0L, binarySeq.map(_.length).sum / 
binarySeq.length.toDouble,
    +            binarySeq.map(_.length).max.toLong))
    +        case BooleanType =>
    +          ColumnStat(InternalRow(0L, 
booleanSeq.count(_.equals(true)).toLong,
    +            booleanSeq.count(_.equals(false)).toLong))
           }
    +      (f, colStat)
    +    }
    +    (df, expectedColStatsSeq)
    +  }
    +
    +  private def checkColStats(
    +      tableName: String,
    +      isDataSourceTable: Boolean,
    +      expectedColStatsSeq: Seq[(StructField, ColumnStat)]): Unit = {
    +    val readback = spark.table(tableName)
    +    val stats = readback.queryExecution.analyzed.collect {
    +      case rel: MetastoreRelation =>
    +        assert(!isDataSourceTable, "Expected a Hive serde table, but got a 
data source table")
    +        rel.catalogTable.stats.get
    +      case rel: LogicalRelation =>
    +        assert(isDataSourceTable, "Expected a data source table, but got a 
Hive serde table")
    +        rel.catalogTable.get.stats.get
    +    }
    +    assert(stats.length == 1)
    +    val columnStats = stats.head.colStats
    +    assert(columnStats.size == expectedColStatsSeq.length)
    +    expectedColStatsSeq.foreach { case (field, expectedColStat) =>
    +      StatisticsTest.checkColStat(
    +        dataType = field.dataType,
    +        colStat = columnStats(field.name),
    +        expectedColStat = expectedColStat,
    +        rsd = spark.sessionState.conf.ndvMaxError)
    +    }
    +  }
     
    -      sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS FOR COLUMNS c1, 
c2, c3")
    -      val readback = spark.table(tableName)
    -      val relations = readback.queryExecution.analyzed.collect { case rel: 
LogicalRelation =>
    -        val columnStats = rel.catalogTable.get.stats.get.colStats
    -        expectedColStatsSeq.foreach { case (field, expectedColStat) =>
    -          assert(columnStats.contains(field.name))
    -          val colStat = columnStats(field.name)
    +  test("generate and load column-level stats for data source table") {
    +    val dsTable = "dsTable"
    +    withTable(dsTable) {
    +      val (df, expectedColStatsSeq) = dataAndColStats()
    +      df.write.format("parquet").saveAsTable(dsTable)
    +      sql(s"ANALYZE TABLE $dsTable COMPUTE STATISTICS FOR COLUMNS c1, c2, 
c3, c4")
    +      checkColStats(dsTable, isDataSourceTable = true, expectedColStatsSeq)
    +    }
    +  }
    +
    +  test("generate and load column-level stats for hive serde table") {
    +    val hTable = "hTable"
    +    val tmp = "tmp"
    +    withTable(hTable, tmp) {
    +      val (df, expectedColStatsSeq) = dataAndColStats()
    +      df.write.format("parquet").saveAsTable(tmp)
    +      sql(s"CREATE TABLE $hTable (c1 int, c2 string, c3 binary, c4 
boolean) STORED AS TEXTFILE")
    +      sql(s"INSERT INTO $hTable SELECT * FROM $tmp")
    +      sql(s"ANALYZE TABLE $hTable COMPUTE STATISTICS FOR COLUMNS c1, c2, 
c3, c4")
    +      checkColStats(hTable, isDataSourceTable = false, expectedColStatsSeq)
    +    }
    +  }
    +
    +  private def checkCaseSensitiveColStats(columnName: String): Unit = {
    +    val tableName = "tbl"
    +    withTable(tableName) {
    +      val column1 = columnName.toLowerCase
    +      val column2 = columnName.toUpperCase
    +      withSQLConf("spark.sql.caseSensitive" -> "true") {
    +        sql(s"CREATE TABLE $tableName (`$column1` int, `$column2` double) 
USING PARQUET")
    --- End diff --
    
    This `CREATE TABLE` statement logs a warning that includes an exception 
(shown below), but the test still completes successfully.
    ```
    WARN org.apache.spark.sql.hive.HiveExternalCatalog: Could not persist 
`default`.`tbl` in a Hive compatible way. Persisting it into Hive metastore in 
Spark SQL specific format.
    org.apache.hadoop.hive.ql.metadata.HiveException: 
org.apache.hadoop.hive.ql.metadata.HiveException: Duplicate column name c1 in 
the table definition.
        at org.apache.hadoop.hive.ql.metadata.Hive.createTable(Hive.java:720)
    ...
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working,
please contact infrastructure at infrastruct...@apache.org or file a JIRA
ticket with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to