Github user dongjoon-hyun commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19218#discussion_r141188386
  
    --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala ---
    @@ -728,4 +728,120 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
           assert(e.contains("mismatched input 'ROW'"))
         }
       }
    +
    +  test("[SPARK-21786] Check 'spark.sql.parquet.compression.codec' " +
    +    "and 'spark.sql.parquet.compression.codec' taking effect on hive table 
writing") {
    +    case class CompressionConf(name: String, codeC: String)
    +
    +    case class TableDefine(tableName: String, isPartitioned: Boolean, format: String,
    +      compressionConf: Option[CompressionConf]) {
    +      def createTable(rootDir: File): Unit = {
    +        val compression = compressionConf.map(cf => s"'${cf.name}'='${cf.codeC}'")
    +        sql(
    +          s"""
    +          |CREATE TABLE $tableName(a int)
    +          |${ if (isPartitioned) "PARTITIONED BY (p int)" else "" }
    +          |STORED AS $format
    +          |LOCATION '${rootDir.toURI.toString.stripSuffix("/")}/$tableName'
    +          |${ if (compressionConf.nonEmpty) s"TBLPROPERTIES(${compression.get})" else "" }
    +        """.stripMargin)
    +      }
    +
    +      def insertOverwriteTable(): Unit = {
    +        sql(
    +          s"""
    +          |INSERT OVERWRITE TABLE $tableName
    +          |${ if (isPartitioned) "partition (p=10000)" else "" }
    +          |SELECT * from table_source
    +        """.stripMargin)
    +      }
    +
    +      def getDirFiles(file: File): List[File] = {
    +        if (!file.exists()) Nil
    +        else if (file.isFile) List(file)
    +        else {
    +          file.listFiles().filterNot(_.getName.startsWith(".hive-staging"))
    +            .groupBy(_.isFile).flatMap {
    +            case (isFile, files) if isFile => files.toList
    +            case (_, dirs) => dirs.flatMap(getDirFiles)
    +          }.toList
    +        }
    +      }
    +
    +      def getTableSize: Long = {
    +        var totalSize = 0L
    +        withTempDir { tmpDir =>
    +          withTable(tableName) {
    +            createTable(tmpDir)
    +            insertOverwriteTable()
    +            val path = s"${tmpDir.getPath.stripSuffix("/")}/$tableName"
    +            val dir = new File(path)
    +            val files = getDirFiles(dir).filter(_.getName.startsWith("part-"))
    +            totalSize = files.map(_.length()).sum
    +          }
    +        }
    +        totalSize
    +      }
    +    }
    +
    +    def checkParquetCompressionCodec(isPartitioned: Boolean, tableCodec: String,
    +      sessionCodec: String, f: (Long, Long) => Boolean = _ == _): Unit = {
    +      val tableOrg = TableDefine(s"tbl_parquet$tableCodec", isPartitioned, "parquet",
    +        Some(CompressionConf("parquet.compression", tableCodec)))
    +      val tableOrgSize = tableOrg.getTableSize
    +
    +      withSQLConf("spark.sql.parquet.compression.codec" -> sessionCodec) {
    +        // Priority check: when the table-level compression conf is set, it should not
    +        // be affected by the session conf, and it takes precedence even when the two
    +        // codecs differ.
    +        val tableOrgSessionConfSize = tableOrg.getTableSize
    +        assert(tableOrgSize == tableOrgSessionConfSize)
    +
    +        // Check that the session-level compression codec conf takes effect.
    +        val table = TableDefine(s"tbl_parquet", isPartitioned, "parquet", None)
    +        assert(f(tableOrg.getTableSize, table.getTableSize))
    +      }
    +    }
    +
    +    def checkOrcCompressionCodec(isPartitioned: Boolean, tableCodec: String,
    +      sessionCodec: String, f: (Long, Long) => Boolean = _ == _): Unit = {
    +      val tableOrg = TableDefine(s"tbl_orc$tableCodec", isPartitioned, "orc",
    +        Some(CompressionConf("orc.compress", tableCodec)))
    +      val tableOrgSize = tableOrg.getTableSize
    +
    +      withSQLConf("spark.sql.orc.compression.codec" -> sessionCodec) {
    +        // Priority check: when the table-level compression conf is set, it should not
    +        // be affected by the session conf, and it takes precedence even when the two
    +        // codecs differ.
    +        val tableOrgSessionConfSize = tableOrg.getTableSize
    +        assert(tableOrgSize == tableOrgSessionConfSize)
    +
    +        // Check that the session-level compression codec conf takes effect.
    +        val table = TableDefine(s"tbl_orc", isPartitioned, "orc", None)
    +        assert(f(tableOrg.getTableSize, table.getTableSize))
    +      }
    +    }
    +
    +    withTempView("table_source") {
    +      (0 until 100000).toDF("a").createOrReplaceTempView("table_source")
    +
    +      checkParquetCompressionCodec(true, "UNCOMPRESSED", "UNCOMPRESSED")
    +      checkParquetCompressionCodec(true, "GZIP", "GZIP")
    +      checkParquetCompressionCodec(true, "GZIP", "UNCOMPRESSED", _ < _)
    +
    +      withSQLConf("spark.sql.hive.convertMetastoreParquet" -> "false") {
    --- End diff ---
    
    If this is due to an issue with partitioned tables, could you add a comment here explaining that?
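    
    For example, something along these lines would be enough (just a rough sketch that reuses the
    `withSQLConf` and `checkParquetCompressionCodec` helpers from this diff; I am only assuming
    the reason is that the metastore conversion does not apply when inserting into partitioned
    Hive parquet tables, so the Hive SerDe write path has to be covered explicitly):
    
        // Assumed rationale, please replace with the actual one:
        // disable the metastore conversion so that the INSERT goes through the Hive SerDe
        // write path, which the converted-table checks above do not exercise.
        withSQLConf("spark.sql.hive.convertMetastoreParquet" -> "false") {
          checkParquetCompressionCodec(true, "GZIP", "GZIP")
        }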


---
