Github user xuanyuanking commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21893#discussion_r206188190
  
    --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/MultiFormatTableSuite.scala ---
    @@ -0,0 +1,514 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.hive.execution
    +
    +import java.io.File
    +import java.net.URI
    +
    +import org.scalatest.BeforeAndAfterEach
    +import org.scalatest.Matchers
    +
    +import org.apache.spark.sql.{DataFrame, QueryTest, Row}
    +import org.apache.spark.sql.catalyst.TableIdentifier
    +import org.apache.spark.sql.catalyst.catalog.CatalogTableType
    +import org.apache.spark.sql.execution._
    +import org.apache.spark.sql.execution.command.DDLUtils
    +import org.apache.spark.sql.hive.test.TestHiveSingleton
    +import org.apache.spark.sql.internal.SQLConf
    +import org.apache.spark.sql.test.SQLTestUtils
    +
    +class MultiFormatTableSuite
    +  extends QueryTest with SQLTestUtils with TestHiveSingleton with BeforeAndAfterEach with Matchers {
    +  import testImplicits._
    +
    +  val parser = new SparkSqlParser(new SQLConf())
    +
    +  override def afterEach(): Unit = {
    +    try {
    +      // drop all databases, tables and functions after each test
    +      spark.sessionState.catalog.reset()
    +    } finally {
    +      super.afterEach()
    +    }
    +  }
    +
    +  val partitionCol = "dt"
    +  val partitionVal1 = "2018-01-26"
    +  val partitionVal2 = "2018-01-27"
    +
    +  private case class PartitionDefinition(
    +      column: String,
    +      value: String,
    +      location: URI,
    +      format: Option[String] = None
    +  ) {
    +
    +    def toSpec: String = {
    +      s"($column='$value')"
    +    }
    +    def toSpecAsMap: Map[String, String] = {
    +      Map(column -> value)
    +    }
    +  }
    +
    +  test("create hive table with multi format partitions") {
    +    val catalog = spark.sessionState.catalog
    +    withTempDir { baseDir =>
    +
    +      val partitionedTable = "ext_multiformat_partition_table"
    +      withTable(partitionedTable) {
    +        assert(baseDir.listFiles.isEmpty)
    +
    +        val partitions = createMultiformatPartitionDefinitions(baseDir)
    +
    +        createTableWithPartitions(partitionedTable, baseDir, partitions)
    +
    +        // Check table storage type is PARQUET
    +        val hiveResultTable =
    +          catalog.getTableMetadata(TableIdentifier(partitionedTable, Some("default")))
    +        assert(DDLUtils.isHiveTable(hiveResultTable))
    +        assert(hiveResultTable.tableType == CatalogTableType.EXTERNAL)
    +        assert(hiveResultTable.storage.inputFormat
    +          .contains("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat")
    +        )
    +        assert(hiveResultTable.storage.outputFormat
    +          .contains("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat")
    +        )
    +        assert(hiveResultTable.storage.serde
    +          .contains("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")
    +        )
    +
    +        // Check table has correct partitions
    +        assert(
    +          catalog.listPartitions(TableIdentifier(partitionedTable,
    +            Some("default"))).map(_.spec).toSet == partitions.map(_.toSpecAsMap).toSet
    +        )
    +
    +        // Check first table partition storage type is PARQUET
    +        val parquetPartition = catalog.getPartition(
    +          TableIdentifier(partitionedTable, Some("default")),
    +          partitions.head.toSpecAsMap
    +        )
    +        assert(
    +          parquetPartition.storage.serde
    +            .contains("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")
    +        )
    +
    +        // Check second table partition storage type is AVRO
    +        val avroPartition = catalog.getPartition(
    +          TableIdentifier(partitionedTable, Some("default")),
    +          partitions.last.toSpecAsMap
    +        )
    +        assert(
    +          avroPartition.storage.serde.contains("org.apache.hadoop.hive.serde2.avro.AvroSerDe")
    +        )
    +
    +        assert(
    +          avroPartition.storage.inputFormat
    +            .contains("org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat")
    +        )
    +      }
    +    }
    +  }
    +
    +  test("create hive table with only parquet partitions - test plan") {
    +    withTempDir { baseDir =>
    +      val partitionedTable = "ext_parquet_partition_table"
    +
    +      val partitions = createParquetPartitionDefinitions(baseDir)
    +
    +      withTable(partitionedTable) {
    +        assert(baseDir.listFiles.isEmpty)
    +
    +        createTableWithPartitions(partitionedTable, baseDir, partitions)
    +
    +        val selectQuery =
    +          s"""
    +             |SELECT key, value FROM ${partitionedTable}
    +           """.stripMargin
    +
    +        val plan = parser.parsePlan(selectQuery)
    +
    +        spark.sessionState.executePlan(plan).sparkPlan
    +          .find(_.isInstanceOf[FileSourceScanExec]) shouldNot equal(None)
    +
    +      }
    +    }
    +  }
    +
    +  test("create hive table with only avro partitions - test plan") {
    +    withTempDir { baseDir =>
    +      val partitionedTable = "ext_avro_partition_table"
    +
    +      val partitions = createAvroPartitionDefinitions(baseDir)
    +
    +      withTable(partitionedTable) {
    +        assert(baseDir.listFiles.isEmpty)
    +
    +        createTableWithPartitions(partitionedTable, baseDir, partitions, avro = true)
    +
    +        val selectQuery =
    +          s"""
    +             |SELECT key, value FROM ${partitionedTable}
    +           """.stripMargin
    +
    +        val plan = parser.parsePlan(selectQuery)
    +
    +        spark.sessionState.executePlan(plan).sparkPlan
    +          .find(_.isInstanceOf[HiveTableScanExec]) shouldNot equal(None)
    +
    +      }
    +    }
    +  }
    +
    +  test("create hive avro table with multi format partitions containing 
correct data") {
    +    withTempDir { baseDir =>
    +      val partitionedTable = "ext_multiformat_partition_table_with_data"
    +      val avroPartitionTable = "ext_avro_partition_table"
    +      val pqPartitionTable = "ext_pq_partition_table"
    +
    +      val partitions = createMultiformatPartitionDefinitions(baseDir)
    +
    +      withTable(partitionedTable, avroPartitionTable, pqPartitionTable) {
    +        assert(baseDir.listFiles.isEmpty)
    +
    +        createTableWithPartitions(partitionedTable, baseDir, partitions, true)
    +        createAvroCheckTable(avroPartitionTable, partitions.last)
    +        createPqCheckTable(pqPartitionTable, partitions.head)
    +
    +        // INSERT OVERWRITE TABLE only works for the default table format.
    +        // So we can use it here to insert data into the parquet partition
    +        sql(
    +          s"""
    +             |INSERT OVERWRITE TABLE $pqPartitionTable
    +             |SELECT 1 as id, 'a' as value
    +                  """.stripMargin)
    +
    +        val parquetData = spark.read.parquet(partitions.head.location.toString)
    +        checkAnswer(parquetData, Row(1, "a"))
    +
    +        sql(
    +          s"""
    +             |INSERT OVERWRITE TABLE $avroPartitionTable
    +             |SELECT 2, 'b'
    +           """.stripMargin
    +        )
    +
    +        // Directly reading from the avro table should yield correct results
    +        val avroData = spark.read.table(avroPartitionTable)
    +        checkAnswer(avroData, Row(2, "b"))
    +
    +        val parquetPartitionSelectQuery =
    +          s"""
    +             |SELECT key, value FROM ${partitionedTable}
    +             |WHERE ${partitionCol}='${partitionVal1}'
    +           """.stripMargin
    +
    +        val avroPartitionSelectQuery =
    +          s"""
    +             |SELECT key, value FROM ${partitionedTable}
    +             |WHERE ${partitionCol}='${partitionVal2}'
    +           """.stripMargin
    +
    +        val selectQuery =
    +          s"""
    +             |SELECT key, value FROM ${partitionedTable}
    +           """.stripMargin
    +
    +        val avroPartitionData = sql(avroPartitionSelectQuery)
    +        checkAnswer(avroPartitionData, Row(2, "b"))
    +
    +        val parquetPartitionData = sql(parquetPartitionSelectQuery)
    +        checkAnswer(parquetPartitionData, Row(1, "a"))
    +
    +        val allData = sql(selectQuery)
    +        checkAnswer(allData, Seq(Row(1, "a"), Row(2, "b")))
    +
    +      }
    +    }
    +  }
    +
    +  test("create hive table with multi format partitions - test plan") {
    +    withTempDir { baseDir =>
    +      val partitionedTable = "ext_multiformat_partition_table"
    +
    +      val partitions = createMultiformatPartitionDefinitions(baseDir)
    +
    +      withTable(partitionedTable) {
    +        assert(baseDir.listFiles.isEmpty)
    +
    +        createTableWithPartitions(partitionedTable, baseDir, partitions)
    +
    +        val selectQuery =
    +          s"""
    +             |SELECT key, value FROM ${partitionedTable}
    +           """.stripMargin
    +
    +        val plan = parser.parsePlan(selectQuery)
    +
    +        spark.sessionState.executePlan(plan).sparkPlan
    +          .find(_.isInstanceOf[HiveTableScanExec]) shouldNot equal(None)
    +
    +      }
    +    }
    +  }
    +
    +  test("create hive table with multi format partitions containing correct 
data") {
    +    withTempDir { baseDir =>
    +      //      withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> "false") {
    +      val partitionedTable = "ext_multiformat_partition_table_with_data"
    +      val avroPartitionTable = "ext_avro_partition_table"
    +      val pqPartitionTable = "ext_pq_partition_table"
    +
    +      val partitions = createMultiformatPartitionDefinitions(baseDir)
    +
    +      withTable(partitionedTable, avroPartitionTable, pqPartitionTable) {
    +        assert(baseDir.listFiles.isEmpty)
    +
    +        createTableWithPartitions(partitionedTable, baseDir, partitions)
    +        createAvroCheckTable(avroPartitionTable, partitions.last)
    +        createPqCheckTable(pqPartitionTable, partitions.head)
    +
    +        // INSERT OVERWRITE TABLE only works for the default table format.
    +        // So we can use it here to insert data into the parquet partition
    +        sql(
    +          s"""
    +             |INSERT OVERWRITE TABLE $pqPartitionTable
    +             |SELECT 1 as id, 'a' as value
    +                  """.stripMargin)
    --- End diff --
    
    nit: indent.
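    
    The closing """.stripMargin) is indented further right than the rest of the statement; a minimal sketch of the alignment I had in mind (same statement, only the whitespace changes):
    
        sql(
          s"""
             |INSERT OVERWRITE TABLE $pqPartitionTable
             |SELECT 1 as id, 'a' as value
           """.stripMargin)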

