[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...
Github user fjh100456 closed the pull request at: https://github.com/apache/spark/pull/19218
[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19218#discussion_r158628542

--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala ---
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.io.File
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.Path
+import org.apache.orc.OrcConf.COMPRESS
+import org.apache.parquet.hadoop.ParquetOutputFormat
+
+import org.apache.spark.sql.execution.datasources.parquet.ParquetTest
+import org.apache.spark.sql.hive.orc.OrcFileOperator
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
+
+class CompressionCodecSuite extends TestHiveSingleton with ParquetTest {
+  import spark.implicits._
+
+  private val maxRecordNum = 10
--- End diff --

Could you reduce it to a smaller number? The test cases are very slow to run.
[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19218#discussion_r158628542

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala ---
@@ -42,8 +43,15 @@ private[parquet] class ParquetOptions(
    * Acceptable values are defined in [[shortParquetCompressionCodecNames]].
    */
   val compressionCodecClassName: String = {
-    val codecName = parameters.getOrElse("compression",
-      sqlConf.parquetCompressionCodec).toLowerCase(Locale.ROOT)
+    // `compression`, `parquet.compression` (i.e., ParquetOutputFormat.COMPRESSION), and
+    // `spark.sql.parquet.compression.codec`
+    // are in order of precedence from highest to lowest.
+    val parquetCompressionConf = parameters.get(ParquetOutputFormat.COMPRESSION)
+    val codecName = parameters
+      .get("compression")
+      .orElse(parquetCompressionConf)
--- End diff --

Yeah, we can submit a separate PR for that issue. The behavior change needs to be documented in the Spark SQL docs.
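To make the new precedence concrete, a minimal sketch of a write where all three settings are present, assuming an active `spark` session and a DataFrame `df` (the path and codec choices are illustrative):

```scala
// Session default: lowest precedence.
spark.conf.set("spark.sql.parquet.compression.codec", "gzip")

df.write
  .option("parquet.compression", "snappy") // table-level property: overrides the session conf
  .option("compression", "none")           // per-write option: highest precedence, wins here
  .parquet("/tmp/compression-demo")        // files end up uncompressed
```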
[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...
Github user fjh100456 commented on a diff in the pull request: https://github.com/apache/spark/pull/19218#discussion_r158574986

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala ---
@@ -42,8 +43,15 @@ private[parquet] class ParquetOptions(
    * Acceptable values are defined in [[shortParquetCompressionCodecNames]].
    */
   val compressionCodecClassName: String = {
-    val codecName = parameters.getOrElse("compression",
-      sqlConf.parquetCompressionCodec).toLowerCase(Locale.ROOT)
+    // `compression`, `parquet.compression` (i.e., ParquetOutputFormat.COMPRESSION), and
+    // `spark.sql.parquet.compression.codec`
+    // are in order of precedence from highest to lowest.
+    val parquetCompressionConf = parameters.get(ParquetOutputFormat.COMPRESSION)
+    val codecName = parameters
+      .get("compression")
+      .orElse(parquetCompressionConf)
--- End diff --

If so, Parquet's table-level compression may be overwritten by this PR, and that may not be what we want. Shall I fix it first in another PR?
[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19218#discussion_r158571702

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala ---
@@ -42,8 +43,15 @@ private[parquet] class ParquetOptions(
    * Acceptable values are defined in [[shortParquetCompressionCodecNames]].
    */
   val compressionCodecClassName: String = {
-    val codecName = parameters.getOrElse("compression",
-      sqlConf.parquetCompressionCodec).toLowerCase(Locale.ROOT)
+    // `compression`, `parquet.compression` (i.e., ParquetOutputFormat.COMPRESSION), and
+    // `spark.sql.parquet.compression.codec`
+    // are in order of precedence from highest to lowest.
+    val parquetCompressionConf = parameters.get(ParquetOutputFormat.COMPRESSION)
+    val codecName = parameters
+      .get("compression")
+      .orElse(parquetCompressionConf)
--- End diff --

Could we keep the old behavior and add this later? We do not want to mix multiple issues in the same PR.
[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19218#discussion_r158571657

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveOptions.scala ---
@@ -102,4 +111,18 @@ object HiveOptions {
     "collectionDelim" -> "colelction.delim",
     "mapkeyDelim" -> "mapkey.delim",
     "lineDelim" -> "line.delim").map { case (k, v) => k.toLowerCase(Locale.ROOT) -> v }
+
+  def getHiveWriteCompression(tableInfo: TableDesc, sqlConf: SQLConf): Option[(String, String)] = {
+    tableInfo.getOutputFileFormatClassName.toLowerCase match {
+      case formatName if formatName.endsWith("parquetoutputformat") =>
+        val compressionCodec = new ParquetOptions(tableInfo.getProperties.asScala.toMap,
+          sqlConf).compressionCodecClassName
+        Option((ParquetOutputFormat.COMPRESSION, compressionCodec))
+      case formatName if formatName.endsWith("orcoutputformat") =>
+        val compressionCodec = new OrcOptions(tableInfo.getProperties.asScala.toMap,
+          sqlConf).compressionCodec
--- End diff --

Yeah. Just to make it consistent.
[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19218#discussion_r158571649

--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala ---
@@ -35,7 +39,7 @@ case class TestData(key: Int, value: String)
 
 case class ThreeCloumntable(key: Int, value: String, key1: String)
 
 class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
-    with SQLTestUtils {
+    with ParquetTest {
--- End diff --

Fine with me. Thanks!
[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...
Github user fjh100456 commented on a diff in the pull request: https://github.com/apache/spark/pull/19218#discussion_r158492627

--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala ---
@@ -35,7 +39,7 @@ case class TestData(key: Int, value: String)
 
 case class ThreeCloumntable(key: Int, value: String, key1: String)
 
 class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
-    with SQLTestUtils {
+    with ParquetTest {
--- End diff --

It seems a compressed table is not always smaller than an uncompressed table. The `SNAPPY` compressed size may be bigger than the uncompressed size when the amount of data is small. So I'd like to check that the sizes are not equal when the compression codecs are different.
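A minimal sketch of that weaker check (the paths are hypothetical; the helper just sums the output-file sizes):

```scala
import java.io.File

// SNAPPY output can be larger than uncompressed output on small datasets,
// so the test asserts only that sizes differ across codecs, not which is smaller.
def tableSize(dir: File): Long =
  dir.listFiles().filter(_.getName.startsWith("part-")).map(_.length()).sum

val uncompressedSize = tableSize(new File("/tmp/tbl_none"))   // hypothetical output dirs
val snappySize = tableSize(new File("/tmp/tbl_snappy"))
assert(uncompressedSize != snappySize)
```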
[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...
Github user fjh100456 commented on a diff in the pull request: https://github.com/apache/spark/pull/19218#discussion_r158460931

--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala ---
@@ -35,7 +39,7 @@ case class TestData(key: Int, value: String)
 
 case class ThreeCloumntable(key: Int, value: String, key1: String)
 
 class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
-    with SQLTestUtils {
+    with ParquetTest {
--- End diff --

Ok, I will do it.
[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...
Github user fjh100456 commented on a diff in the pull request: https://github.com/apache/spark/pull/19218#discussion_r158459550

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveOptions.scala ---
@@ -102,4 +111,18 @@ object HiveOptions {
     "collectionDelim" -> "colelction.delim",
     "mapkeyDelim" -> "mapkey.delim",
     "lineDelim" -> "line.delim").map { case (k, v) => k.toLowerCase(Locale.ROOT) -> v }
+
+  def getHiveWriteCompression(tableInfo: TableDesc, sqlConf: SQLConf): Option[(String, String)] = {
+    tableInfo.getOutputFileFormatClassName.toLowerCase match {
+      case formatName if formatName.endsWith("parquetoutputformat") =>
+        val compressionCodec = new ParquetOptions(tableInfo.getProperties.asScala.toMap,
+          sqlConf).compressionCodecClassName
+        Option((ParquetOutputFormat.COMPRESSION, compressionCodec))
+      case formatName if formatName.endsWith("orcoutputformat") =>
+        val compressionCodec = new OrcOptions(tableInfo.getProperties.asScala.toMap,
+          sqlConf).compressionCodec
--- End diff --

`compressionCodec` is used in several places; do you mean I should fix them all?
[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...
Github user fjh100456 commented on a diff in the pull request: https://github.com/apache/spark/pull/19218#discussion_r158458747

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveOptions.scala ---
@@ -102,4 +111,18 @@ object HiveOptions {
     "collectionDelim" -> "colelction.delim",
     "mapkeyDelim" -> "mapkey.delim",
     "lineDelim" -> "line.delim").map { case (k, v) => k.toLowerCase(Locale.ROOT) -> v }
+
+  def getHiveWriteCompression(tableInfo: TableDesc, sqlConf: SQLConf): Option[(String, String)] = {
+    tableInfo.getOutputFileFormatClassName.toLowerCase match {
+      case formatName if formatName.endsWith("parquetoutputformat") =>
+        val compressionCodec = new ParquetOptions(tableInfo.getProperties.asScala.toMap,
+          sqlConf).compressionCodecClassName
--- End diff --

Yes, it looks better; I will change it.
[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...
Github user fjh100456 commented on a diff in the pull request: https://github.com/apache/spark/pull/19218#discussion_r158458003

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveOptions.scala ---
@@ -19,7 +19,16 @@ package org.apache.spark.sql.hive.execution
 
 import java.util.Locale
 
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc}
--- End diff --

I will remove it.
[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...
Github user fjh100456 commented on a diff in the pull request: https://github.com/apache/spark/pull/19218#discussion_r158457806

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala ---
@@ -42,8 +43,15 @@ private[parquet] class ParquetOptions(
    * Acceptable values are defined in [[shortParquetCompressionCodecNames]].
    */
   val compressionCodecClassName: String = {
-    val codecName = parameters.getOrElse("compression",
-      sqlConf.parquetCompressionCodec).toLowerCase(Locale.ROOT)
+    // `compression`, `parquet.compression` (i.e., ParquetOutputFormat.COMPRESSION), and
+    // `spark.sql.parquet.compression.codec`
+    // are in order of precedence from highest to lowest.
+    val parquetCompressionConf = parameters.get(ParquetOutputFormat.COMPRESSION)
+    val codecName = parameters
+      .get("compression")
+      .orElse(parquetCompressionConf)
--- End diff --

Yes, it's new. I guess `ParquetOptions` was not used when writing Hive tables before, because it was not visible to the hive module. I changed it to `public`.
[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19218#discussion_r158445027

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala ---
@@ -68,6 +68,12 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
         .get("mapreduce.output.fileoutputformat.compress.type"))
     }
 
+    // Set compression by priority
+    HiveOptions.getHiveWriteCompression(fileSinkConf.getTableInfo, sparkSession.sessionState.conf)
+      .foreach{ case (compression, codec) =>
+        hadoopConf.set(compression, codec)
+      }
--- End diff --

```
.foreach { case (compression, codec) =>
  hadoopConf.set(compression, codec)
}
```
[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19218#discussion_r158444388

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveOptions.scala ---
@@ -19,7 +19,16 @@ package org.apache.spark.sql.hive.execution
 
 import java.util.Locale
 
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc}
--- End diff --

Is `FileSinkDesc` still needed?
[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19218#discussion_r158444002

--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala ---
@@ -35,7 +39,7 @@ case class TestData(key: Int, value: String)
 
 case class ThreeCloumntable(key: Int, value: String, key1: String)
 
 class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
-    with SQLTestUtils {
+    with ParquetTest {
--- End diff --

Please also check whether the compression takes effect: is the size smaller than the original size without compression?
[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19218#discussion_r158443833

--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala ---
@@ -35,7 +39,7 @@ case class TestData(key: Int, value: String)
 
 case class ThreeCloumntable(key: Int, value: String, key1: String)
 
 class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
-    with SQLTestUtils {
+    with ParquetTest {
--- End diff --

This is the insert suite; we are unable to do this here. Could you create a separate suite in the current package `org.apache.spark.sql.hive`? The suite name can be `CompressionCodecSuite`.
[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19218#discussion_r158444637

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala ---
@@ -42,8 +43,15 @@ private[parquet] class ParquetOptions(
    * Acceptable values are defined in [[shortParquetCompressionCodecNames]].
    */
   val compressionCodecClassName: String = {
-    val codecName = parameters.getOrElse("compression",
-      sqlConf.parquetCompressionCodec).toLowerCase(Locale.ROOT)
+    // `compression`, `parquet.compression` (i.e., ParquetOutputFormat.COMPRESSION), and
+    // `spark.sql.parquet.compression.codec`
+    // are in order of precedence from highest to lowest.
+    val parquetCompressionConf = parameters.get(ParquetOutputFormat.COMPRESSION)
+    val codecName = parameters
+      .get("compression")
+      .orElse(parquetCompressionConf)
--- End diff --

Is this new? Did we support `parquet.compression` before this PR?
[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19218#discussion_r158444115

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveOptions.scala ---
@@ -102,4 +111,18 @@ object HiveOptions {
     "collectionDelim" -> "colelction.delim",
     "mapkeyDelim" -> "mapkey.delim",
     "lineDelim" -> "line.delim").map { case (k, v) => k.toLowerCase(Locale.ROOT) -> v }
+
+  def getHiveWriteCompression(tableInfo: TableDesc, sqlConf: SQLConf): Option[(String, String)] = {
+    tableInfo.getOutputFileFormatClassName.toLowerCase match {
+      case formatName if formatName.endsWith("parquetoutputformat") =>
+        val compressionCodec = new ParquetOptions(tableInfo.getProperties.asScala.toMap,
+          sqlConf).compressionCodecClassName
--- End diff --

We normally do not split the code like this. We prefer the following way:

```Scala
val tableProps = tableInfo.getProperties.asScala.toMap
tableInfo.getOutputFileFormatClassName.toLowerCase match {
  case formatName if formatName.endsWith("parquetoutputformat") =>
    val compressionCodec = new ParquetOptions(tableProps, sqlConf).compressionCodecClassName
    Option((ParquetOutputFormat.COMPRESSION, compressionCodec))
  ...
```
[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19218#discussion_r158443604

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveOptions.scala ---
@@ -102,4 +111,18 @@ object HiveOptions {
     "collectionDelim" -> "colelction.delim",
     "mapkeyDelim" -> "mapkey.delim",
     "lineDelim" -> "line.delim").map { case (k, v) => k.toLowerCase(Locale.ROOT) -> v }
+
+  def getHiveWriteCompression(tableInfo: TableDesc, sqlConf: SQLConf): Option[(String, String)] = {
+    tableInfo.getOutputFileFormatClassName.toLowerCase match {
+      case formatName if formatName.endsWith("parquetoutputformat") =>
+        val compressionCodec = new ParquetOptions(tableInfo.getProperties.asScala.toMap,
+          sqlConf).compressionCodecClassName
+        Option((ParquetOutputFormat.COMPRESSION, compressionCodec))
+      case formatName if formatName.endsWith("orcoutputformat") =>
+        val compressionCodec = new OrcOptions(tableInfo.getProperties.asScala.toMap,
+          sqlConf).compressionCodec
--- End diff --

Also update `OrcOptions`'s `compressionCodec` to `compressionCodecClassName`.
[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/19218#discussion_r157481061

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala ---
@@ -68,6 +68,30 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
         .get("mapreduce.output.fileoutputformat.compress.type"))
     }
 
+    fileSinkConf.tableInfo.getOutputFileFormatClassName match {
+      case formatName if formatName.toLowerCase.endsWith("parquetoutputformat") =>
+        val compressionConf = "parquet.compression"
+        val compressionCodec = getCompressionByPriority(
+          fileSinkConf,
+          compressionConf,
+          default = sparkSession.sessionState.conf.parquetCompressionCodec) match {
+          case "NONE" => "UNCOMPRESSED"
+          case _@x => x
+        }
+        hadoopConf.set(compressionConf, compressionCodec)
+      case formatName if formatName.endsWith("OrcOutputFormat") =>
--- End diff --

`case formatName if formatName.toLowerCase.endsWith("orcoutputformat") =>`? Or, if you write `fileSinkConf.tableInfo.getOutputFileFormatClassName.toLowerCase match {`, then each case does not need its own lower-case conversion.
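A self-contained sketch of the second suggestion, lower-casing the class name once so each case matches an already-normalized string (the helper name is hypothetical):

```scala
import java.util.Locale

// Lower-case the output-format class name once; every case then matches
// against an already-normalized string, so no per-case conversion is needed.
def formatKind(outputFormatClassName: String): Option[String] =
  outputFormatClassName.toLowerCase(Locale.ROOT) match {
    case name if name.endsWith("parquetoutputformat") => Some("parquet")
    case name if name.endsWith("orcoutputformat") => Some("orc")
    case _ => None
  }
```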
[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/19218#discussion_r157478781

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala ---
@@ -86,6 +110,19 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
       options = Map.empty)
   }
 
+  // Because compression configurations can come in a variety of ways,
+  // we choose the compression configuration in this order:
+  // For parquet: `compression` > `parquet.compression` > `spark.sql.parquet.compression.codec`
+  // For orc: `compression` > `orc.compress` > `spark.sql.orc.compression.codec`
--- End diff --

Is it okay to describe this priority in the Spark documentation or somewhere? https://spark.apache.org/docs/latest/sql-programming-guide.html#configuration
[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19218#discussion_r154509645

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala ---
@@ -68,6 +68,30 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
         .get("mapreduce.output.fileoutputformat.compress.type"))
     }
 
+    fileSinkConf.tableInfo.getOutputFileFormatClassName match {
+      case formatName if formatName.toLowerCase.endsWith("parquetoutputformat") =>
+        val compressionConf = "parquet.compression"
+        val compressionCodec = getCompressionByPriority(
+          fileSinkConf,
+          compressionConf,
+          default = sparkSession.sessionState.conf.parquetCompressionCodec) match {
+          case "NONE" => "UNCOMPRESSED"
+          case _@x => x
+        }
+        hadoopConf.set(compressionConf, compressionCodec)
+      case formatName if formatName.endsWith("OrcOutputFormat") =>
+        val compressionConf = "orc.compress"
--- End diff --

-> `OrcRelation.ORC_COMPRESSION`
[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19218#discussion_r154509729

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala ---
@@ -68,6 +68,30 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
         .get("mapreduce.output.fileoutputformat.compress.type"))
     }
 
+    fileSinkConf.tableInfo.getOutputFileFormatClassName match {
+      case formatName if formatName.toLowerCase.endsWith("parquetoutputformat") =>
+        val compressionConf = "parquet.compression"
+        val compressionCodec = getCompressionByPriority(
+          fileSinkConf,
+          compressionConf,
+          default = sparkSession.sessionState.conf.parquetCompressionCodec) match {
+          case "NONE" => "UNCOMPRESSED"
+          case _@x => x
+        }
+        hadoopConf.set(compressionConf, compressionCodec)
+      case formatName if formatName.endsWith("OrcOutputFormat") =>
+        val compressionConf = "orc.compress"
+        val compressionCodec = getCompressionByPriority(
+          fileSinkConf,
+          compressionConf,
+          default = sparkSession.sessionState.conf.orcCompressionCodec) match {
+          case "UNCOMPRESSED" => "NONE"
+          case _@x => x
--- End diff --

`case o => o`
[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19218#discussion_r154509719

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala ---
@@ -68,6 +68,30 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
         .get("mapreduce.output.fileoutputformat.compress.type"))
     }
 
+    fileSinkConf.tableInfo.getOutputFileFormatClassName match {
+      case formatName if formatName.toLowerCase.endsWith("parquetoutputformat") =>
+        val compressionConf = "parquet.compression"
+        val compressionCodec = getCompressionByPriority(
+          fileSinkConf,
+          compressionConf,
+          default = sparkSession.sessionState.conf.parquetCompressionCodec) match {
+          case "NONE" => "UNCOMPRESSED"
+          case _@x => x
+        }
+        hadoopConf.set(compressionConf, compressionCodec)
+      case formatName if formatName.endsWith("OrcOutputFormat") =>
+        val compressionConf = "orc.compress"
+        val compressionCodec = getCompressionByPriority(
+          fileSinkConf,
+          compressionConf,
+          default = sparkSession.sessionState.conf.orcCompressionCodec) match {
+          case "UNCOMPRESSED" => "NONE"
+          case _@x => x
+        }
--- End diff --

Move the whole determination logic to `object HiveOptions`. You can call it from `SaveAsHiveFile.scala`.
[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19218#discussion_r154509638

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala ---
@@ -68,6 +68,30 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
         .get("mapreduce.output.fileoutputformat.compress.type"))
     }
 
+    fileSinkConf.tableInfo.getOutputFileFormatClassName match {
+      case formatName if formatName.toLowerCase.endsWith("parquetoutputformat") =>
+        val compressionConf = "parquet.compression"
--- End diff --

`"parquet.compression"` -> `ParquetOutputFormat.COMPRESSION`
[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19218#discussion_r154509749

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala ---
@@ -68,6 +68,30 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
         .get("mapreduce.output.fileoutputformat.compress.type"))
     }
 
+    fileSinkConf.tableInfo.getOutputFileFormatClassName match {
+      case formatName if formatName.toLowerCase.endsWith("parquetoutputformat") =>
+        val compressionConf = "parquet.compression"
+        val compressionCodec = getCompressionByPriority(
+          fileSinkConf,
+          compressionConf,
+          default = sparkSession.sessionState.conf.parquetCompressionCodec) match {
+          case "NONE" => "UNCOMPRESSED"
+          case _@x => x
--- End diff --

The same here.
[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19218#discussion_r154509801

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala ---
@@ -68,6 +68,30 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
         .get("mapreduce.output.fileoutputformat.compress.type"))
     }
 
+    fileSinkConf.tableInfo.getOutputFileFormatClassName match {
+      case formatName if formatName.toLowerCase.endsWith("parquetoutputformat") =>
+        val compressionConf = "parquet.compression"
+        val compressionCodec = getCompressionByPriority(
+          fileSinkConf,
+          compressionConf,
+          default = sparkSession.sessionState.conf.parquetCompressionCodec) match {
+          case "NONE" => "UNCOMPRESSED"
+          case _@x => x
+        }
+        hadoopConf.set(compressionConf, compressionCodec)
+      case formatName if formatName.endsWith("OrcOutputFormat") =>
+        val compressionConf = "orc.compress"
+        val compressionCodec = getCompressionByPriority(
+          fileSinkConf,
+          compressionConf,
+          default = sparkSession.sessionState.conf.orcCompressionCodec) match {
--- End diff --

I suggest adding normalization logic for both ORC and Parquet. Check `ParquetOptions.shortParquetCompressionCodecNames`.
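A sketch of such normalization, modeled on `shortParquetCompressionCodecNames` (the map entries are illustrative, not the exact set Spark supports):

```scala
import java.util.Locale

// Map user-facing codec names (in any case) to the canonical names Parquet expects.
val shortCodecNames = Map(
  "none" -> "UNCOMPRESSED",
  "uncompressed" -> "UNCOMPRESSED",
  "snappy" -> "SNAPPY",
  "gzip" -> "GZIP",
  "lzo" -> "LZO")

def normalize(codecName: String): String =
  shortCodecNames.getOrElse(
    codecName.toLowerCase(Locale.ROOT),
    throw new IllegalArgumentException(s"Codec [$codecName] is not supported"))
```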
[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19218#discussion_r154509747

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala ---
@@ -68,6 +68,30 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
         .get("mapreduce.output.fileoutputformat.compress.type"))
     }
 
+    fileSinkConf.tableInfo.getOutputFileFormatClassName match {
+      case formatName if formatName.toLowerCase.endsWith("parquetoutputformat") =>
+        val compressionConf = "parquet.compression"
+        val compressionCodec = getCompressionByPriority(
+          fileSinkConf,
+          compressionConf,
+          default = sparkSession.sessionState.conf.parquetCompressionCodec) match {
+          case "NONE" => "UNCOMPRESSED"
+          case _@x => x
+        }
+        hadoopConf.set(compressionConf, compressionCodec)
+      case formatName if formatName.endsWith("OrcOutputFormat") =>
+        val compressionConf = "orc.compress"
+        val compressionCodec = getCompressionByPriority(
+          fileSinkConf,
+          compressionConf,
+          default = sparkSession.sessionState.conf.orcCompressionCodec) match {
+          case "UNCOMPRESSED" => "NONE"
--- End diff --

Why do we assume it is always upper case? This looks buggy.
[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...
Github user fjh100456 commented on a diff in the pull request: https://github.com/apache/spark/pull/19218#discussion_r144202674

--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala ---
@@ -728,4 +732,254 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
     assert(e.contains("mismatched input 'ROW'"))
   }
 }
+
+  private def getConvertMetastoreConfName(format: String): String = format match {
+    case "parquet" => "spark.sql.hive.convertMetastoreParquet"
+    case "orc" => "spark.sql.hive.convertMetastoreOrc"
+  }
+
+  private def getSparkCompressionConfName(format: String): String = format match {
+    case "parquet" => "spark.sql.parquet.compression.codec"
+    case "orc" => "spark.sql.orc.compression.codec"
+  }
+
+  private def getTableCompressPropName(format: String): String = {
+    format.toLowerCase match {
+      case "parquet" => "parquet.compression"
+      case "orc" => "orc.compress"
+    }
+  }
+
+  private def getTableCompressionCodec(path: String, format: String): String = {
--- End diff --

Would changing it to `getHiveCompressPropName` be appropriate?
[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/19218#discussion_r144187454

--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala ---
@@ -728,4 +732,254 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
     assert(e.contains("mismatched input 'ROW'"))
   }
 }
+
+  private def getConvertMetastoreConfName(format: String): String = format match {
+    case "parquet" => "spark.sql.hive.convertMetastoreParquet"
+    case "orc" => "spark.sql.hive.convertMetastoreOrc"
+  }
+
+  private def getSparkCompressionConfName(format: String): String = format match {
+    case "parquet" => "spark.sql.parquet.compression.codec"
+    case "orc" => "spark.sql.orc.compression.codec"
+  }
+
+  private def getTableCompressPropName(format: String): String = {
+    format.toLowerCase match {
+      case "parquet" => "parquet.compression"
+      case "orc" => "orc.compress"
+    }
+  }
+
+  private def getTableCompressionCodec(path: String, format: String): String = {
+    val hadoopConf = spark.sessionState.newHadoopConf()
+    val codecs = format match {
+      case "parquet" => for {
+        footer <- readAllFootersWithoutSummaryFiles(new Path(path), hadoopConf)
+        block <- footer.getParquetMetadata.getBlocks.asScala
+        column <- block.getColumns.asScala
+      } yield column.getCodec.name()
+      case "orc" => new File(path).listFiles().filter { file =>
+        file.isFile && !file.getName.endsWith(".crc") && file.getName != "_SUCCESS"
+      }.map { orcFile =>
+        OrcFileOperator.getFileReader(orcFile.toPath.toString).get.getCompression.toString
+      }.toSeq
+    }
+
+    assert(codecs.distinct.length == 1)
+    codecs.head
+  }
+
+  private def writeDataToTable(
+      rootDir: File,
+      tableName: String,
+      isPartitioned: Boolean,
+      format: String,
+      compressionCodec: Option[String]) {
+    val tblProperties = compressionCodec match {
+      case Some(prop) => s"TBLPROPERTIES('${getTableCompressPropName(format)}'='$prop')"
+      case _ => ""
+    }
+    val partitionCreate = if (isPartitioned) "PARTITIONED BY (p int)" else ""
+    sql(
+      s"""
+         |CREATE TABLE $tableName(a int)
+         |$partitionCreate
+         |STORED AS $format
+         |LOCATION '${rootDir.toURI.toString.stripSuffix("/")}/$tableName'
+         |$tblProperties
+      """.stripMargin)
+
+    val partitionInsert = if (isPartitioned) s"partition (p=1)" else ""
+    sql(
+      s"""
+         |INSERT OVERWRITE TABLE $tableName
+         |$partitionInsert
+         |SELECT * from table_source
--- End diff --

nit. `from` -> `FROM`
[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/19218#discussion_r144187309

--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala ---
@@ -728,4 +732,254 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
     assert(e.contains("mismatched input 'ROW'"))
   }
 }
+
+  private def getConvertMetastoreConfName(format: String): String = format match {
+    case "parquet" => "spark.sql.hive.convertMetastoreParquet"
+    case "orc" => "spark.sql.hive.convertMetastoreOrc"
+  }
+
+  private def getSparkCompressionConfName(format: String): String = format match {
+    case "parquet" => "spark.sql.parquet.compression.codec"
+    case "orc" => "spark.sql.orc.compression.codec"
+  }
+
+  private def getTableCompressPropName(format: String): String = {
+    format.toLowerCase match {
+      case "parquet" => "parquet.compression"
+      case "orc" => "orc.compress"
+    }
+  }
+
+  private def getTableCompressionCodec(path: String, format: String): String = {
--- End diff --

The logic reads the compression codec from the files, so the `getTable` prefix looks misleading to me.
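For reference, a standalone sketch of reading the codec back from Parquet files, independent of the test helpers (the path is hypothetical, and `readFooter` is the parquet-hadoop API of this era):

```scala
import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader

// Inspect the codec actually used by every column chunk of one output file.
val footer = ParquetFileReader.readFooter(
  new Configuration(), new Path("/tmp/tbl_parquet/part-00000.parquet")) // hypothetical path
val codecs = for {
  block <- footer.getBlocks.asScala
  column <- block.getColumns.asScala
} yield column.getCodec.name()
assert(codecs.distinct.length == 1) // every chunk should share the same codec
```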
[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/19218#discussion_r144187101

--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala ---
@@ -728,4 +732,254 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
     assert(e.contains("mismatched input 'ROW'"))
   }
 }
+
+  private def getConvertMetastoreConfName(format: String): String = format match {
+    case "parquet" => "spark.sql.hive.convertMetastoreParquet"
+    case "orc" => "spark.sql.hive.convertMetastoreOrc"
+  }
+
+  private def getSparkCompressionConfName(format: String): String = format match {
+    case "parquet" => "spark.sql.parquet.compression.codec"
+    case "orc" => "spark.sql.orc.compression.codec"
--- End diff --

Here, too.
- `SQLConf.PARQUET_COMPRESSION.key` instead of "spark.sql.parquet.compression.codec"
- `SQLConf.ORC_COMPRESSION.key` instead of "spark.sql.orc.compression.codec"
[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/19218#discussion_r144186944

--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala ---
@@ -728,4 +732,254 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
     assert(e.contains("mismatched input 'ROW'"))
   }
 }
+
+  private def getConvertMetastoreConfName(format: String): String = format match {
+    case "parquet" => "spark.sql.hive.convertMetastoreParquet"
+    case "orc" => "spark.sql.hive.convertMetastoreOrc"
--- End diff --

Could you use the keys?
- `HiveUtils.CONVERT_METASTORE_PARQUET.key` instead of "spark.sql.hive.convertMetastoreParquet"
- `HiveUtils.CONVERT_METASTORE_ORC.key` instead of "spark.sql.hive.convertMetastoreOrc"
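A sketch of the suggested style, assuming an active `spark` session; `SQLConf.PARQUET_COMPRESSION` and `HiveUtils.CONVERT_METASTORE_PARQUET` are the constants behind these keys:

```scala
import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.sql.internal.SQLConf

// Reference the conf keys through their constants instead of raw string
// literals, so a renamed conf breaks at compile time rather than in tests.
spark.conf.set(SQLConf.PARQUET_COMPRESSION.key, "gzip")
spark.conf.set(HiveUtils.CONVERT_METASTORE_PARQUET.key, "true")
```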
[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...
Github user fjh100456 commented on a diff in the pull request: https://github.com/apache/spark/pull/19218#discussion_r143624224

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala ---
@@ -68,6 +68,26 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
         .get("mapreduce.output.fileoutputformat.compress.type"))
     }
 
+    fileSinkConf.tableInfo.getOutputFileFormatClassName match {
+      case formatName if formatName.endsWith("ParquetOutputFormat") =>
+        val compressionConf = "parquet.compression"
+        val compressionCodec = getCompressionByPriority(fileSinkConf, compressionConf,
+          sparkSession.sessionState.conf.parquetCompressionCodec) match {
+          case "NONE" => "UNCOMPRESSED"
+          case _@x => x
+        }
+        hadoopConf.set(compressionConf, compressionCodec)
+      case formatName if formatName.endsWith("OrcOutputFormat") =>
+        val compressionConf = "orc.compress"
+        val compressionCodec = getCompressionByPriority(fileSinkConf, compressionConf,
+          sparkSession.sessionState.conf.orcCompressionCodec) match {
+          case "UNCOMPRESSED" => "NONE"
--- End diff --

Yes, they are different: both the parameter names and the parameter values differ in style. That is a Parquet-versus-ORC discrepancy rather than something introduced here.
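For reference, a sketch of the naming mismatch being bridged here; only the two mapping directions come from the diff above, the helper names are illustrative:

```scala
// Parquet names the no-compression codec "UNCOMPRESSED"; ORC names it "NONE".
// Writes therefore translate between the two spellings at the format boundary.
def toParquetCodecName(codec: String): String =
  if (codec.equalsIgnoreCase("NONE")) "UNCOMPRESSED" else codec

def toOrcCodecName(codec: String): String =
  if (codec.equalsIgnoreCase("UNCOMPRESSED")) "NONE" else codec
```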
[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...
Github user fjh100456 commented on a diff in the pull request: https://github.com/apache/spark/pull/19218#discussion_r143624210

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala ---
@@ -68,6 +68,26 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
         .get("mapreduce.output.fileoutputformat.compress.type"))
     }
 
+    fileSinkConf.tableInfo.getOutputFileFormatClassName match {
+      case formatName if formatName.endsWith("ParquetOutputFormat") =>
+        val compressionConf = "parquet.compression"
+        val compressionCodec = getCompressionByPriority(fileSinkConf, compressionConf,
+          sparkSession.sessionState.conf.parquetCompressionCodec) match {
+          case "NONE" => "UNCOMPRESSED"
+          case _@x => x
+        }
+        hadoopConf.set(compressionConf, compressionCodec)
+      case formatName if formatName.endsWith("OrcOutputFormat") =>
+        val compressionConf = "orc.compress"
+        val compressionCodec = getCompressionByPriority(fileSinkConf, compressionConf,
+          sparkSession.sessionState.conf.orcCompressionCodec) match {
+          case "UNCOMPRESSED" => "NONE"
+          case _@x => x
--- End diff --

In fact, the following process will check the correctness of this value, and because `OrcOptions` is not accessible here, I have to add the "uncompressed" => "NONE" conversion. Do you have any good advice?
[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...
Github user fjh100456 commented on a diff in the pull request: https://github.com/apache/spark/pull/19218#discussion_r143624196

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala ---
@@ -68,6 +68,26 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
         .get("mapreduce.output.fileoutputformat.compress.type"))
     }
 
+    fileSinkConf.tableInfo.getOutputFileFormatClassName match {
+      case formatName if formatName.endsWith("ParquetOutputFormat") =>
+        val compressionConf = "parquet.compression"
+        val compressionCodec = getCompressionByPriority(fileSinkConf, compressionConf,
+          sparkSession.sessionState.conf.parquetCompressionCodec) match {
--- End diff --

`compressionConf` will be used below; I've adjusted the format, thanks.
[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...
Github user fjh100456 commented on a diff in the pull request: https://github.com/apache/spark/pull/19218#discussion_r143624181

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala ---
@@ -68,6 +68,26 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
         .get("mapreduce.output.fileoutputformat.compress.type"))
     }
 
+    fileSinkConf.tableInfo.getOutputFileFormatClassName match {
+      case formatName if formatName.endsWith("ParquetOutputFormat") =>
--- End diff --

Sounds like a good idea.
[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19218#discussion_r142554065

--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala ---
@@ -728,4 +732,195 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
     assert(e.contains("mismatched input 'ROW'"))
   }
 }
+
+  test("[SPARK-21786] Check 'spark.sql.parquet.compression.codec' " +
--- End diff --

Could you split the whole test case into multiple independent smaller unit test cases?
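One way to do that split, sketched with a plain ScalaTest harness (Spark's own suites extend `SparkFunSuite`; the codec list and test body are placeholders):

```scala
import org.scalatest.funsuite.AnyFunSuite

class CompressionCodecSuiteSketch extends AnyFunSuite {
  // Generate one small, independent test case per codec instead of a
  // single monolithic test that loops over every combination internally.
  Seq("UNCOMPRESSED", "SNAPPY", "GZIP").foreach { codec =>
    test(s"hive table writing honors parquet codec $codec") {
      // exercise the write path with `codec` and assert on the output files here
    }
  }
}
```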
[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19218#discussion_r142553845

--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala ---
@@ -728,4 +732,195 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
     assert(e.contains("mismatched input 'ROW'"))
   }
 }
+
+  test("[SPARK-21786] Check 'spark.sql.parquet.compression.codec' " +
+    "and 'spark.sql.orc.compression.codec' taking effect on hive table writing") {
+
+    val hadoopConf = spark.sessionState.newHadoopConf()
+
+    val partitionStr = "p=1"
+
+    case class TableCompressionConf(name: String, codeC: String)
+
+    case class TableDefine(tableName: String, isPartitioned: Boolean, format: String,
+        compressionConf: Option[TableCompressionConf]) {
+      def createTable(rootDir: File): Unit = {
+        val compression = compressionConf.map(cf => s"'${cf.name}'='${cf.codeC}'")
+        sql(
+          s"""
+             |CREATE TABLE $tableName(a int)
+             |${if (isPartitioned) "PARTITIONED BY (p int)" else ""}
+             |STORED AS $format
+             |LOCATION '${rootDir.toURI.toString.stripSuffix("/")}/$tableName'
+             |${if (compressionConf.nonEmpty) s"TBLPROPERTIES(${compression.get})" else ""}
+          """.stripMargin)
+      }
+
+      def insertOverwriteTable(): Unit = {
+        sql(
+          s"""
+             |INSERT OVERWRITE TABLE $tableName
+             |${if (isPartitioned) s"partition ($partitionStr)" else ""}
+             |SELECT * from table_source
+          """.stripMargin)
+      }
+    }
+
+    def getTableCompressionCodec(path: String, format: String): String = {
+      val codecs = format match {
+        case "parquet" => for {
+          footer <- readAllFootersWithoutSummaryFiles(new Path(path), hadoopConf)
+          block <- footer.getParquetMetadata.getBlocks.asScala
+          column <- block.getColumns.asScala
+        } yield column.getCodec.name()
+        case "orc" => new File(path).listFiles().filter { file =>
+          file.isFile && !file.getName.endsWith(".crc") && file.getName != "_SUCCESS"
+        }.map { orcFile =>
+          OrcFileOperator.getFileReader(orcFile.toPath.toString).get.getCompression.toString
+        }.toSeq
+      }
+
+      assert(codecs.distinct.length == 1)
+      codecs.head
+    }
+
+    def checkCompressionCodecForTable(format: String, isPartitioned: Boolean,
+        compressionConf: Option[TableCompressionConf])(assertion: String => Unit): Unit = {
+      val table = TableDefine(s"tbl_$format${isPartitioned}",
+        isPartitioned, format, compressionConf)
+      withTempDir { tmpDir =>
+        withTable(table.tableName) {
+          table.createTable(tmpDir)
+          table.insertOverwriteTable()
+          val partition = if (table.isPartitioned) partitionStr else ""
+          val path = s"${tmpDir.getPath.stripSuffix("/")}/${table.tableName}/$partition"
+          assertion(getTableCompressionCodec(path, table.format))
+        }
+      }
+    }
+
+    def getConvertMetastoreConfName(format: String): String = format match {
+      case "parquet" => "spark.sql.hive.convertMetastoreParquet"
+      case "orc" => "spark.sql.hive.convertMetastoreOrc"
+    }
+
+    def getSparkCompressionConfName(format: String): String = format match {
+      case "parquet" => "spark.sql.parquet.compression.codec"
+      case "orc" => "spark.sql.orc.compression.codec"
+    }
+
+    def checkTableCompressionCodecForCodecs(format: String, isPartitioned: Boolean,
+        convertMetastore: Boolean, compressionCodecs: List[String],
+        tableCompressionConf: List[TableCompressionConf])
--- End diff --

Could you update the indents for all of them in this PR? See the link: https://github.com/databricks/scala-style-guide#indent
[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19218#discussion_r142553648

--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala ---
@@ -728,4 +732,195 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
     assert(e.contains("mismatched input 'ROW'"))
   }
 }
+
+  test("[SPARK-21786] Check 'spark.sql.parquet.compression.codec' " +
+    "and 'spark.sql.orc.compression.codec' taking effect on hive table writing") {
+
+    val hadoopConf = spark.sessionState.newHadoopConf()
+
+    val partitionStr = "p=1"
+
+    case class TableCompressionConf(name: String, codeC: String)
+
+    case class TableDefine(tableName: String, isPartitioned: Boolean, format: String,
--- End diff --

Use a function?
[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19218#discussion_r142553517

--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala ---
@@ -728,4 +732,195 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
     assert(e.contains("mismatched input 'ROW'"))
   }
 }
+
+  test("[SPARK-21786] Check 'spark.sql.parquet.compression.codec' " +
+    "and 'spark.sql.orc.compression.codec' taking effect on hive table writing") {
+
+    val hadoopConf = spark.sessionState.newHadoopConf()
+
+    val partitionStr = "p=1"
+
+    case class TableCompressionConf(name: String, codeC: String)
+
+    case class TableDefine(tableName: String, isPartitioned: Boolean, format: String,
+        compressionConf: Option[TableCompressionConf]) {
+      def createTable(rootDir: File): Unit = {
+        val compression = compressionConf.map(cf => s"'${cf.name}'='${cf.codeC}'")
+        sql(
+          s"""
+             |CREATE TABLE $tableName(a int)
+             |${if (isPartitioned) "PARTITIONED BY (p int)" else ""}
--- End diff --

Please do not embed it; just create a parameter above this.
[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19218#discussion_r142553387

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala ---
@@ -68,6 +68,26 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
         .get("mapreduce.output.fileoutputformat.compress.type"))
     }
 
+    fileSinkConf.tableInfo.getOutputFileFormatClassName match {
+      case formatName if formatName.endsWith("ParquetOutputFormat") =>
+        val compressionConf = "parquet.compression"
+        val compressionCodec = getCompressionByPriority(fileSinkConf, compressionConf,
+          sparkSession.sessionState.conf.parquetCompressionCodec) match {
+          case "NONE" => "UNCOMPRESSED"
+          case _@x => x
+        }
+        hadoopConf.set(compressionConf, compressionCodec)
+      case formatName if formatName.endsWith("OrcOutputFormat") =>
+        val compressionConf = "orc.compress"
+        val compressionCodec = getCompressionByPriority(fileSinkConf, compressionConf,
+          sparkSession.sessionState.conf.orcCompressionCodec) match {
+          case "UNCOMPRESSED" => "NONE"
--- End diff --

Why are ORC and Parquet different?
[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19218#discussion_r142553277

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala ---
@@ -68,6 +68,26 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
         .get("mapreduce.output.fileoutputformat.compress.type"))
     }
 
+    fileSinkConf.tableInfo.getOutputFileFormatClassName match {
+      case formatName if formatName.endsWith("ParquetOutputFormat") =>
+        val compressionConf = "parquet.compression"
+        val compressionCodec = getCompressionByPriority(fileSinkConf, compressionConf,
+          sparkSession.sessionState.conf.parquetCompressionCodec) match {
+          case "NONE" => "UNCOMPRESSED"
+          case _@x => x
+        }
+        hadoopConf.set(compressionConf, compressionCodec)
+      case formatName if formatName.endsWith("OrcOutputFormat") =>
+        val compressionConf = "orc.compress"
+        val compressionCodec = getCompressionByPriority(fileSinkConf, compressionConf,
+          sparkSession.sessionState.conf.orcCompressionCodec) match {
+          case "UNCOMPRESSED" => "NONE"
+          case _@x => x
--- End diff --

`case x => x`?
[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19218#discussion_r142553192

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala ---
@@ -68,6 +68,26 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
         .get("mapreduce.output.fileoutputformat.compress.type"))
     }
 
+    fileSinkConf.tableInfo.getOutputFileFormatClassName match {
+      case formatName if formatName.endsWith("ParquetOutputFormat") =>
+        val compressionConf = "parquet.compression"
+        val compressionCodec = getCompressionByPriority(fileSinkConf, compressionConf,
+          sparkSession.sessionState.conf.parquetCompressionCodec) match {
--- End diff --

```Scala
val compressionCodec = getCompressionByPriority(
  fileSinkConf,
  compressionConf = "parquet.compression",
  default = sparkSession.sessionState.conf.parquetCompressionCodec) match {
```
[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19218#discussion_r142552754

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala ---
@@ -68,6 +68,26 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
         .get("mapreduce.output.fileoutputformat.compress.type"))
     }
 
+    fileSinkConf.tableInfo.getOutputFileFormatClassName match {
+      case formatName if formatName.endsWith("ParquetOutputFormat") =>
--- End diff --

Is this case sensitive? Should we convert it to lower or upper case for the string comparison?
[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19218#discussion_r142552658

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala ---
@@ -86,6 +106,14 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
       options = Map.empty)
   }
 
+  private def getCompressionByPriority(fileSinkConf: FileSinkDesc,
+      compressionConf: String, default: String): String = {
--- End diff --

Could you add a description to explain the priority sequence?
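A sketch of such a description attached to a simplified signature; a `Map` stands in for `FileSinkDesc`, so this is illustrative rather than the PR's exact code:

```scala
/**
 * Returns the compression codec to use, chosen in order of precedence:
 *  1. the `compression` table property, if set;
 *  2. the format-specific property named by `compressionConf`
 *     ("parquet.compression" or "orc.compress"), if set;
 *  3. the session-level `default`.
 */
def getCompressionByPriority(
    tableProps: Map[String, String],
    compressionConf: String,
    default: String): String = {
  tableProps.get("compression")
    .orElse(tableProps.get(compressionConf))
    .getOrElse(default)
}
```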
[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19218#discussion_r142552413

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala ---
@@ -86,6 +106,14 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
       options = Map.empty)
   }
 
+  private def getCompressionByPriority(fileSinkConf: FileSinkDesc,
+      compressionConf: String, default: String): String = {
--- End diff --

```Scala
private def getCompressionByPriority(
    fileSinkConf: FileSinkDesc,
    compressionConf: String,
    default: String): String = {
```
[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/19218#discussion_r141188386

--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala ---
@@ -728,4 +728,120 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
     assert(e.contains("mismatched input 'ROW'"))
     }
   }
+
+  test("[SPARK-21786] Check 'spark.sql.parquet.compression.codec' " +
+    "and 'spark.sql.parquet.compression.codec' taking effect on hive table writing") {
+    case class CompressionConf(name: String, codeC: String)
+
+    case class TableDefine(tableName: String, isPartitioned: Boolean, format: String,
+        compressionConf: Option[CompressionConf]) {
+      def createTable(rootDir: File): Unit = {
+        val compression = compressionConf.map(cf => s"'${cf.name}'='${cf.codeC}'")
+        sql(
+          s"""
+            |CREATE TABLE $tableName(a int)
+            |${ if (isPartitioned) "PARTITIONED BY (p int)" else "" }
+            |STORED AS $format
+            |LOCATION '${rootDir.toURI.toString.stripSuffix("/")}/$tableName'
+            |${ if (compressionConf.nonEmpty) s"TBLPROPERTIES(${compression.get})" else "" }
+          """.stripMargin)
+      }
+
+      def insertOverwriteTable(): Unit = {
+        sql(
+          s"""
+            |INSERT OVERWRITE TABLE $tableName
+            |${ if (isPartitioned) "partition (p=1)" else "" }
+            |SELECT * from table_source
+          """.stripMargin)
+      }
+
+      def getDirFiles(file: File): List[File] = {
+        if (!file.exists()) Nil
+        else if (file.isFile) List(file)
+        else {
+          file.listFiles().filterNot(_.getName.startsWith(".hive-staging"))
+            .groupBy(_.isFile).flatMap {
+              case (isFile, files) if isFile => files.toList
+              case (_, dirs) => dirs.flatMap(getDirFiles)
+            }.toList
+        }
+      }
+
+      def getTableSize: Long = {
+        var totalSize = 0L
+        withTempDir { tmpDir =>
+          withTable(tableName) {
+            createTable(tmpDir)
+            insertOverwriteTable()
+            val path = s"${tmpDir.getPath.stripSuffix("/")}/$tableName"
+            val dir = new File(path)
+            val files = getDirFiles(dir).filter(_.getName.startsWith("part-"))
+            totalSize = files.map(_.length()).sum
+          }
+        }
+        totalSize
+      }
+    }
+
+    def checkParquetCompressionCodec(isPartitioned: Boolean, tableCodec: String,
+        sessionCodec: String, f: (Long, Long) => Boolean = _ == _): Unit = {
+      val tableOrg = TableDefine(s"tbl_parquet$tableCodec", isPartitioned, "parquet",
+        Some(CompressionConf("parquet.compression", tableCodec)))
+      val tableOrgSize = tableOrg.getTableSize
+
+      withSQLConf("spark.sql.parquet.compression.codec" -> sessionCodec) {
+        // priority check, when table-level compression conf was set, expecting
+        // table-level compression conf is not affected by the session conf, and table-level
+        // compression conf takes precedence even the two conf of codec is different
+        val tableOrgSessionConfSize = tableOrg.getTableSize
+        assert(tableOrgSize == tableOrgSessionConfSize)
+
+        // check session conf of compression codec taking effect
+        val table = TableDefine(s"tbl_parquet", isPartitioned, "parquet", None)
+        assert(f(tableOrg.getTableSize, table.getTableSize))
+      }
+    }
+
+    def checkOrcCompressionCodec(isPartitioned: Boolean, tableCodec: String,
+        sessionCodec: String, f: (Long, Long) => Boolean = _ == _): Unit = {
+      val tableOrg = TableDefine(s"tbl_orc$tableCodec", isPartitioned, "orc",
+        Some(CompressionConf("orc.compress", tableCodec)))
+      val tableOrgSize = tableOrg.getTableSize
+
+      withSQLConf("spark.sql.orc.compression.codec" -> sessionCodec) {
+        // priority check, when table-level compression conf was set, expecting
+        // table-level compression conf is not affected by the session conf, and table-level
+        // compression conf takes precedence even the two conf of codec is different
+        val tableOrgSessionConfSize = tableOrg.getTableSize
+        assert(tableOrgSize == tableOrgSessionConfSize)
+
+        // check session conf of compression codec taking effect
+        val table = TableDefine(s"tbl_orc", isPartitioned, "orc", None)
+        assert(f(tableOrg.getTableSize, table.getTableSize))
+      }
+    }
+
+    withTempView("table_source") {
+      (0 until
[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/19218#discussion_r141187890

--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala ---
@@ -728,4 +728,120 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
     assert(e.contains("mismatched input 'ROW'"))
     }
   }
+
+  test("[SPARK-21786] Check 'spark.sql.parquet.compression.codec' " +
+    "and 'spark.sql.parquet.compression.codec' taking effect on hive table writing") {
--- End diff --

- `[SPARK-21786]` -> `SPARK-21786`
- `spark.sql.parquet.compression.codec` -> `spark.sql.orc.compression.codec`?

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/19218#discussion_r141186378

--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala ---
@@ -728,4 +728,120 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
+        // check session conf of compression codec taking effect
+        val table = TableDefine(s"tbl_orc", isPartitioned, "orc", None)
+        assert(f(tableOrg.getTableSize, table.getTableSize))
--- End diff --

You may want to check the codec explicitly like
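The quoted suggestion trails off in the archive. One plausible shape of an explicit codec check for Parquet output, assuming the footer API available in Spark of this era, reads the codec out of a written file instead of comparing file sizes:

```Scala
import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader

// Sketch: read the codec recorded in a Parquet file's footer metadata.
// readFooter is deprecated in later Parquet releases but present here.
def parquetCodecOf(conf: Configuration, file: Path): String = {
  val footer = ParquetFileReader.readFooter(conf, file)
  footer.getBlocks.asScala.head.getColumns.asScala.head.getCodec.name()
}
```

An assertion like `assert(parquetCodecOf(conf, partFile) == "GZIP")` would then pin down the codec directly, rather than inferring it from relative output sizes.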
[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19218#discussion_r139302763

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala ---
@@ -101,6 +101,19 @@ case class InsertIntoHiveTable(
     val tmpLocation = getExternalTmpPath(sparkSession, hadoopConf, tableLocation)
     val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
+    tableDesc.getOutputFileFormatClassName match {
--- End diff --

Move the whole logic into `saveAsHiveFile`, which is shared by `InsertIntoHiveDirCommand` and `InsertIntoHiveTable`. Both need it.

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
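A toy model of the suggested structure (names mirror Spark's but bodies are stand-ins, not the real implementation): the codec resolution sits once in the shared trait, so both commands pick it up.

```Scala
// Stand-in for the shared trait: one resolution path for both callers.
trait SaveAsHiveFile {
  def resolveCodec(tableProps: Map[String, String], sessionDefault: String): String =
    tableProps.getOrElse("parquet.compression", sessionDefault)
}

// Both insert commands mix in the trait and share the logic above.
object InsertIntoHiveTable extends SaveAsHiveFile
object InsertIntoHiveDirCommand extends SaveAsHiveFile

// e.g. InsertIntoHiveDirCommand.resolveCodec(Map.empty, "snappy") == "snappy"
```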
[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/19218#discussion_r139186318

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala ---
@@ -101,6 +101,13 @@ case class InsertIntoHiveTable(
     val tmpLocation = getExternalTmpPath(sparkSession, hadoopConf, tableLocation)
     val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
+    tableDesc.getOutputFileFormatClassName match {
+      case "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat" =>
--- End diff --

- **Parquet**: It seems that you need to consider another output format, [parquet.hive.DeprecatedParquetOutputFormat](https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala#L1173), too.
- **ORC**: We have [spark.sql.orc.compression.codec](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L320) by SPARK-21839.

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
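A hypothetical sketch of the broader dispatch the review asks for, covering the deprecated Parquet output format and ORC's session conf from SPARK-21839 (the helper name `codecConfFor` is invented for illustration):

```Scala
// Map a Hive output-format class name to the Hadoop conf key that carries
// its compression codec; None means the format is not handled here.
def codecConfFor(formatClassName: String): Option[String] = formatClassName match {
  case "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat" |
       "parquet.hive.DeprecatedParquetOutputFormat" =>
    Some("parquet.compression") // paired with spark.sql.parquet.compression.codec
  case name if name.endsWith("OrcOutputFormat") =>
    Some("orc.compress")        // paired with spark.sql.orc.compression.codec
  case _ => None
}
```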
[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...
GitHub user fjh100456 opened a pull request: https://github.com/apache/spark/pull/19218

[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' configuration doesn't take effect on tables with partition field(s)

## What changes were proposed in this pull request?

Pass the 'spark.sql.parquet.compression.codec' value to 'parquet.compression'.

## How was this patch tested?

Manual test.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/fjh100456/spark master

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19218.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19218

commit 677541b47f27fd85f44aa2e46ec44861579475a8
Author: fjh100456
Date: 2017-09-13T09:24:15Z

[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' configuration doesn't take effect on tables with partition field(s)

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
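A minimal sketch of the scenario this PR targets, assuming a Hive-enabled `SparkSession` named `spark` and a hypothetical table `tbl`:

```Scala
// Session-level codec that should govern Hive table writes.
spark.conf.set("spark.sql.parquet.compression.codec", "gzip")

spark.sql("CREATE TABLE tbl (a INT) PARTITIONED BY (p INT) STORED AS PARQUET")
spark.sql("INSERT OVERWRITE TABLE tbl PARTITION (p = 1) SELECT 1")
// Before the fix, files written under p=1 ignored the session codec; with the
// fix, the session value is forwarded to 'parquet.compression' for partitioned
// writes as well.
```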