[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2017-12-26 Thread fjh100456
Github user fjh100456 closed the pull request at:

https://github.com/apache/spark/pull/19218


---




[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2017-12-25 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19218#discussion_r158628542
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala 
---
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.io.File
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.Path
+import org.apache.orc.OrcConf.COMPRESS
+import org.apache.parquet.hadoop.ParquetOutputFormat
+
+import org.apache.spark.sql.execution.datasources.parquet.ParquetTest
+import org.apache.spark.sql.hive.orc.OrcFileOperator
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
+
+class CompressionCodecSuite extends TestHiveSingleton with ParquetTest {
+  import spark.implicits._
+
+  private val maxRecordNum = 10
--- End diff --

Could you reduce it to a smaller number? The test cases are very slow to 
run.


---




[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2017-12-22 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19218#discussion_r158575144
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
 ---
@@ -42,8 +43,15 @@ private[parquet] class ParquetOptions(
* Acceptable values are defined in [[shortParquetCompressionCodecNames]].
*/
   val compressionCodecClassName: String = {
-val codecName = parameters.getOrElse("compression",
-  sqlConf.parquetCompressionCodec).toLowerCase(Locale.ROOT)
+// `compression`, `parquet.compression` (i.e., ParquetOutputFormat.COMPRESSION), and
+// `spark.sql.parquet.compression.codec`
+// are in order of precedence from highest to lowest.
+val parquetCompressionConf = parameters.get(ParquetOutputFormat.COMPRESSION)
+val codecName = parameters
+  .get("compression")
+  .orElse(parquetCompressionConf)
--- End diff --

Yeah, we can submit a separate PR for that issue. The behavior change needs to be documented in the Spark SQL docs.
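
To make the precedence concrete, here is a minimal sketch of the resolution logic quoted above, following the diff (`parameters` and `sqlConf` are the `ParquetOptions` members shown in the quoted code):

```Scala
// Option precedence: the per-write `compression` option wins, then the
// Parquet-native `parquet.compression` table property, then the session
// default from `spark.sql.parquet.compression.codec`.
val codecName = parameters
  .get("compression")
  .orElse(parameters.get(ParquetOutputFormat.COMPRESSION))
  .getOrElse(sqlConf.parquetCompressionCodec)
  .toLowerCase(Locale.ROOT)
```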


---




[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2017-12-22 Thread fjh100456
Github user fjh100456 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19218#discussion_r158574986
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
 ---
@@ -42,8 +43,15 @@ private[parquet] class ParquetOptions(
* Acceptable values are defined in [[shortParquetCompressionCodecNames]].
*/
   val compressionCodecClassName: String = {
-val codecName = parameters.getOrElse("compression",
-  sqlConf.parquetCompressionCodec).toLowerCase(Locale.ROOT)
+// `compression`, `parquet.compression` (i.e., ParquetOutputFormat.COMPRESSION), and
+// `spark.sql.parquet.compression.codec`
+// are in order of precedence from highest to lowest.
+val parquetCompressionConf = parameters.get(ParquetOutputFormat.COMPRESSION)
+val codecName = parameters
+  .get("compression")
+  .orElse(parquetCompressionConf)
--- End diff --

If so, Parquet's table-level compression may be overwritten by this PR, and that may not be what we want.
Shall I fix it first in another PR?


---




[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2017-12-22 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19218#discussion_r158571702
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
 ---
@@ -42,8 +43,15 @@ private[parquet] class ParquetOptions(
* Acceptable values are defined in [[shortParquetCompressionCodecNames]].
*/
   val compressionCodecClassName: String = {
-val codecName = parameters.getOrElse("compression",
-  sqlConf.parquetCompressionCodec).toLowerCase(Locale.ROOT)
+// `compression`, `parquet.compression` (i.e., ParquetOutputFormat.COMPRESSION), and
+// `spark.sql.parquet.compression.codec`
+// are in order of precedence from highest to lowest.
+val parquetCompressionConf = parameters.get(ParquetOutputFormat.COMPRESSION)
+val codecName = parameters
+  .get("compression")
+  .orElse(parquetCompressionConf)
--- End diff --

Could we keep the old behavior and add this later? We do not want to mix multiple issues in the same PR.


---




[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2017-12-22 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19218#discussion_r158571657
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveOptions.scala 
---
@@ -102,4 +111,18 @@ object HiveOptions {
 "collectionDelim" -> "colelction.delim",
 "mapkeyDelim" -> "mapkey.delim",
 "lineDelim" -> "line.delim").map { case (k, v) => 
k.toLowerCase(Locale.ROOT) -> v }
+
+  def getHiveWriteCompression(tableInfo: TableDesc, sqlConf: SQLConf): 
Option[(String, String)] = {
+tableInfo.getOutputFileFormatClassName.toLowerCase match {
+  case formatName if formatName.endsWith("parquetoutputformat") =>
+val compressionCodec = new 
ParquetOptions(tableInfo.getProperties.asScala.toMap,
+  sqlConf).compressionCodecClassName
+Option((ParquetOutputFormat.COMPRESSION, compressionCodec))
+  case formatName if formatName.endsWith("orcoutputformat") =>
+val compressionCodec = new 
OrcOptions(tableInfo.getProperties.asScala.toMap,
+  sqlConf).compressionCodec
--- End diff --

Yeah. Just to make it consistent.


---




[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2017-12-22 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19218#discussion_r158571649
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala ---
@@ -35,7 +39,7 @@ case class TestData(key: Int, value: String)
 case class ThreeCloumntable(key: Int, value: String, key1: String)
 
 class InsertSuite extends QueryTest with TestHiveSingleton with 
BeforeAndAfter
-with SQLTestUtils {
+with ParquetTest {
--- End diff --

Fine to me. Thanks!


---




[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2017-12-22 Thread fjh100456
Github user fjh100456 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19218#discussion_r158492627
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala ---
@@ -35,7 +39,7 @@ case class TestData(key: Int, value: String)
 case class ThreeCloumntable(key: Int, value: String, key1: String)
 
 class InsertSuite extends QueryTest with TestHiveSingleton with 
BeforeAndAfter
-with SQLTestUtils {
+with ParquetTest {
--- End diff --

It seems a compressed table is not always smaller than an uncompressed one.
The `SNAPPY` compressed size may be bigger than the uncompressed size when the
amount of data is small, so I'd like to check that the sizes are not equal when
the compression codecs differ.
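
A sketch of the proposed assertion (the size variables are hypothetical, for illustration only):

```Scala
// With a small dataset, SNAPPY output can be larger than uncompressed output,
// so assert only that different codecs yield different sizes, not an ordering.
assert(sizeWithCodecA != sizeWithCodecB)
```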


---




[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2017-12-22 Thread fjh100456
Github user fjh100456 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19218#discussion_r158460931
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala ---
@@ -35,7 +39,7 @@ case class TestData(key: Int, value: String)
 case class ThreeCloumntable(key: Int, value: String, key1: String)
 
 class InsertSuite extends QueryTest with TestHiveSingleton with 
BeforeAndAfter
-with SQLTestUtils {
+with ParquetTest {
--- End diff --

Ok, I will do it.


---




[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2017-12-22 Thread fjh100456
Github user fjh100456 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19218#discussion_r158459550
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveOptions.scala 
---
@@ -102,4 +111,18 @@ object HiveOptions {
 "collectionDelim" -> "colelction.delim",
 "mapkeyDelim" -> "mapkey.delim",
 "lineDelim" -> "line.delim").map { case (k, v) => 
k.toLowerCase(Locale.ROOT) -> v }
+
+  def getHiveWriteCompression(tableInfo: TableDesc, sqlConf: SQLConf): 
Option[(String, String)] = {
+tableInfo.getOutputFileFormatClassName.toLowerCase match {
+  case formatName if formatName.endsWith("parquetoutputformat") =>
+val compressionCodec = new 
ParquetOptions(tableInfo.getProperties.asScala.toMap,
+  sqlConf).compressionCodecClassName
+Option((ParquetOutputFormat.COMPRESSION, compressionCodec))
+  case formatName if formatName.endsWith("orcoutputformat") =>
+val compressionCodec = new 
OrcOptions(tableInfo.getProperties.asScala.toMap,
+  sqlConf).compressionCodec
--- End diff --

`compressionCodec` is used in several places; do you mean I should fix them all?


---




[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2017-12-22 Thread fjh100456
Github user fjh100456 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19218#discussion_r158458747
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveOptions.scala 
---
@@ -102,4 +111,18 @@ object HiveOptions {
 "collectionDelim" -> "colelction.delim",
 "mapkeyDelim" -> "mapkey.delim",
 "lineDelim" -> "line.delim").map { case (k, v) => 
k.toLowerCase(Locale.ROOT) -> v }
+
+  def getHiveWriteCompression(tableInfo: TableDesc, sqlConf: SQLConf): 
Option[(String, String)] = {
+tableInfo.getOutputFileFormatClassName.toLowerCase match {
+  case formatName if formatName.endsWith("parquetoutputformat") =>
+val compressionCodec = new 
ParquetOptions(tableInfo.getProperties.asScala.toMap,
+  sqlConf).compressionCodecClassName
--- End diff --

Yes, it looks better; I will change it.


---




[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2017-12-22 Thread fjh100456
Github user fjh100456 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19218#discussion_r158458003
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveOptions.scala 
---
@@ -19,7 +19,16 @@ package org.apache.spark.sql.hive.execution
 
 import java.util.Locale
 
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc}
--- End diff --

I will remove it.


---




[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2017-12-22 Thread fjh100456
Github user fjh100456 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19218#discussion_r158457806
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
 ---
@@ -42,8 +43,15 @@ private[parquet] class ParquetOptions(
* Acceptable values are defined in [[shortParquetCompressionCodecNames]].
*/
   val compressionCodecClassName: String = {
-val codecName = parameters.getOrElse("compression",
-  sqlConf.parquetCompressionCodec).toLowerCase(Locale.ROOT)
+// `compression`, `parquet.compression` (i.e., ParquetOutputFormat.COMPRESSION), and
+// `spark.sql.parquet.compression.codec`
+// are in order of precedence from highest to lowest.
+val parquetCompressionConf = parameters.get(ParquetOutputFormat.COMPRESSION)
+val codecName = parameters
+  .get("compression")
+  .orElse(parquetCompressionConf)
--- End diff --

Yes, it's new. I guess `ParquetOptions` was not used when writing Hive tables before, because it was invisible to Hive. I changed it to `public`.


---




[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2017-12-22 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19218#discussion_r158445027
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
 ---
@@ -68,6 +68,12 @@ private[hive] trait SaveAsHiveFile extends 
DataWritingCommand {
 .get("mapreduce.output.fileoutputformat.compress.type"))
 }
 
+// Set compression by priority
+HiveOptions.getHiveWriteCompression(fileSinkConf.getTableInfo, sparkSession.sessionState.conf)
+  .foreach{ case (compression, codec) =>
+hadoopConf.set(compression, codec)
+  }
--- End diff --

```
.foreach { case (compression, codec) => hadoopConf.set(compression, codec) }
```


---




[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2017-12-22 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19218#discussion_r158444388
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveOptions.scala 
---
@@ -19,7 +19,16 @@ package org.apache.spark.sql.hive.execution
 
 import java.util.Locale
 
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc}
--- End diff --

Is `FileSinkDesc` still needed?


---




[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2017-12-22 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19218#discussion_r158444002
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala ---
@@ -35,7 +39,7 @@ case class TestData(key: Int, value: String)
 case class ThreeCloumntable(key: Int, value: String, key1: String)
 
 class InsertSuite extends QueryTest with TestHiveSingleton with 
BeforeAndAfter
-with SQLTestUtils {
+with ParquetTest {
--- End diff --

Please also check whether the compression takes effect. Could you compare whether the size is smaller than the original size without compression?


---




[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2017-12-22 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19218#discussion_r158443833
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala ---
@@ -35,7 +39,7 @@ case class TestData(key: Int, value: String)
 case class ThreeCloumntable(key: Int, value: String, key1: String)
 
 class InsertSuite extends QueryTest with TestHiveSingleton with 
BeforeAndAfter
-with SQLTestUtils {
+with ParquetTest {
--- End diff --

This is the insert suite. We are unable to do this. 

Could you create a separate suite in the current package `org.apache.spark.sql.hive`? The suite name can be `CompressionCodecSuite`.


---




[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2017-12-22 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19218#discussion_r158444637
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
 ---
@@ -42,8 +43,15 @@ private[parquet] class ParquetOptions(
* Acceptable values are defined in [[shortParquetCompressionCodecNames]].
*/
   val compressionCodecClassName: String = {
-val codecName = parameters.getOrElse("compression",
-  sqlConf.parquetCompressionCodec).toLowerCase(Locale.ROOT)
+// `compression`, `parquet.compression` (i.e., ParquetOutputFormat.COMPRESSION), and
+// `spark.sql.parquet.compression.codec`
+// are in order of precedence from highest to lowest.
+val parquetCompressionConf = parameters.get(ParquetOutputFormat.COMPRESSION)
+val codecName = parameters
+  .get("compression")
+  .orElse(parquetCompressionConf)
--- End diff --

Is this new? Do we support `parquet.compression` before this PR?


---




[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2017-12-22 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19218#discussion_r158444115
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveOptions.scala 
---
@@ -102,4 +111,18 @@ object HiveOptions {
 "collectionDelim" -> "colelction.delim",
 "mapkeyDelim" -> "mapkey.delim",
 "lineDelim" -> "line.delim").map { case (k, v) => 
k.toLowerCase(Locale.ROOT) -> v }
+
+  def getHiveWriteCompression(tableInfo: TableDesc, sqlConf: SQLConf): 
Option[(String, String)] = {
+tableInfo.getOutputFileFormatClassName.toLowerCase match {
+  case formatName if formatName.endsWith("parquetoutputformat") =>
+val compressionCodec = new 
ParquetOptions(tableInfo.getProperties.asScala.toMap,
+  sqlConf).compressionCodecClassName
--- End diff --

We normally do not split the code like this. We like the following way:
```Scala
val tableProps = tableInfo.getProperties.asScala.toMap
tableInfo.getOutputFileFormatClassName.toLowerCase match {
  case formatName if formatName.endsWith("parquetoutputformat") =>
val compressionCodec = new ParquetOptions(tableProps, 
sqlConf).compressionCodecClassName
Option((ParquetOutputFormat.COMPRESSION, compressionCodec))
...
```


---




[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2017-12-22 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19218#discussion_r158443604
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveOptions.scala 
---
@@ -102,4 +111,18 @@ object HiveOptions {
 "collectionDelim" -> "colelction.delim",
 "mapkeyDelim" -> "mapkey.delim",
 "lineDelim" -> "line.delim").map { case (k, v) => 
k.toLowerCase(Locale.ROOT) -> v }
+
+  def getHiveWriteCompression(tableInfo: TableDesc, sqlConf: SQLConf): 
Option[(String, String)] = {
+tableInfo.getOutputFileFormatClassName.toLowerCase match {
+  case formatName if formatName.endsWith("parquetoutputformat") =>
+val compressionCodec = new 
ParquetOptions(tableInfo.getProperties.asScala.toMap,
+  sqlConf).compressionCodecClassName
+Option((ParquetOutputFormat.COMPRESSION, compressionCodec))
+  case formatName if formatName.endsWith("orcoutputformat") =>
+val compressionCodec = new 
OrcOptions(tableInfo.getProperties.asScala.toMap,
+  sqlConf).compressionCodec
--- End diff --

Also update `OrcOptions`'s `compressionCodec` to `compressionCodecClassName`.


---




[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2017-12-18 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19218#discussion_r157481061
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
 ---
@@ -68,6 +68,30 @@ private[hive] trait SaveAsHiveFile extends 
DataWritingCommand {
 .get("mapreduce.output.fileoutputformat.compress.type"))
 }
 
+fileSinkConf.tableInfo.getOutputFileFormatClassName match {
+  case formatName if formatName.toLowerCase.endsWith("parquetoutputformat") =>
+val compressionConf = "parquet.compression"
+val compressionCodec = getCompressionByPriority(
+  fileSinkConf,
+  compressionConf,
+  default = sparkSession.sessionState.conf.parquetCompressionCodec) match {
+  case "NONE" => "UNCOMPRESSED"
+  case _@x => x
+}
+hadoopConf.set(compressionConf, compressionCodec)
+  case formatName if formatName.endsWith("OrcOutputFormat") =>
--- End diff --

`case formatName if formatName.toLowerCase.endsWith("orcoutputformat") =>`?
Or, if you write `fileSinkConf.tableInfo.getOutputFileFormatClassName.toLowerCase match {`, then each case does not need its own lower-case conversion.
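
A minimal sketch of that second suggestion, lower-casing the scrutinee once so no case arm needs its own conversion:

```Scala
fileSinkConf.tableInfo.getOutputFileFormatClassName.toLowerCase match {
  case formatName if formatName.endsWith("parquetoutputformat") =>
    // set parquet.compression on hadoopConf
  case formatName if formatName.endsWith("orcoutputformat") =>
    // set orc.compress on hadoopConf
  case _ => // other formats: leave hadoopConf untouched
}
```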


---




[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2017-12-18 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19218#discussion_r157478781
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
 ---
@@ -86,6 +110,19 @@ private[hive] trait SaveAsHiveFile extends 
DataWritingCommand {
   options = Map.empty)
   }
 
+  // Because compression configurations can come in a variety of ways,
+  // we choose the compression configuration in this order:
+  // For parquet: `compression` > `parquet.compression` > `spark.sql.parquet.compression.codec`
+  // For orc: `compression` > `orc.compress` > `spark.sql.orc.compression.codec`
--- End diff --

Is it okay to document this priority order in the Spark docs or somewhere?
https://spark.apache.org/docs/latest/sql-programming-guide.html#configuration
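
For reference, a sketch of the same precedence on the ORC side, assuming `OrcOptions` mirrors `ParquetOptions` (`COMPRESS.getAttribute` from `org.apache.orc.OrcConf` is the string `"orc.compress"`):

```Scala
// `compression` > `orc.compress` > `spark.sql.orc.compression.codec`
val orcCodecName = parameters
  .get("compression")
  .orElse(parameters.get(COMPRESS.getAttribute))
  .getOrElse(sqlConf.orcCompressionCodec)
```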


---




[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2017-12-02 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19218#discussion_r154509645
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
 ---
@@ -68,6 +68,30 @@ private[hive] trait SaveAsHiveFile extends 
DataWritingCommand {
 .get("mapreduce.output.fileoutputformat.compress.type"))
 }
 
+fileSinkConf.tableInfo.getOutputFileFormatClassName match {
+  case formatName if formatName.toLowerCase.endsWith("parquetoutputformat") =>
+val compressionConf = "parquet.compression"
+val compressionCodec = getCompressionByPriority(
+  fileSinkConf,
+  compressionConf,
+  default = sparkSession.sessionState.conf.parquetCompressionCodec) match {
+  case "NONE" => "UNCOMPRESSED"
+  case _@x => x
+}
+hadoopConf.set(compressionConf, compressionCodec)
+  case formatName if formatName.endsWith("OrcOutputFormat") =>
+val compressionConf = "orc.compress"
--- End diff --

-> `OrcRelation.ORC_COMPRESSION`


---




[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2017-12-02 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19218#discussion_r154509729
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
 ---
@@ -68,6 +68,30 @@ private[hive] trait SaveAsHiveFile extends 
DataWritingCommand {
 .get("mapreduce.output.fileoutputformat.compress.type"))
 }
 
+fileSinkConf.tableInfo.getOutputFileFormatClassName match {
+  case formatName if formatName.toLowerCase.endsWith("parquetoutputformat") =>
+val compressionConf = "parquet.compression"
+val compressionCodec = getCompressionByPriority(
+  fileSinkConf,
+  compressionConf,
+  default = sparkSession.sessionState.conf.parquetCompressionCodec) match {
+  case "NONE" => "UNCOMPRESSED"
+  case _@x => x
+}
+hadoopConf.set(compressionConf, compressionCodec)
+  case formatName if formatName.endsWith("OrcOutputFormat") =>
+val compressionConf = "orc.compress"
+val compressionCodec = getCompressionByPriority(
+  fileSinkConf,
+  compressionConf,
+  default = sparkSession.sessionState.conf.orcCompressionCodec) match {
+  case "UNCOMPRESSED" => "NONE"
+  case _@x => x
--- End diff --

`case o => o`


---




[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2017-12-02 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19218#discussion_r154509719
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
 ---
@@ -68,6 +68,30 @@ private[hive] trait SaveAsHiveFile extends 
DataWritingCommand {
 .get("mapreduce.output.fileoutputformat.compress.type"))
 }
 
+fileSinkConf.tableInfo.getOutputFileFormatClassName match {
+  case formatName if formatName.toLowerCase.endsWith("parquetoutputformat") =>
+val compressionConf = "parquet.compression"
+val compressionCodec = getCompressionByPriority(
+  fileSinkConf,
+  compressionConf,
+  default = sparkSession.sessionState.conf.parquetCompressionCodec) match {
+  case "NONE" => "UNCOMPRESSED"
+  case _@x => x
+}
+hadoopConf.set(compressionConf, compressionCodec)
+  case formatName if formatName.endsWith("OrcOutputFormat") =>
+val compressionConf = "orc.compress"
+val compressionCodec = getCompressionByPriority(
+  fileSinkConf,
+  compressionConf,
+  default = sparkSession.sessionState.conf.orcCompressionCodec) match {
+  case "UNCOMPRESSED" => "NONE"
+  case _@x => x
+}
--- End diff --

Move the whole determination logic to `object HiveOptions`; you can then call it in `SaveAsHiveFile.scala`.


---




[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2017-12-02 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19218#discussion_r154509638
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
 ---
@@ -68,6 +68,30 @@ private[hive] trait SaveAsHiveFile extends 
DataWritingCommand {
 .get("mapreduce.output.fileoutputformat.compress.type"))
 }
 
+fileSinkConf.tableInfo.getOutputFileFormatClassName match {
+  case formatName if formatName.toLowerCase.endsWith("parquetoutputformat") =>
+val compressionConf = "parquet.compression"
--- End diff --

` "parquet.compression"` -> `ParquetOutputFormat.COMPRESSION`


---




[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2017-12-02 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19218#discussion_r154509749
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
 ---
@@ -68,6 +68,30 @@ private[hive] trait SaveAsHiveFile extends 
DataWritingCommand {
 .get("mapreduce.output.fileoutputformat.compress.type"))
 }
 
+fileSinkConf.tableInfo.getOutputFileFormatClassName match {
+  case formatName if formatName.toLowerCase.endsWith("parquetoutputformat") =>
+val compressionConf = "parquet.compression"
+val compressionCodec = getCompressionByPriority(
+  fileSinkConf,
+  compressionConf,
+  default = sparkSession.sessionState.conf.parquetCompressionCodec) match {
+  case "NONE" => "UNCOMPRESSED"
+  case _@x => x
--- End diff --

The same here.


---




[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2017-12-02 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19218#discussion_r154509801
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
 ---
@@ -68,6 +68,30 @@ private[hive] trait SaveAsHiveFile extends 
DataWritingCommand {
 .get("mapreduce.output.fileoutputformat.compress.type"))
 }
 
+fileSinkConf.tableInfo.getOutputFileFormatClassName match {
+  case formatName if formatName.toLowerCase.endsWith("parquetoutputformat") =>
+val compressionConf = "parquet.compression"
+val compressionCodec = getCompressionByPriority(
+  fileSinkConf,
+  compressionConf,
+  default = sparkSession.sessionState.conf.parquetCompressionCodec) match {
+  case "NONE" => "UNCOMPRESSED"
+  case _@x => x
+}
+hadoopConf.set(compressionConf, compressionCodec)
+  case formatName if formatName.endsWith("OrcOutputFormat") =>
+val compressionConf = "orc.compress"
+val compressionCodec = getCompressionByPriority(
+  fileSinkConf,
+  compressionConf,
+  default = sparkSession.sessionState.conf.orcCompressionCodec) match {
--- End diff --

I suggest adding normalization logic for both ORC and Parquet.

Check `ParquetOptions.shortParquetCompressionCodecNames`.
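
A hedged sketch of what such normalization could look like, reusing the map referenced above (the error message is illustrative):

```Scala
// Map a user-supplied short name, in any casing, to Parquet's canonical codec name.
val normalizedCodec = shortParquetCompressionCodecNames
  .get(codecName.toLowerCase(Locale.ROOT))
  .map(_.name())  // e.g. "snappy" -> "SNAPPY", "none" -> "UNCOMPRESSED"
  .getOrElse(throw new IllegalArgumentException(s"Codec [$codecName] is not supported"))
```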


---




[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2017-12-02 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19218#discussion_r154509747
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
 ---
@@ -68,6 +68,30 @@ private[hive] trait SaveAsHiveFile extends 
DataWritingCommand {
 .get("mapreduce.output.fileoutputformat.compress.type"))
 }
 
+fileSinkConf.tableInfo.getOutputFileFormatClassName match {
+  case formatName if formatName.toLowerCase.endsWith("parquetoutputformat") =>
+val compressionConf = "parquet.compression"
+val compressionCodec = getCompressionByPriority(
+  fileSinkConf,
+  compressionConf,
+  default = sparkSession.sessionState.conf.parquetCompressionCodec) match {
+  case "NONE" => "UNCOMPRESSED"
+  case _@x => x
+}
+hadoopConf.set(compressionConf, compressionCodec)
+  case formatName if formatName.endsWith("OrcOutputFormat") =>
+val compressionConf = "orc.compress"
+val compressionCodec = getCompressionByPriority(
+  fileSinkConf,
+  compressionConf,
+  default = sparkSession.sessionState.conf.orcCompressionCodec) match {
+  case "UNCOMPRESSED" => "NONE"
--- End diff --

Why always make it upper case? This looks buggy.


---




[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2017-10-12 Thread fjh100456
Github user fjh100456 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19218#discussion_r144202674
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala ---
@@ -728,4 +732,254 @@ class InsertSuite extends QueryTest with 
TestHiveSingleton with BeforeAndAfter
   assert(e.contains("mismatched input 'ROW'"))
 }
   }
+
+  private def getConvertMetastoreConfName(format: String): String = format match {
+case "parquet" => "spark.sql.hive.convertMetastoreParquet"
+case "orc" => "spark.sql.hive.convertMetastoreOrc"
+  }
+
+  private def getSparkCompressionConfName(format: String): String = format match {
+case "parquet" => "spark.sql.parquet.compression.codec"
+case "orc" => "spark.sql.orc.compression.codec"
+  }
+
+  private def getTableCompressPropName(format: String): String = {
+format.toLowerCase match {
+  case "parquet" => "parquet.compression"
+  case "orc" => "orc.compress"
+}
+  }
+
+  private def getTableCompressionCodec(path: String, format: String): String = {
--- End diff --

Would it be appropriate to change it to `getHiveCompressPropName`?


---




[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2017-10-11 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/19218#discussion_r144187454
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala ---
@@ -728,4 +732,254 @@ class InsertSuite extends QueryTest with 
TestHiveSingleton with BeforeAndAfter
   assert(e.contains("mismatched input 'ROW'"))
 }
   }
+
+  private def getConvertMetastoreConfName(format: String): String = format match {
+case "parquet" => "spark.sql.hive.convertMetastoreParquet"
+case "orc" => "spark.sql.hive.convertMetastoreOrc"
+  }
+
+  private def getSparkCompressionConfName(format: String): String = format match {
+case "parquet" => "spark.sql.parquet.compression.codec"
+case "orc" => "spark.sql.orc.compression.codec"
+  }
+
+  private def getTableCompressPropName(format: String): String = {
+format.toLowerCase match {
+  case "parquet" => "parquet.compression"
+  case "orc" => "orc.compress"
+}
+  }
+
+  private def getTableCompressionCodec(path: String, format: String): String = {
+val hadoopConf = spark.sessionState.newHadoopConf()
+val codecs = format match {
+  case "parquet" => for {
+footer <- readAllFootersWithoutSummaryFiles(new Path(path), hadoopConf)
+block <- footer.getParquetMetadata.getBlocks.asScala
+column <- block.getColumns.asScala
+  } yield column.getCodec.name()
+  case "orc" => new File(path).listFiles().filter{ file =>
+file.isFile && !file.getName.endsWith(".crc") && file.getName != "_SUCCESS"
+  }.map { orcFile =>
+OrcFileOperator.getFileReader(orcFile.toPath.toString).get.getCompression.toString
+  }.toSeq
+}
+
+assert(codecs.distinct.length == 1)
+codecs.head
+  }
+
+  private def writeDataToTable(
+  rootDir: File,
+  tableName: String,
+  isPartitioned: Boolean,
+  format: String,
+  compressionCodec: Option[String]) {
+val tblProperties = compressionCodec match {
+  case Some(prop) => s"TBLPROPERTIES('${getTableCompressPropName(format)}'='$prop')"
+  case _ => ""
+}
+val partitionCreate = if (isPartitioned) "PARTITIONED BY (p int)" else ""
+sql(
+  s"""
+ |CREATE TABLE $tableName(a int)
+ |$partitionCreate
+ |STORED AS $format
+ |LOCATION '${rootDir.toURI.toString.stripSuffix("/")}/$tableName'
+ |$tblProperties
+   """.stripMargin)
+
+val partitionInsert = if (isPartitioned) s"partition (p=1)" else ""
+sql(
+  s"""
+ |INSERT OVERWRITE TABLE $tableName
+ |$partitionInsert
+ |SELECT * from table_source
--- End diff --

nit. `from` -> `FROM`


---




[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2017-10-11 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/19218#discussion_r144187309
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala ---
@@ -728,4 +732,254 @@ class InsertSuite extends QueryTest with 
TestHiveSingleton with BeforeAndAfter
   assert(e.contains("mismatched input 'ROW'"))
 }
   }
+
+  private def getConvertMetastoreConfName(format: String): String = format match {
+case "parquet" => "spark.sql.hive.convertMetastoreParquet"
+case "orc" => "spark.sql.hive.convertMetastoreOrc"
+  }
+
+  private def getSparkCompressionConfName(format: String): String = format match {
+case "parquet" => "spark.sql.parquet.compression.codec"
+case "orc" => "spark.sql.orc.compression.codec"
+  }
+
+  private def getTableCompressPropName(format: String): String = {
+format.toLowerCase match {
+  case "parquet" => "parquet.compression"
+  case "orc" => "orc.compress"
+}
+  }
+
+  private def getTableCompressionCodec(path: String, format: String): String = {
--- End diff --

The logic reads the compression codec from the files, so the prefix `getTable` looks misleading to me.


---




[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2017-10-11 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/19218#discussion_r144187101
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala ---
@@ -728,4 +732,254 @@ class InsertSuite extends QueryTest with 
TestHiveSingleton with BeforeAndAfter
   assert(e.contains("mismatched input 'ROW'"))
 }
   }
+
+  private def getConvertMetastoreConfName(format: String): String = format match {
+case "parquet" => "spark.sql.hive.convertMetastoreParquet"
+case "orc" => "spark.sql.hive.convertMetastoreOrc"
+  }
+
+  private def getSparkCompressionConfName(format: String): String = format match {
+case "parquet" => "spark.sql.parquet.compression.codec"
+case "orc" => "spark.sql.orc.compression.codec"
--- End diff --

Here, too.
- `SQLConf.PARQUET_COMPRESSION.key` instead of "spark.sql.parquet.compression.codec"
- `SQLConf.ORC_COMPRESSION.key` instead of "spark.sql.orc.compression.codec"
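
A sketch of the helper with those keys swapped in:

```Scala
private def getSparkCompressionConfName(format: String): String = format match {
  case "parquet" => SQLConf.PARQUET_COMPRESSION.key
  case "orc" => SQLConf.ORC_COMPRESSION.key
}
```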



---




[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2017-10-11 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/19218#discussion_r144186944
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala ---
@@ -728,4 +732,254 @@ class InsertSuite extends QueryTest with 
TestHiveSingleton with BeforeAndAfter
   assert(e.contains("mismatched input 'ROW'"))
 }
   }
+
+  private def getConvertMetastoreConfName(format: String): String = format match {
+case "parquet" => "spark.sql.hive.convertMetastoreParquet"
+case "orc" => "spark.sql.hive.convertMetastoreOrc"
--- End diff --

Could you use keys?
- `HiveUtils.CONVERT_METASTORE_PARQUET.key` instead of "spark.sql.hive.convertMetastoreParquet"
- `HiveUtils.CONVERT_METASTORE_ORC.key` instead of "spark.sql.hive.convertMetastoreOrc"


---




[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2017-10-09 Thread fjh100456
Github user fjh100456 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19218#discussion_r143624224
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
 ---
@@ -68,6 +68,26 @@ private[hive] trait SaveAsHiveFile extends 
DataWritingCommand {
 .get("mapreduce.output.fileoutputformat.compress.type"))
 }
 
+fileSinkConf.tableInfo.getOutputFileFormatClassName match {
+  case formatName if formatName.endsWith("ParquetOutputFormat") =>
+val compressionConf = "parquet.compression"
+val compressionCodec = getCompressionByPriority(fileSinkConf, compressionConf,
+  sparkSession.sessionState.conf.parquetCompressionCodec) match {
+  case "NONE" => "UNCOMPRESSED"
+  case _@x => x
+}
+hadoopConf.set(compressionConf, compressionCodec)
+  case formatName if formatName.endsWith("OrcOutputFormat") =>
+val compressionConf = "orc.compress"
+val compressionCodec = getCompressionByPriority(fileSinkConf, compressionConf,
+  sparkSession.sessionState.conf.orcCompressionCodec) match {
+  case "UNCOMPRESSED" => "NONE"
--- End diff --

Yes, they are different: the styles of both the parameter names and the parameter values differ, and that is a Parquet vs. ORC issue.
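
To spell out the mismatch: Parquet's codec names use `UNCOMPRESSED` for the no-compression case, while ORC's `orc.compress` property uses `NONE`, hence the two opposite translations in the diff. A minimal sketch:

```Scala
// Parquet side: a session conf of "none" must become "UNCOMPRESSED" for parquet.compression.
val parquetCodec = if (codec == "NONE") "UNCOMPRESSED" else codec
// ORC side: a session conf of "uncompressed" must become "NONE" for orc.compress.
val orcCodec = if (codec == "UNCOMPRESSED") "NONE" else codec
```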


---




[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2017-10-09 Thread fjh100456
Github user fjh100456 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19218#discussion_r143624210
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
 ---
@@ -68,6 +68,26 @@ private[hive] trait SaveAsHiveFile extends 
DataWritingCommand {
 .get("mapreduce.output.fileoutputformat.compress.type"))
 }
 
+fileSinkConf.tableInfo.getOutputFileFormatClassName match {
+  case formatName if formatName.endsWith("ParquetOutputFormat") =>
+val compressionConf = "parquet.compression"
+val compressionCodec = getCompressionByPriority(fileSinkConf, compressionConf,
+  sparkSession.sessionState.conf.parquetCompressionCodec) match {
+  case "NONE" => "UNCOMPRESSED"
+  case _@x => x
+}
+hadoopConf.set(compressionConf, compressionCodec)
+  case formatName if formatName.endsWith("OrcOutputFormat") =>
+val compressionConf = "orc.compress"
+val compressionCodec = getCompressionByPriority(fileSinkConf, compressionConf,
+  sparkSession.sessionState.conf.orcCompressionCodec) match {
+  case "UNCOMPRESSED" => "NONE"
+  case _@x => x
--- End diff --

In fact, the following process will check the correctness of this value, and because `OrcOptions` is not accessible here, I have to add the "UNCOMPRESSED" => "NONE" conversion.
Do you have any good advice?


---




[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2017-10-09 Thread fjh100456
Github user fjh100456 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19218#discussion_r143624196
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
 ---
@@ -68,6 +68,26 @@ private[hive] trait SaveAsHiveFile extends 
DataWritingCommand {
 .get("mapreduce.output.fileoutputformat.compress.type"))
 }
 
+fileSinkConf.tableInfo.getOutputFileFormatClassName match {
+  case formatName if formatName.endsWith("ParquetOutputFormat") =>
+val compressionConf = "parquet.compression"
+val compressionCodec = getCompressionByPriority(fileSinkConf, compressionConf,
+  sparkSession.sessionState.conf.parquetCompressionCodec) match {
--- End diff --

`compressionConf` will be used below; I've adjusted the format, thanks.


---




[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2017-10-09 Thread fjh100456
Github user fjh100456 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19218#discussion_r143624181
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
 ---
@@ -68,6 +68,26 @@ private[hive] trait SaveAsHiveFile extends 
DataWritingCommand {
 .get("mapreduce.output.fileoutputformat.compress.type"))
 }
 
+fileSinkConf.tableInfo.getOutputFileFormatClassName match {
+  case formatName if formatName.endsWith("ParquetOutputFormat") =>
--- End diff --

Sounds like a good idea.


---




[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2017-10-03 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19218#discussion_r142554065
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala ---
@@ -728,4 +732,195 @@ class InsertSuite extends QueryTest with 
TestHiveSingleton with BeforeAndAfter
   assert(e.contains("mismatched input 'ROW'"))
 }
   }
+
+  test("[SPARK-21786] Check 'spark.sql.parquet.compression.codec' " +
--- End diff --

Could you split the whole test case into multiple independent, smaller unit test cases?


---




[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2017-10-03 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19218#discussion_r142553845
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala ---
@@ -728,4 +732,195 @@ class InsertSuite extends QueryTest with 
TestHiveSingleton with BeforeAndAfter
   assert(e.contains("mismatched input 'ROW'"))
 }
   }
+
+  test("[SPARK-21786] Check 'spark.sql.parquet.compression.codec' " +
+"and 'spark.sql.orc.compression.codec' taking effect on hive table 
writing") {
+
+val hadoopConf = spark.sessionState.newHadoopConf()
+
+val partitionStr = "p=1"
+
+case class TableCompressionConf(name: String, codeC: String)
+
+case class TableDefine(tableName: String, isPartitioned: Boolean, 
format: String,
+  compressionConf: Option[TableCompressionConf]) {
+  def createTable(rootDir: File): Unit = {
+val compression = compressionConf.map(cf => 
s"'${cf.name}'='${cf.codeC}'")
+sql(
+  s"""
+  |CREATE TABLE $tableName(a int)
+  |${if (isPartitioned) "PARTITIONED BY (p int)" else ""}
+  |STORED AS $format
+  |LOCATION '${rootDir.toURI.toString.stripSuffix("/")}/$tableName'
+  |${if (compressionConf.nonEmpty) 
s"TBLPROPERTIES(${compression.get})" else ""}
+""".stripMargin)
+  }
+
+  def insertOverwriteTable(): Unit = {
+sql(
+  s"""
+  |INSERT OVERWRITE TABLE $tableName
+  |${if (isPartitioned) s"partition ($partitionStr)" else ""}
+  |SELECT * from table_source
+""".stripMargin)
+  }
+}
+
+def getTableCompressionCodec(path: String, format: String): String = {
+  val codecs = format match {
+case "parquet" => for {
+  footer <- readAllFootersWithoutSummaryFiles(new Path(path), 
hadoopConf)
+  block <- footer.getParquetMetadata.getBlocks.asScala
+  column <- block.getColumns.asScala
+} yield column.getCodec.name()
+case "orc" => new File(path).listFiles().filter{ file =>
+  file.isFile && !file.getName.endsWith(".crc") && file.getName != 
"_SUCCESS"
+}.map { orcFile =>
+
OrcFileOperator.getFileReader(orcFile.toPath.toString).get.getCompression.toString
+}.toSeq
+  }
+
+  assert(codecs.distinct.length == 1)
+  codecs.head
+}
+
+def checkCompressionCodecForTable(format: String, isPartitioned: 
Boolean,
+  compressionConf: Option[TableCompressionConf])(assertion: String => 
Unit): Unit = {
+  val table = TableDefine(s"tbl_$format${isPartitioned}",
+isPartitioned, format, compressionConf)
+  withTempDir { tmpDir =>
+withTable(table.tableName) {
+  table.createTable(tmpDir)
+  table.insertOverwriteTable()
+  val partition = if (table.isPartitioned) partitionStr else ""
+  val path = 
s"${tmpDir.getPath.stripSuffix("/")}/${table.tableName}/$partition"
+  assertion(getTableCompressionCodec(path, table.format))
+}
+  }
+}
+
+def getConvertMetastoreConfName(format: String): String = format match 
{
+  case "parquet" => "spark.sql.hive.convertMetastoreParquet"
+  case "orc" => "spark.sql.hive.convertMetastoreOrc"
+}
+
+def getSparkCompressionConfName(format: String): String = format match 
{
+  case "parquet" => "spark.sql.parquet.compression.codec"
+  case "orc" => "spark.sql.orc.compression.codec"
+}
+
+def checkTableCompressionCodecForCodecs(format: String, isPartitioned: 
Boolean,
+  convertMetastore: Boolean, compressionCodecs: List[String],
+  tableCompressionConf: List[TableCompressionConf])
--- End diff --

Could you update the indents for all of them in this PR? See the link: 
https://github.com/databricks/scala-style-guide#indent
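
For reference, the linked guide indents continuation parameters four spaces, roughly like this (the return type is assumed here for illustration):

```Scala
def checkTableCompressionCodecForCodecs(
    format: String,
    isPartitioned: Boolean,
    convertMetastore: Boolean,
    compressionCodecs: List[String],
    tableCompressionConf: List[TableCompressionConf]): Unit = {
  // ...
}
```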


---




[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2017-10-03 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19218#discussion_r142553648
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala ---
@@ -728,4 +732,195 @@ class InsertSuite extends QueryTest with 
TestHiveSingleton with BeforeAndAfter
   assert(e.contains("mismatched input 'ROW'"))
 }
   }
+
+  test("[SPARK-21786] Check 'spark.sql.parquet.compression.codec' " +
+"and 'spark.sql.orc.compression.codec' taking effect on hive table 
writing") {
+
+val hadoopConf = spark.sessionState.newHadoopConf()
+
+val partitionStr = "p=1"
+
+case class TableCompressionConf(name: String, codeC: String)
+
+case class TableDefine(tableName: String, isPartitioned: Boolean, 
format: String,
--- End diff --

Use a function?


---




[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2017-10-03 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19218#discussion_r142553517
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala ---
@@ -728,4 +732,195 @@ class InsertSuite extends QueryTest with 
TestHiveSingleton with BeforeAndAfter
   assert(e.contains("mismatched input 'ROW'"))
 }
   }
+
+  test("[SPARK-21786] Check 'spark.sql.parquet.compression.codec' " +
+"and 'spark.sql.orc.compression.codec' taking effect on hive table 
writing") {
+
+val hadoopConf = spark.sessionState.newHadoopConf()
+
+val partitionStr = "p=1"
+
+case class TableCompressionConf(name: String, codeC: String)
+
+case class TableDefine(tableName: String, isPartitioned: Boolean, 
format: String,
+  compressionConf: Option[TableCompressionConf]) {
+  def createTable(rootDir: File): Unit = {
+val compression = compressionConf.map(cf => 
s"'${cf.name}'='${cf.codeC}'")
+sql(
+  s"""
+  |CREATE TABLE $tableName(a int)
+  |${if (isPartitioned) "PARTITIONED BY (p int)" else ""}
--- End diff --

Please do not embed it; just create a parameter above this.


---




[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2017-10-03 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19218#discussion_r142553387
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
 ---
@@ -68,6 +68,26 @@ private[hive] trait SaveAsHiveFile extends 
DataWritingCommand {
 .get("mapreduce.output.fileoutputformat.compress.type"))
 }
 
+fileSinkConf.tableInfo.getOutputFileFormatClassName match {
+  case formatName if formatName.endsWith("ParquetOutputFormat") =>
+val compressionConf = "parquet.compression"
+val compressionCodec = getCompressionByPriority(fileSinkConf, compressionConf,
+  sparkSession.sessionState.conf.parquetCompressionCodec) match {
+  case "NONE" => "UNCOMPRESSED"
+  case _@x => x
+}
+hadoopConf.set(compressionConf, compressionCodec)
+  case formatName if formatName.endsWith("OrcOutputFormat") =>
+val compressionConf = "orc.compress"
+val compressionCodec = getCompressionByPriority(fileSinkConf, compressionConf,
+  sparkSession.sessionState.conf.orcCompressionCodec) match {
+  case "UNCOMPRESSED" => "NONE"
--- End diff --

Why are ORC and Parquet different?


---




[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2017-10-03 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19218#discussion_r142553277
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
 ---
@@ -68,6 +68,26 @@ private[hive] trait SaveAsHiveFile extends 
DataWritingCommand {
 .get("mapreduce.output.fileoutputformat.compress.type"))
 }
 
+fileSinkConf.tableInfo.getOutputFileFormatClassName match {
+  case formatName if formatName.endsWith("ParquetOutputFormat") =>
+val compressionConf = "parquet.compression"
+val compressionCodec = getCompressionByPriority(fileSinkConf, compressionConf,
+  sparkSession.sessionState.conf.parquetCompressionCodec) match {
+  case "NONE" => "UNCOMPRESSED"
+  case _@x => x
+}
+hadoopConf.set(compressionConf, compressionCodec)
+  case formatName if formatName.endsWith("OrcOutputFormat") =>
+val compressionConf = "orc.compress"
+val compressionCodec = getCompressionByPriority(fileSinkConf, compressionConf,
+  sparkSession.sessionState.conf.orcCompressionCodec) match {
+  case _@x => x
--- End diff --

`case x => x`?


To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2017-10-03 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19218#discussion_r142553192
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
 ---
@@ -68,6 +68,26 @@ private[hive] trait SaveAsHiveFile extends 
DataWritingCommand {
 .get("mapreduce.output.fileoutputformat.compress.type"))
 }
 
+fileSinkConf.tableInfo.getOutputFileFormatClassName match {
+  case formatName if formatName.endsWith("ParquetOutputFormat") =>
+val compressionConf = "parquet.compression"
+val compressionCodec = getCompressionByPriority(fileSinkConf, compressionConf,
+  sparkSession.sessionState.conf.parquetCompressionCodec) match {
--- End diff --

```Scala
val compressionCodec = getCompressionByPriority(
  fileSinkConf,
  compressionConf = "parquet.compression",
  default = sparkSession.sessionState.conf.parquetCompressionCodec) match {
```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2017-10-03 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19218#discussion_r142552754
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
 ---
@@ -68,6 +68,26 @@ private[hive] trait SaveAsHiveFile extends 
DataWritingCommand {
 .get("mapreduce.output.fileoutputformat.compress.type"))
 }
 
+fileSinkConf.tableInfo.getOutputFileFormatClassName match {
+  case formatName if formatName.endsWith("ParquetOutputFormat") =>
--- End diff --

Is it case-sensitive? Should we convert it to lower case or upper case for the string comparison?
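
If the class name's casing can vary, one option is to normalize it once before matching. A sketch only (the metastore normally returns the exact class name, so this is defensive):

```Scala
import java.util.Locale

// Sketch: lower-case the class name once so the suffix check is
// case-insensitive.
val formatName = fileSinkConf.tableInfo.getOutputFileFormatClassName
  .toLowerCase(Locale.ROOT)
formatName match {
  case name if name.endsWith("parquetoutputformat") => () // Parquet branch
  case name if name.endsWith("orcoutputformat") => ()     // ORC branch
  case _ => ()                                            // other formats
}
```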


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2017-10-03 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19218#discussion_r142552658
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
 ---
@@ -86,6 +106,14 @@ private[hive] trait SaveAsHiveFile extends 
DataWritingCommand {
   options = Map.empty)
   }
 
+  private def getCompressionByPriority(fileSinkConf: FileSinkDesc,
+compressionConf: String, default: String): String = {
--- End diff --

Could you add a description explaining the priority sequence?
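
For example, the ScalaDoc could spell out the precedence roughly like this (the wording is a sketch, and `???` stands in for the existing resolution logic):

```Scala
/**
 * Resolves the compression codec to use, in priority order:
 *   1. the table-level property from the table definition, e.g.
 *      `parquet.compression` or `orc.compress`;
 *   2. otherwise `default`, i.e. the session configuration
 *      (`spark.sql.parquet.compression.codec` /
 *      `spark.sql.orc.compression.codec`).
 */
private def getCompressionByPriority(
    fileSinkConf: FileSinkDesc,
    compressionConf: String,
    default: String): String = ???
```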


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2017-10-03 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19218#discussion_r142552413
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
 ---
@@ -86,6 +106,14 @@ private[hive] trait SaveAsHiveFile extends 
DataWritingCommand {
   options = Map.empty)
   }
 
+  private def getCompressionByPriority(fileSinkConf: FileSinkDesc,
+compressionConf: String, default: String): String = {
--- End diff --

```Scala
private def getCompressionByPriority(
    fileSinkConf: FileSinkDesc,
    compressionConf: String,
    default: String): String = {
```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2017-09-26 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/19218#discussion_r141188386
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala ---
@@ -728,4 +728,120 @@ class InsertSuite extends QueryTest with 
TestHiveSingleton with BeforeAndAfter
   assert(e.contains("mismatched input 'ROW'"))
 }
   }
+
+  test("[SPARK-21786] Check 'spark.sql.parquet.compression.codec' " +
+"and 'spark.sql.parquet.compression.codec' taking effect on hive table 
writing") {
+case class CompressionConf(name: String, codeC: String)
+
+case class TableDefine(tableName: String, isPartitioned: Boolean, 
format: String,
+  compressionConf: Option[CompressionConf]) {
+  def createTable(rootDir: File): Unit = {
+val compression = compressionConf.map(cf => 
s"'${cf.name}'='${cf.codeC}'")
+sql(
+  s"""
+  |CREATE TABLE $tableName(a int)
+  |${ if (isPartitioned) "PARTITIONED BY (p int)" else "" }
+  |STORED AS $format
+  |LOCATION '${rootDir.toURI.toString.stripSuffix("/")}/$tableName'
+  |${ if (compressionConf.nonEmpty) 
s"TBLPROPERTIES(${compression.get})" else "" }
+""".stripMargin)
+  }
+
+  def insertOverwriteTable(): Unit = {
+sql(
+  s"""
+  |INSERT OVERWRITE TABLE $tableName
+  |${ if (isPartitioned) "partition (p=1)" else "" }
+  |SELECT * from table_source
+""".stripMargin)
+  }
+
+  def getDirFiles(file: File): List[File] = {
+if (!file.exists()) Nil
+else if (file.isFile) List(file)
+else {
+  file.listFiles().filterNot(_.getName.startsWith(".hive-staging"))
+.groupBy(_.isFile).flatMap {
+case (isFile, files) if isFile => files.toList
+case (_, dirs) => dirs.flatMap(getDirFiles)
+  }.toList
+}
+  }
+
+  def getTableSize: Long = {
+var totalSize = 0L
+withTempDir { tmpDir =>
+  withTable(tableName) {
+createTable(tmpDir)
+insertOverwriteTable()
+val path = s"${tmpDir.getPath.stripSuffix("/")}/$tableName"
+val dir = new File(path)
+val files = 
getDirFiles(dir).filter(_.getName.startsWith("part-"))
+totalSize = files.map(_.length()).sum
+  }
+}
+totalSize
+  }
+}
+
+def checkParquetCompressionCodec(isPartitioned: Boolean, tableCodec: 
String,
+  sessionCodec: String, f: (Long, Long) => Boolean = _ == _): Unit = {
+  val tableOrg = TableDefine(s"tbl_parquet$tableCodec", isPartitioned, 
"parquet",
+Some(CompressionConf("parquet.compression", tableCodec)))
+  val tableOrgSize = tableOrg.getTableSize
+
+  withSQLConf("spark.sql.parquet.compression.codec" -> sessionCodec) {
+// priority check, when table-level compression conf was set, 
expecting
+// table-level compression conf is not affected by the session 
conf, and table-level
+// compression conf takes precedence even the two conf of codec is 
different
+val tableOrgSessionConfSize = tableOrg.getTableSize
+assert(tableOrgSize == tableOrgSessionConfSize)
+
+// check session conf of compression codec taking effect
+val table = TableDefine(s"tbl_parquet", isPartitioned, "parquet", 
None)
+assert(f(tableOrg.getTableSize, table.getTableSize))
+  }
+}
+
+def checkOrcCompressionCodec(isPartitioned: Boolean, tableCodec: 
String,
+  sessionCodec: String, f: (Long, Long) => Boolean = _ == _): Unit = {
+  val tableOrg = TableDefine(s"tbl_orc$tableCodec", isPartitioned, 
"orc",
+Some(CompressionConf("orc.compress", tableCodec)))
+  val tableOrgSize = tableOrg.getTableSize
+
+  withSQLConf("spark.sql.orc.compression.codec" -> sessionCodec) {
+// priority check, when table-level compression conf was set, 
expecting
+// table-level compression conf is not affected by the session 
conf, and table-level
+// compression conf takes precedence even the two conf of codec is 
different
+val tableOrgSessionConfSize = tableOrg.getTableSize
+assert(tableOrgSize == tableOrgSessionConfSize)
+
+// check session conf of compression codec taking effect
+val table = TableDefine(s"tbl_orc", isPartitioned, "orc", None)
+assert(f(tableOrg.getTableSize, table.getTableSize))
+  }
+}
+
+withTempView("table_source") {
+  (0 until 

[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2017-09-26 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/19218#discussion_r141187890
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala ---
@@ -728,4 +728,120 @@ class InsertSuite extends QueryTest with 
TestHiveSingleton with BeforeAndAfter
   assert(e.contains("mismatched input 'ROW'"))
 }
   }
+
+  test("[SPARK-21786] Check 'spark.sql.parquet.compression.codec' " +
+"and 'spark.sql.parquet.compression.codec' taking effect on hive table 
writing") {
--- End diff --

- `[SPARK-21786]` -> `SPARK-21786`
- `spark.sql.parquet.compression.codec` -> `spark.sql.orc.compression.codec`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2017-09-26 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/19218#discussion_r141187555
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala ---
@@ -728,4 +728,120 @@ class InsertSuite extends QueryTest with 
TestHiveSingleton with BeforeAndAfter
   assert(e.contains("mismatched input 'ROW'"))
 }
   }
+
+  test("[SPARK-21786] Check 'spark.sql.parquet.compression.codec' " +
+"and 'spark.sql.parquet.compression.codec' taking effect on hive table 
writing") {
+case class CompressionConf(name: String, codeC: String)
+
+case class TableDefine(tableName: String, isPartitioned: Boolean, 
format: String,
+  compressionConf: Option[CompressionConf]) {
+  def createTable(rootDir: File): Unit = {
+val compression = compressionConf.map(cf => 
s"'${cf.name}'='${cf.codeC}'")
+sql(
+  s"""
+  |CREATE TABLE $tableName(a int)
+  |${ if (isPartitioned) "PARTITIONED BY (p int)" else "" }
+  |STORED AS $format
+  |LOCATION '${rootDir.toURI.toString.stripSuffix("/")}/$tableName'
+  |${ if (compressionConf.nonEmpty) 
s"TBLPROPERTIES(${compression.get})" else "" }
+""".stripMargin)
+  }
+
+  def insertOverwriteTable(): Unit = {
+sql(
+  s"""
+  |INSERT OVERWRITE TABLE $tableName
+  |${ if (isPartitioned) "partition (p=1)" else "" }
+  |SELECT * from table_source
+""".stripMargin)
+  }
+
+  def getDirFiles(file: File): List[File] = {
+if (!file.exists()) Nil
+else if (file.isFile) List(file)
+else {
+  file.listFiles().filterNot(_.getName.startsWith(".hive-staging"))
+.groupBy(_.isFile).flatMap {
+case (isFile, files) if isFile => files.toList
+case (_, dirs) => dirs.flatMap(getDirFiles)
+  }.toList
+}
+  }
+
+  def getTableSize: Long = {
+var totalSize = 0L
+withTempDir { tmpDir =>
+  withTable(tableName) {
+createTable(tmpDir)
+insertOverwriteTable()
+val path = s"${tmpDir.getPath.stripSuffix("/")}/$tableName"
+val dir = new File(path)
+val files = 
getDirFiles(dir).filter(_.getName.startsWith("part-"))
+totalSize = files.map(_.length()).sum
+  }
+}
+totalSize
+  }
+}
+
+def checkParquetCompressionCodec(isPartitioned: Boolean, tableCodec: 
String,
+  sessionCodec: String, f: (Long, Long) => Boolean = _ == _): Unit = {
+  val tableOrg = TableDefine(s"tbl_parquet$tableCodec", isPartitioned, 
"parquet",
+Some(CompressionConf("parquet.compression", tableCodec)))
+  val tableOrgSize = tableOrg.getTableSize
+
+  withSQLConf("spark.sql.parquet.compression.codec" -> sessionCodec) {
+// priority check, when table-level compression conf was set, 
expecting
+// table-level compression conf is not affected by the session 
conf, and table-level
+// compression conf takes precedence even the two conf of codec is 
different
+val tableOrgSessionConfSize = tableOrg.getTableSize
+assert(tableOrgSize == tableOrgSessionConfSize)
+
+// check session conf of compression codec taking effect
+val table = TableDefine(s"tbl_parquet", isPartitioned, "parquet", 
None)
+assert(f(tableOrg.getTableSize, table.getTableSize))
+  }
+}
+
+def checkOrcCompressionCodec(isPartitioned: Boolean, tableCodec: 
String,
+  sessionCodec: String, f: (Long, Long) => Boolean = _ == _): Unit = {
+  val tableOrg = TableDefine(s"tbl_orc$tableCodec", isPartitioned, 
"orc",
+Some(CompressionConf("orc.compress", tableCodec)))
+  val tableOrgSize = tableOrg.getTableSize
+
+  withSQLConf("spark.sql.orc.compression.codec" -> sessionCodec) {
+// priority check, when table-level compression conf was set, 
expecting
+// table-level compression conf is not affected by the session 
conf, and table-level
+// compression conf takes precedence even the two conf of codec is 
different
+val tableOrgSessionConfSize = tableOrg.getTableSize
+assert(tableOrgSize == tableOrgSessionConfSize)
+
+// check session conf of compression codec taking effect
+val table = TableDefine(s"tbl_orc", isPartitioned, "orc", None)
+assert(f(tableOrg.getTableSize, table.getTableSize))
+  }
+}
+
+withTempView("table_source") {
+  (0 until 

[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2017-09-26 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/19218#discussion_r141186378
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala ---
@@ -728,4 +728,120 @@ class InsertSuite extends QueryTest with 
TestHiveSingleton with BeforeAndAfter
   assert(e.contains("mismatched input 'ROW'"))
 }
   }
+
+  test("[SPARK-21786] Check 'spark.sql.parquet.compression.codec' " +
+"and 'spark.sql.parquet.compression.codec' taking effect on hive table 
writing") {
+case class CompressionConf(name: String, codeC: String)
+
+case class TableDefine(tableName: String, isPartitioned: Boolean, 
format: String,
+  compressionConf: Option[CompressionConf]) {
+  def createTable(rootDir: File): Unit = {
+val compression = compressionConf.map(cf => 
s"'${cf.name}'='${cf.codeC}'")
+sql(
+  s"""
+  |CREATE TABLE $tableName(a int)
+  |${ if (isPartitioned) "PARTITIONED BY (p int)" else "" }
+  |STORED AS $format
+  |LOCATION '${rootDir.toURI.toString.stripSuffix("/")}/$tableName'
+  |${ if (compressionConf.nonEmpty) 
s"TBLPROPERTIES(${compression.get})" else "" }
+""".stripMargin)
+  }
+
+  def insertOverwriteTable(): Unit = {
+sql(
+  s"""
+  |INSERT OVERWRITE TABLE $tableName
+  |${ if (isPartitioned) "partition (p=1)" else "" }
+  |SELECT * from table_source
+""".stripMargin)
+  }
+
+  def getDirFiles(file: File): List[File] = {
+if (!file.exists()) Nil
+else if (file.isFile) List(file)
+else {
+  file.listFiles().filterNot(_.getName.startsWith(".hive-staging"))
+.groupBy(_.isFile).flatMap {
+case (isFile, files) if isFile => files.toList
+case (_, dirs) => dirs.flatMap(getDirFiles)
+  }.toList
+}
+  }
+
+  def getTableSize: Long = {
+var totalSize = 0L
+withTempDir { tmpDir =>
+  withTable(tableName) {
+createTable(tmpDir)
+insertOverwriteTable()
+val path = s"${tmpDir.getPath.stripSuffix("/")}/$tableName"
+val dir = new File(path)
+val files = 
getDirFiles(dir).filter(_.getName.startsWith("part-"))
+totalSize = files.map(_.length()).sum
+  }
+}
+totalSize
+  }
+}
+
+def checkParquetCompressionCodec(isPartitioned: Boolean, tableCodec: 
String,
+  sessionCodec: String, f: (Long, Long) => Boolean = _ == _): Unit = {
+  val tableOrg = TableDefine(s"tbl_parquet$tableCodec", isPartitioned, 
"parquet",
+Some(CompressionConf("parquet.compression", tableCodec)))
+  val tableOrgSize = tableOrg.getTableSize
+
+  withSQLConf("spark.sql.parquet.compression.codec" -> sessionCodec) {
+// priority check, when table-level compression conf was set, 
expecting
+// table-level compression conf is not affected by the session 
conf, and table-level
+// compression conf takes precedence even the two conf of codec is 
different
+val tableOrgSessionConfSize = tableOrg.getTableSize
+assert(tableOrgSize == tableOrgSessionConfSize)
+
+// check session conf of compression codec taking effect
+val table = TableDefine(s"tbl_parquet", isPartitioned, "parquet", 
None)
+assert(f(tableOrg.getTableSize, table.getTableSize))
+  }
+}
+
+def checkOrcCompressionCodec(isPartitioned: Boolean, tableCodec: 
String,
+  sessionCodec: String, f: (Long, Long) => Boolean = _ == _): Unit = {
+  val tableOrg = TableDefine(s"tbl_orc$tableCodec", isPartitioned, 
"orc",
+Some(CompressionConf("orc.compress", tableCodec)))
+  val tableOrgSize = tableOrg.getTableSize
+
+  withSQLConf("spark.sql.orc.compression.codec" -> sessionCodec) {
+// priority check, when table-level compression conf was set, 
expecting
+// table-level compression conf is not affected by the session 
conf, and table-level
+// compression conf takes precedence even the two conf of codec is 
different
+val tableOrgSessionConfSize = tableOrg.getTableSize
+assert(tableOrgSize == tableOrgSessionConfSize)
+
+// check session conf of compression codec taking effect
+val table = TableDefine(s"tbl_orc", isPartitioned, "orc", None)
+assert(f(tableOrg.getTableSize, table.getTableSize))
--- End diff --

You may want to check the codec explicitly like 
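For instance, one way to do that is to read the codec back from the written files' footers rather than inferring it from relative file sizes. A sketch under the assumption that the test already collects the written part-files (e.g. via `getDirFiles` above); the helper name is hypothetical:

```Scala
import scala.collection.JavaConverters._

import org.apache.hadoop.fs.Path
import org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER
import org.apache.parquet.hadoop.ParquetFileReader

// Sketch: assert the codec recorded in each part-file's Parquet footer
// matches the expected table-level / session-level codec.
def assertParquetCodec(files: Seq[java.io.File], expected: String): Unit = {
  val hadoopConf = spark.sessionState.newHadoopConf()
  files.foreach { f =>
    val footer =
      ParquetFileReader.readFooter(hadoopConf, new Path(f.getPath), NO_FILTER)
    val codecs =
      footer.getBlocks.asScala.flatMap(_.getColumns.asScala.map(_.getCodec.name))
    assert(codecs.forall(_ == expected.toUpperCase(java.util.Locale.ROOT)))
  }
}
```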

[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2017-09-17 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19218#discussion_r139302763
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
 ---
@@ -101,6 +101,19 @@ case class InsertIntoHiveTable(
 val tmpLocation = getExternalTmpPath(sparkSession, hadoopConf, 
tableLocation)
 val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, 
false)
 
+tableDesc.getOutputFileFormatClassName match {
--- End diff --

Move the whole logic into `saveAsHiveFile`, which is shared by `InsertIntoHiveDirCommand` and `InsertIntoHiveTable`. Both need this logic.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2017-09-15 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/19218#discussion_r139186318
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
 ---
@@ -101,6 +101,13 @@ case class InsertIntoHiveTable(
 val tmpLocation = getExternalTmpPath(sparkSession, hadoopConf, 
tableLocation)
 val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, 
false)
 
+tableDesc.getOutputFileFormatClassName match {
+  case 
"org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat" =>
--- End diff --

- **Parquet**: It seems that you need to consider another output format, [parquet.hive.DeprecatedParquetOutputFormat](https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala#L1173), too (see the sketch below).
- **ORC**: We also have [spark.sql.orc.compression.codec](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L320) from SPARK-21839.
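
Assuming both Parquet output format classes need to be covered, matching on the class-name suffix handles them in one branch. A sketch only, not the final patch:

```Scala
// Sketch: the suffix match covers both Hive Parquet output formats
// (MapredParquetOutputFormat and the older
// parquet.hive.DeprecatedParquetOutputFormat), and uses the SPARK-21839
// conf for ORC.
tableDesc.getOutputFileFormatClassName match {
  case name if name.endsWith("ParquetOutputFormat") =>
    hadoopConf.set("parquet.compression",
      sparkSession.sessionState.conf.parquetCompressionCodec)
  case name if name.endsWith("OrcOutputFormat") =>
    hadoopConf.set("orc.compress",
      sparkSession.sessionState.conf.orcCompressionCodec)
  case _ => () // other formats: leave hadoopConf untouched
}
```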


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2017-09-13 Thread fjh100456
GitHub user fjh100456 opened a pull request:

https://github.com/apache/spark/pull/19218

[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' configuration 
doesn't take effect on tables with partition field(s)

## What changes were proposed in this pull request?
Pass the ‘spark.sql.parquet.compression.codec’ value through to ‘parquet.compression’.
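
In essence (a simplified sketch of the idea, not the final patch):

```Scala
// Core of the fix: copy the session-level Spark conf into the Hadoop conf
// key that the Parquet writer actually reads when writing to a (possibly
// partitioned) Hive table.
hadoopConf.set("parquet.compression",
  sparkSession.sessionState.conf.parquetCompressionCodec)
```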

## How was this patch tested?
Manual test.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/fjh100456/spark master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19218.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19218


commit 677541b47f27fd85f44aa2e46ec44861579475a8
Author: fjh100456 
Date:   2017-09-13T09:24:15Z

[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' configuration 
doesn't take effect on tables with partition field(s)




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org