GitHub user gatorsmile commented on a diff in the pull request:
https://github.com/apache/spark/pull/19218#discussion_r142553845
--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala ---
@@ -728,4 +732,195 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
     assert(e.contains("mismatched input 'ROW'"))
   }
 }
+
+  test("[SPARK-21786] Check 'spark.sql.parquet.compression.codec' " +
+    "and 'spark.sql.orc.compression.codec' taking effect on hive table writing") {
+
+    val hadoopConf = spark.sessionState.newHadoopConf()
+
+    val partitionStr = "p=10000"
+
+    case class TableCompressionConf(name: String, codeC: String)
+
+    case class TableDefine(tableName: String, isPartitioned: Boolean, format: String,
+      compressionConf: Option[TableCompressionConf]) {
+      def createTable(rootDir: File): Unit = {
+        val compression = compressionConf.map(cf => s"'${cf.name}'='${cf.codeC}'")
+        sql(
+          s"""
+            |CREATE TABLE $tableName(a int)
+            |${if (isPartitioned) "PARTITIONED BY (p int)" else ""}
+            |STORED AS $format
+            |LOCATION '${rootDir.toURI.toString.stripSuffix("/")}/$tableName'
+            |${if (compressionConf.nonEmpty) s"TBLPROPERTIES(${compression.get})" else ""}
+          """.stripMargin)
+      }
+
+      def insertOverwriteTable(): Unit = {
+        sql(
+          s"""
+            |INSERT OVERWRITE TABLE $tableName
+            |${if (isPartitioned) s"partition ($partitionStr)" else ""}
+            |SELECT * from table_source
+          """.stripMargin)
+      }
+    }
+
+    def getTableCompressionCodec(path: String, format: String): String = {
+      val codecs = format match {
+        case "parquet" => for {
+          footer <- readAllFootersWithoutSummaryFiles(new Path(path), hadoopConf)
+          block <- footer.getParquetMetadata.getBlocks.asScala
+          column <- block.getColumns.asScala
+        } yield column.getCodec.name()
+        case "orc" => new File(path).listFiles().filter { file =>
+          file.isFile && !file.getName.endsWith(".crc") && file.getName != "_SUCCESS"
+        }.map { orcFile =>
+          OrcFileOperator.getFileReader(orcFile.toPath.toString).get.getCompression.toString
+        }.toSeq
+      }
+
+      assert(codecs.distinct.length == 1)
+      codecs.head
+    }
+
+    def checkCompressionCodecForTable(format: String, isPartitioned: Boolean,
+      compressionConf: Option[TableCompressionConf])(assertion: String => Unit): Unit = {
+      val table = TableDefine(s"tbl_$format${isPartitioned}",
+        isPartitioned, format, compressionConf)
+      withTempDir { tmpDir =>
+        withTable(table.tableName) {
+          table.createTable(tmpDir)
+          table.insertOverwriteTable()
+          val partition = if (table.isPartitioned) partitionStr else ""
+          val path = s"${tmpDir.getPath.stripSuffix("/")}/${table.tableName}/$partition"
+          assertion(getTableCompressionCodec(path, table.format))
+        }
+      }
+    }
+
+    def getConvertMetastoreConfName(format: String): String = format match {
+      case "parquet" => "spark.sql.hive.convertMetastoreParquet"
+      case "orc" => "spark.sql.hive.convertMetastoreOrc"
+    }
+
+    def getSparkCompressionConfName(format: String): String = format match {
+      case "parquet" => "spark.sql.parquet.compression.codec"
+      case "orc" => "spark.sql.orc.compression.codec"
+    }
+
+    def checkTableCompressionCodecForCodecs(format: String, isPartitioned: Boolean,
+      convertMetastore: Boolean, compressionCodecs: List[String],
+      tableCompressionConf: List[TableCompressionConf])
--- End diff ---
Could you update the indents for all of them in this PR? See the link:
https://github.com/databricks/scala-style-guide#indent
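For reference, that section of the guide asks for 2-space indentation in general and 4-space indentation for method parameters that wrap onto their own lines. A declaration like the last one in the hunk above would then look roughly as follows; this is only a sketch, and the `: Unit` return type is an assumption since the hunk ends before the signature does:

    def checkTableCompressionCodecForCodecs(
        format: String,
        isPartitioned: Boolean,
        convertMetastore: Boolean,
        compressionCodecs: List[String],
        tableCompressionConf: List[TableCompressionConf]): Unit = {  // return type assumed
      // body continues at 2-space indentation relative to the `def`
    }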
---