Github user dongjoon-hyun commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19218#discussion_r141188386

--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala ---
@@ -728,4 +728,120 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
       assert(e.contains("mismatched input 'ROW'"))
     }
   }
+
+  test("[SPARK-21786] Check 'spark.sql.parquet.compression.codec' " +
+    "and 'spark.sql.orc.compression.codec' taking effect on hive table writing") {
+    case class CompressionConf(name: String, codeC: String)
+
+    case class TableDefine(tableName: String, isPartitioned: Boolean, format: String,
+        compressionConf: Option[CompressionConf]) {
+      def createTable(rootDir: File): Unit = {
+        val compression = compressionConf.map(cf => s"'${cf.name}'='${cf.codeC}'")
+        sql(
+          s"""
+             |CREATE TABLE $tableName(a int)
+             |${if (isPartitioned) "PARTITIONED BY (p int)" else ""}
+             |STORED AS $format
+             |LOCATION '${rootDir.toURI.toString.stripSuffix("/")}/$tableName'
+             |${if (compressionConf.nonEmpty) s"TBLPROPERTIES(${compression.get})" else ""}
+           """.stripMargin)
+      }
+
+      def insertOverwriteTable(): Unit = {
+        sql(
+          s"""
+             |INSERT OVERWRITE TABLE $tableName
+             |${if (isPartitioned) "PARTITION (p=10000)" else ""}
+             |SELECT * FROM table_source
+           """.stripMargin)
+      }
+
+      def getDirFiles(file: File): List[File] = {
+        if (!file.exists()) Nil
+        else if (file.isFile) List(file)
+        else {
+          file.listFiles().filterNot(_.getName.startsWith(".hive-staging"))
+            .groupBy(_.isFile).flatMap {
+              case (isFile, files) if isFile => files.toList
+              case (_, dirs) => dirs.flatMap(getDirFiles)
+            }.toList
+        }
+      }
+
+      def getTableSize: Long = {
+        var totalSize = 0L
+        withTempDir { tmpDir =>
+          withTable(tableName) {
+            createTable(tmpDir)
+            insertOverwriteTable()
+            val path = s"${tmpDir.getPath.stripSuffix("/")}/$tableName"
+            val dir = new File(path)
+            val files = getDirFiles(dir).filter(_.getName.startsWith("part-"))
+            totalSize = files.map(_.length()).sum
+          }
+        }
+        totalSize
+      }
+    }
+
+    def checkParquetCompressionCodec(isPartitioned: Boolean, tableCodec: String,
+        sessionCodec: String, f: (Long, Long) => Boolean = _ == _): Unit = {
+      val tableOrg = TableDefine(s"tbl_parquet$tableCodec", isPartitioned, "parquet",
+        Some(CompressionConf("parquet.compression", tableCodec)))
+      val tableOrgSize = tableOrg.getTableSize
+
+      withSQLConf("spark.sql.parquet.compression.codec" -> sessionCodec) {
+        // Priority check: when a table-level compression conf is set, it should not be
+        // affected by the session conf; the table-level conf takes precedence even when
+        // the two codecs differ.
+        val tableOrgSessionConfSize = tableOrg.getTableSize
+        assert(tableOrgSize == tableOrgSessionConfSize)
+
+        // Check that the session-level compression codec takes effect when no
+        // table-level conf is set.
+        val table = TableDefine(s"tbl_parquet", isPartitioned, "parquet", None)
+        assert(f(tableOrg.getTableSize, table.getTableSize))
+      }
+    }
+
+    def checkOrcCompressionCodec(isPartitioned: Boolean, tableCodec: String,
+        sessionCodec: String, f: (Long, Long) => Boolean = _ == _): Unit = {
+      val tableOrg = TableDefine(s"tbl_orc$tableCodec", isPartitioned, "orc",
+        Some(CompressionConf("orc.compress", tableCodec)))
+      val tableOrgSize = tableOrg.getTableSize
+
+      withSQLConf("spark.sql.orc.compression.codec" -> sessionCodec) {
+        // Priority check: when a table-level compression conf is set, it should not be
+        // affected by the session conf; the table-level conf takes precedence even when
+        // the two codecs differ.
+        val tableOrgSessionConfSize = tableOrg.getTableSize
+        assert(tableOrgSize == tableOrgSessionConfSize)
+
+        // Check that the session-level compression codec takes effect when no
+        // table-level conf is set.
+        val table = TableDefine(s"tbl_orc", isPartitioned, "orc", None)
+        assert(f(tableOrg.getTableSize, table.getTableSize))
+      }
+    }
+
+    withTempView("table_source") {
+      (0 until 100000).toDF("a").createOrReplaceTempView("table_source")
+
+      checkParquetCompressionCodec(true, "UNCOMPRESSED", "UNCOMPRESSED")
+      checkParquetCompressionCodec(true, "GZIP", "GZIP")
+      checkParquetCompressionCodec(true, "GZIP", "UNCOMPRESSED", _ < _)
+
+      withSQLConf("spark.sql.hive.convertMetastoreParquet" -> "false") {
--- End diff --

If this is due to the issue of partitioned tables, could you add comments here?
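For readers following along, here is a minimal sketch of the precedence rule the new test asserts. It assumes a Hive-enabled SparkSession named `spark`; the table name `t` is hypothetical, not from the patch:

    // Table-level codec set via TBLPROPERTIES at creation time.
    spark.sql(
      """CREATE TABLE t(a INT) STORED AS PARQUET
        |TBLPROPERTIES('parquet.compression'='GZIP')""".stripMargin)

    // Session-level codec that conflicts with the table property.
    spark.conf.set("spark.sql.parquet.compression.codec", "uncompressed")

    // The table-level property is expected to take precedence, so the written
    // part- files should still be GZIP-compressed despite the session conf.
    spark.sql("INSERT OVERWRITE TABLE t SELECT 1 AS a")

Note that the test verifies this indirectly: it compares the sizes of the written part- files between tables with and without a table-level codec, rather than inspecting the Parquet/ORC file metadata.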