Github user dongjoon-hyun commented on a diff in the pull request:
https://github.com/apache/spark/pull/19218#discussion_r141187555
--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala ---
@@ -728,4 +728,120 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
assert(e.contains("mismatched input 'ROW'"))
}
}
+
+ test("[SPARK-21786] Check 'spark.sql.parquet.compression.codec' " +
+ "and 'spark.sql.parquet.compression.codec' taking effect on hive table
writing") {
+ case class CompressionConf(name: String, codeC: String)
+
+ case class TableDefine(tableName: String, isPartitioned: Boolean,
format: String,
+ compressionConf: Option[CompressionConf]) {
+ def createTable(rootDir: File): Unit = {
+ val compression = compressionConf.map(cf =>
s"'${cf.name}'='${cf.codeC}'")
+ sql(
+ s"""
+ |CREATE TABLE $tableName(a int)
+ |${ if (isPartitioned) "PARTITIONED BY (p int)" else "" }
+ |STORED AS $format
+ |LOCATION '${rootDir.toURI.toString.stripSuffix("/")}/$tableName'
+ |${ if (compressionConf.nonEmpty)
s"TBLPROPERTIES(${compression.get})" else "" }
+ """.stripMargin)
+ }
+
+ def insertOverwriteTable(): Unit = {
+ sql(
+ s"""
+ |INSERT OVERWRITE TABLE $tableName
+ |${ if (isPartitioned) "partition (p=10000)" else "" }
+ |SELECT * from table_source
+ """.stripMargin)
+ }
+
+ def getDirFiles(file: File): List[File] = {
+ if (!file.exists()) Nil
+ else if (file.isFile) List(file)
+ else {
+ file.listFiles().filterNot(_.getName.startsWith(".hive-staging"))
+ .groupBy(_.isFile).flatMap {
+ case (isFile, files) if isFile => files.toList
+ case (_, dirs) => dirs.flatMap(getDirFiles)
+ }.toList
+ }
+ }
+
+ def getTableSize: Long = {
+ var totalSize = 0L
+ withTempDir { tmpDir =>
+ withTable(tableName) {
+ createTable(tmpDir)
+ insertOverwriteTable()
+ val path = s"${tmpDir.getPath.stripSuffix("/")}/$tableName"
+ val dir = new File(path)
+ val files =
getDirFiles(dir).filter(_.getName.startsWith("part-"))
+ totalSize = files.map(_.length()).sum
+ }
+ }
+ totalSize
+ }
+ }
+
+ def checkParquetCompressionCodec(isPartitioned: Boolean, tableCodec:
String,
+ sessionCodec: String, f: (Long, Long) => Boolean = _ == _): Unit = {
+ val tableOrg = TableDefine(s"tbl_parquet$tableCodec", isPartitioned,
"parquet",
+ Some(CompressionConf("parquet.compression", tableCodec)))
+ val tableOrgSize = tableOrg.getTableSize
+
+ withSQLConf("spark.sql.parquet.compression.codec" -> sessionCodec) {
+ // priority check, when table-level compression conf was set,
expecting
+ // table-level compression conf is not affected by the session
conf, and table-level
+ // compression conf takes precedence even the two conf of codec is
different
+ val tableOrgSessionConfSize = tableOrg.getTableSize
+ assert(tableOrgSize == tableOrgSessionConfSize)
+
+ // check session conf of compression codec taking effect
+ val table = TableDefine(s"tbl_parquet", isPartitioned, "parquet",
None)
+ assert(f(tableOrg.getTableSize, table.getTableSize))
+ }
+ }
+
+ def checkOrcCompressionCodec(isPartitioned: Boolean, tableCodec:
String,
+ sessionCodec: String, f: (Long, Long) => Boolean = _ == _): Unit = {
+ val tableOrg = TableDefine(s"tbl_orc$tableCodec", isPartitioned,
"orc",
+ Some(CompressionConf("orc.compress", tableCodec)))
+ val tableOrgSize = tableOrg.getTableSize
+
+ withSQLConf("spark.sql.orc.compression.codec" -> sessionCodec) {
+ // priority check, when table-level compression conf was set,
expecting
+ // table-level compression conf is not affected by the session
conf, and table-level
+ // compression conf takes precedence even the two conf of codec is
different
+ val tableOrgSessionConfSize = tableOrg.getTableSize
+ assert(tableOrgSize == tableOrgSessionConfSize)
+
+ // check session conf of compression codec taking effect
+ val table = TableDefine(s"tbl_orc", isPartitioned, "orc", None)
+ assert(f(tableOrg.getTableSize, table.getTableSize))
+ }
+ }
+
+ withTempView("table_source") {
+ (0 until 100000).toDF("a").createOrReplaceTempView("table_source")
+
+ checkParquetCompressionCodec(true, "UNCOMPRESSED", "UNCOMPRESSED")
+ checkParquetCompressionCodec(true, "GZIP", "GZIP")
+ checkParquetCompressionCodec(true, "GZIP", "UNCOMPRESSED", _ < _)
+
+ withSQLConf("spark.sql.hive.convertMetastoreParquet" -> "false") {
--- End diff --
We need to prove this PR works in both cases globally:
- both **true** and **false** of `convertMetastoreParquet` for all combinations (line 828~835), e.g. as sketched below
- both **true** and **false** of `convertMetastoreOrc` for all combinations (line 838~844)
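A rough sketch of what I mean (not the PR's actual code; it just reruns the `checkParquetCompressionCodec` calls quoted above under both flag values, and the ORC checks at lines 838~844, which are not quoted here, would be wrapped the same way with `convertMetastoreOrc`):

```scala
// Run the existing size-based Parquet checks under both values of the
// convertMetastoreParquet flag, so every combination is covered.
Seq("true", "false").foreach { convert =>
  withSQLConf("spark.sql.hive.convertMetastoreParquet" -> convert) {
    checkParquetCompressionCodec(true, "UNCOMPRESSED", "UNCOMPRESSED")
    checkParquetCompressionCodec(true, "GZIP", "GZIP")
    checkParquetCompressionCodec(true, "GZIP", "UNCOMPRESSED", _ < _)
  }
}
```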
If you assert on the real compression kinds of the written files instead of comparing sizes, it will be easy.
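For Parquet, one way to do that is to read the codec from the file footer. This is only a sketch: the `parquetCompressionCodec` helper is hypothetical, and it assumes the `parquet-hadoop` `ParquetFileReader` API that is already on the test classpath. ORC files could be checked similarly through the ORC reader API.

```scala
import java.io.File

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader

// Hypothetical helper: return the compression codec recorded in a Parquet file's
// footer, e.g. "GZIP", "SNAPPY" or "UNCOMPRESSED".
def parquetCompressionCodec(file: File): String = {
  val footer = ParquetFileReader.readFooter(new Configuration(), new Path(file.getAbsolutePath))
  // Each column chunk of each row group records its codec; the first one is enough here.
  footer.getBlocks.get(0).getColumns.get(0).getCodec.name()
}

// Usage idea: collect the "part-" files as getDirFiles already does, then
// assert(files.forall(f => parquetCompressionCodec(f) == "GZIP"))
```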