[GitHub] spark pull request #20087: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2018-01-22 Thread fjh100456
Github user fjh100456 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20087#discussion_r163150428
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
 ---
@@ -55,18 +55,28 @@ private[hive] trait SaveAsHiveFile extends 
DataWritingCommand {
   customPartitionLocations: Map[TablePartitionSpec, String] = 
Map.empty,
   partitionAttributes: Seq[Attribute] = Nil): Set[String] = {
 
-val isCompressed = hadoopConf.get("hive.exec.compress.output", 
"false").toBoolean
+val isCompressed =
+  
fileSinkConf.getTableInfo.getOutputFileFormatClassName.toLowerCase(Locale.ROOT) 
match {
+case formatName if formatName.endsWith("orcoutputformat") =>
+  // For ORC,"mapreduce.output.fileoutputformat.compress",
+  // "mapreduce.output.fileoutputformat.compress.codec", and
+  // "mapreduce.output.fileoutputformat.compress.type"
+  // have no impact because it uses table properties to store 
compression information.
--- End diff --

Ok, I'll try it.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20087: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2018-01-22 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20087#discussion_r163141749
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
 ---
@@ -55,18 +55,28 @@ private[hive] trait SaveAsHiveFile extends 
DataWritingCommand {
   customPartitionLocations: Map[TablePartitionSpec, String] = 
Map.empty,
   partitionAttributes: Seq[Attribute] = Nil): Set[String] = {
 
-val isCompressed = hadoopConf.get("hive.exec.compress.output", 
"false").toBoolean
+val isCompressed =
+  
fileSinkConf.getTableInfo.getOutputFileFormatClassName.toLowerCase(Locale.ROOT) 
match {
+case formatName if formatName.endsWith("orcoutputformat") =>
+  // For ORC,"mapreduce.output.fileoutputformat.compress",
+  // "mapreduce.output.fileoutputformat.compress.codec", and
+  // "mapreduce.output.fileoutputformat.compress.type"
+  // have no impact because it uses table properties to store 
compression information.
--- End diff --

The comment might not be correct now. We need to follow what the latest 
Hive works, if possible. The best way to try Hive (and the other RDBMS) is 
using docker. Maybe you can try the docker?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20087: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2018-01-22 Thread fjh100456
Github user fjh100456 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20087#discussion_r163132078
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
 ---
@@ -55,18 +55,28 @@ private[hive] trait SaveAsHiveFile extends 
DataWritingCommand {
   customPartitionLocations: Map[TablePartitionSpec, String] = 
Map.empty,
   partitionAttributes: Seq[Attribute] = Nil): Set[String] = {
 
-val isCompressed = hadoopConf.get("hive.exec.compress.output", 
"false").toBoolean
+val isCompressed =
+  
fileSinkConf.getTableInfo.getOutputFileFormatClassName.toLowerCase(Locale.ROOT) 
match {
+case formatName if formatName.endsWith("orcoutputformat") =>
+  // For ORC,"mapreduce.output.fileoutputformat.compress",
+  // "mapreduce.output.fileoutputformat.compress.codec", and
+  // "mapreduce.output.fileoutputformat.compress.type"
+  // have no impact because it uses table properties to store 
compression information.
--- End diff --

For parquet, using a hive client, `parquet.compression` has a higher 
priority than  `mapreduce.output.fileoutputformat.compress`. And table-level 
compression( set by tblproperties) has the highest priority.  
`parquet.compression` set by cli also has a higher priority than 
`mapreduce.output.fileoutputformat.compress`.

After this pr, the priority does not changed. If table-level compression 
was set, other compression would not take effect, even though 
`mapreduce.output` were set, which is the same with hive. But 
`parquet.compression` set by spark cli does not take effect, unless set 
`hive.exec.compress.output` to true. This may because we do not get 
`parquet.compression` from the session, and I wonder if it's necessary because 
we have `spark.sql.parquet.comression` instead.

For orc, `hive.exec.compress.output` and `mapreduce.output` have no 
impact really, but table-leval  compression (set by tblproperties) always take 
effect.  `orc.compression` set by spark cli does not take effect too, even 
though  set `hive.exec.compress.output` to true, which is differet with 
parquet. 
Another question, the comment say `it uses table properties to store 
compression information`, actully, by manul test, I found orc-tables also can 
have mixed compressions, and the data can be read together correctly.

My Hive version for this test is 1.1.0.  Actully it's a little difficut for 
me to get a higher version runable Hive client.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20087: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2018-01-21 Thread fjh100456
Github user fjh100456 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20087#discussion_r162836010
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
 ---
@@ -55,18 +55,28 @@ private[hive] trait SaveAsHiveFile extends 
DataWritingCommand {
   customPartitionLocations: Map[TablePartitionSpec, String] = 
Map.empty,
   partitionAttributes: Seq[Attribute] = Nil): Set[String] = {
 
-val isCompressed = hadoopConf.get("hive.exec.compress.output", 
"false").toBoolean
+val isCompressed =
+  
fileSinkConf.getTableInfo.getOutputFileFormatClassName.toLowerCase(Locale.ROOT) 
match {
+case formatName if formatName.endsWith("orcoutputformat") =>
+  // For ORC,"mapreduce.output.fileoutputformat.compress",
+  // "mapreduce.output.fileoutputformat.compress.codec", and
+  // "mapreduce.output.fileoutputformat.compress.type"
+  // have no impact because it uses table properties to store 
compression information.
--- End diff --

Surely, I'll do it this days.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20087: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2018-01-21 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20087#discussion_r162802793
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
 ---
@@ -55,18 +55,28 @@ private[hive] trait SaveAsHiveFile extends 
DataWritingCommand {
   customPartitionLocations: Map[TablePartitionSpec, String] = 
Map.empty,
   partitionAttributes: Seq[Attribute] = Nil): Set[String] = {
 
-val isCompressed = hadoopConf.get("hive.exec.compress.output", 
"false").toBoolean
+val isCompressed =
+  
fileSinkConf.getTableInfo.getOutputFileFormatClassName.toLowerCase(Locale.ROOT) 
match {
+case formatName if formatName.endsWith("orcoutputformat") =>
+  // For ORC,"mapreduce.output.fileoutputformat.compress",
+  // "mapreduce.output.fileoutputformat.compress.codec", and
+  // "mapreduce.output.fileoutputformat.compress.type"
+  // have no impact because it uses table properties to store 
compression information.
--- End diff --

Although this is the existing behavior, but could you investigate how Hive 
behaves when `Parquet.Compress`  is set. 
https://issues.apache.org/jira/browse/HIVE-7858 Is it the same as ORC?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20087: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2018-01-20 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/20087


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20087: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2018-01-20 Thread fjh100456
Github user fjh100456 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20087#discussion_r162779218
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala 
---
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.io.File
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.Path
+import org.apache.orc.OrcConf.COMPRESS
+import org.apache.parquet.hadoop.ParquetOutputFormat
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.sql.execution.datasources.orc.OrcOptions
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetOptions, 
ParquetTest}
+import org.apache.spark.sql.hive.orc.OrcFileOperator
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
+
+class CompressionCodecSuite extends TestHiveSingleton with ParquetTest 
with BeforeAndAfterAll {
+  import spark.implicits._
+
+  override def beforeAll(): Unit = {
+super.beforeAll()
+(0 until 
maxRecordNum).toDF("a").createOrReplaceTempView("table_source")
+  }
+
+  override def afterAll(): Unit = {
+try {
+  spark.catalog.dropTempView("table_source")
+} finally {
+  super.afterAll()
+}
+  }
+
+  private val maxRecordNum = 50
+
+  private def getConvertMetastoreConfName(format: String): String = 
format.toLowerCase match {
+case "parquet" => HiveUtils.CONVERT_METASTORE_PARQUET.key
+case "orc" => HiveUtils.CONVERT_METASTORE_ORC.key
+  }
+
+  private def getSparkCompressionConfName(format: String): String = 
format.toLowerCase match {
+case "parquet" => SQLConf.PARQUET_COMPRESSION.key
+case "orc" => SQLConf.ORC_COMPRESSION.key
+  }
+
+  private def getHiveCompressPropName(format: String): String = 
format.toLowerCase match {
+case "parquet" => ParquetOutputFormat.COMPRESSION
+case "orc" => COMPRESS.getAttribute
+  }
+
+  private def normalizeCodecName(format: String, name: String): String = {
+format.toLowerCase match {
+  case "parquet" => ParquetOptions.getParquetCompressionCodecName(name)
+  case "orc" => OrcOptions.getORCCompressionCodecName(name)
+}
+  }
+
+  private def getTableCompressionCodec(path: String, format: String): 
Seq[String] = {
+val hadoopConf = spark.sessionState.newHadoopConf()
+val codecs = format.toLowerCase match {
+  case "parquet" => for {
+footer <- readAllFootersWithoutSummaryFiles(new Path(path), 
hadoopConf)
+block <- footer.getParquetMetadata.getBlocks.asScala
+column <- block.getColumns.asScala
+  } yield column.getCodec.name()
+  case "orc" => new File(path).listFiles().filter { file =>
+file.isFile && !file.getName.endsWith(".crc") && file.getName != 
"_SUCCESS"
+  }.map { orcFile =>
+
OrcFileOperator.getFileReader(orcFile.toPath.toString).get.getCompression.toString
+  }.toSeq
+}
+codecs.distinct
+  }
+
+  private def createTable(
+  rootDir: File,
+  tableName: String,
+  isPartitioned: Boolean,
+  format: String,
+  compressionCodec: Option[String]): Unit = {
+val tblProperties = compressionCodec match {
+  case Some(prop) => 
s"TBLPROPERTIES('${getHiveCompressPropName(format)}'='$prop')"
+  case _ => ""
+}
+val partitionCreate = if (isPartitioned) "PARTITIONED BY (p string)" 
else ""
+sql(
+  s"""
+|CREATE TABLE $tableName(a int)
+|$partitionCreate
+|STORED AS $format
+|LOCATION '${rootDir.toURI.toString.stripSuffix("/")}/$tableName'
+|$tblProperties
+  """.stripMargin)
+  }
+
+  private def writeDataToTable(
+  tableName: String,
+  

[GitHub] spark pull request #20087: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2018-01-19 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20087#discussion_r162776118
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala 
---
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.io.File
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.Path
+import org.apache.orc.OrcConf.COMPRESS
+import org.apache.parquet.hadoop.ParquetOutputFormat
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.sql.execution.datasources.orc.OrcOptions
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetOptions, 
ParquetTest}
+import org.apache.spark.sql.hive.orc.OrcFileOperator
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
+
+class CompressionCodecSuite extends TestHiveSingleton with ParquetTest 
with BeforeAndAfterAll {
+  import spark.implicits._
+
+  override def beforeAll(): Unit = {
+super.beforeAll()
+(0 until 
maxRecordNum).toDF("a").createOrReplaceTempView("table_source")
+  }
+
+  override def afterAll(): Unit = {
+try {
+  spark.catalog.dropTempView("table_source")
+} finally {
+  super.afterAll()
+}
+  }
+
+  private val maxRecordNum = 50
+
+  private def getConvertMetastoreConfName(format: String): String = 
format.toLowerCase match {
+case "parquet" => HiveUtils.CONVERT_METASTORE_PARQUET.key
+case "orc" => HiveUtils.CONVERT_METASTORE_ORC.key
+  }
+
+  private def getSparkCompressionConfName(format: String): String = 
format.toLowerCase match {
+case "parquet" => SQLConf.PARQUET_COMPRESSION.key
+case "orc" => SQLConf.ORC_COMPRESSION.key
+  }
+
+  private def getHiveCompressPropName(format: String): String = 
format.toLowerCase match {
+case "parquet" => ParquetOutputFormat.COMPRESSION
+case "orc" => COMPRESS.getAttribute
+  }
+
+  private def normalizeCodecName(format: String, name: String): String = {
+format.toLowerCase match {
+  case "parquet" => ParquetOptions.getParquetCompressionCodecName(name)
+  case "orc" => OrcOptions.getORCCompressionCodecName(name)
+}
+  }
+
+  private def getTableCompressionCodec(path: String, format: String): 
Seq[String] = {
+val hadoopConf = spark.sessionState.newHadoopConf()
+val codecs = format.toLowerCase match {
+  case "parquet" => for {
+footer <- readAllFootersWithoutSummaryFiles(new Path(path), 
hadoopConf)
+block <- footer.getParquetMetadata.getBlocks.asScala
+column <- block.getColumns.asScala
+  } yield column.getCodec.name()
+  case "orc" => new File(path).listFiles().filter { file =>
+file.isFile && !file.getName.endsWith(".crc") && file.getName != 
"_SUCCESS"
+  }.map { orcFile =>
+
OrcFileOperator.getFileReader(orcFile.toPath.toString).get.getCompression.toString
+  }.toSeq
+}
+codecs.distinct
+  }
+
+  private def createTable(
+  rootDir: File,
+  tableName: String,
+  isPartitioned: Boolean,
+  format: String,
+  compressionCodec: Option[String]): Unit = {
+val tblProperties = compressionCodec match {
+  case Some(prop) => 
s"TBLPROPERTIES('${getHiveCompressPropName(format)}'='$prop')"
+  case _ => ""
+}
+val partitionCreate = if (isPartitioned) "PARTITIONED BY (p string)" 
else ""
+sql(
+  s"""
+|CREATE TABLE $tableName(a int)
+|$partitionCreate
+|STORED AS $format
+|LOCATION '${rootDir.toURI.toString.stripSuffix("/")}/$tableName'
+|$tblProperties
+  """.stripMargin)
+  }
+
+  private def writeDataToTable(
+  tableName: String,
+  

[GitHub] spark pull request #20087: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2018-01-19 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20087#discussion_r162673688
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala 
---
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.io.File
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.Path
+import org.apache.orc.OrcConf.COMPRESS
+import org.apache.parquet.hadoop.ParquetOutputFormat
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.sql.execution.datasources.orc.OrcOptions
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetOptions, 
ParquetTest}
+import org.apache.spark.sql.hive.orc.OrcFileOperator
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
+
+class CompressionCodecSuite extends TestHiveSingleton with ParquetTest 
with BeforeAndAfterAll {
+  import spark.implicits._
+
+  override def beforeAll(): Unit = {
+super.beforeAll()
+(0 until 
maxRecordNum).toDF("a").createOrReplaceTempView("table_source")
+  }
+
+  override def afterAll(): Unit = {
+try {
+  spark.catalog.dropTempView("table_source")
+} finally {
+  super.afterAll()
+}
+  }
+
+  private val maxRecordNum = 500
+
+  private def getConvertMetastoreConfName(format: String): String = 
format.toLowerCase match {
+case "parquet" => HiveUtils.CONVERT_METASTORE_PARQUET.key
+case "orc" => HiveUtils.CONVERT_METASTORE_ORC.key
+  }
+
+  private def getSparkCompressionConfName(format: String): String = 
format.toLowerCase match {
+case "parquet" => SQLConf.PARQUET_COMPRESSION.key
+case "orc" => SQLConf.ORC_COMPRESSION.key
+  }
+
+  private def getHiveCompressPropName(format: String): String = 
format.toLowerCase match {
+case "parquet" => ParquetOutputFormat.COMPRESSION
+case "orc" => COMPRESS.getAttribute
+  }
+
+  private def normalizeCodecName(format: String, name: String): String = {
+format.toLowerCase match {
+  case "parquet" => 
ParquetOptions.shortParquetCompressionCodecNames(name).name()
+  case "orc" => OrcOptions.shortOrcCompressionCodecNames(name)
+}
+  }
+
+  private def getTableCompressionCodec(path: String, format: String): 
Seq[String] = {
+val hadoopConf = spark.sessionState.newHadoopConf()
+val codecs = format.toLowerCase match {
+  case "parquet" => for {
+footer <- readAllFootersWithoutSummaryFiles(new Path(path), 
hadoopConf)
+block <- footer.getParquetMetadata.getBlocks.asScala
+column <- block.getColumns.asScala
+  } yield column.getCodec.name()
+  case "orc" => new File(path).listFiles().filter{ file =>
+file.isFile && !file.getName.endsWith(".crc") && file.getName != 
"_SUCCESS"
+  }.map { orcFile =>
+
OrcFileOperator.getFileReader(orcFile.toPath.toString).get.getCompression.toString
+  }.toSeq
+}
+codecs.distinct
+  }
+
+  private def createTable(
+  rootDir: File,
+  tableName: String,
+  isPartitioned: Boolean,
+  format: String,
+  compressionCodec: Option[String]): Unit = {
+val tblProperties = compressionCodec match {
+  case Some(prop) => 
s"TBLPROPERTIES('${getHiveCompressPropName(format)}'='$prop')"
+  case _ => ""
+}
+val partitionCreate = if (isPartitioned) "PARTITIONED BY (p string)" 
else ""
+sql(
+  s"""
+|CREATE TABLE $tableName(a int)
+|$partitionCreate
+|STORED AS $format
+|LOCATION '${rootDir.toURI.toString.stripSuffix("/")}/$tableName'
+|$tblProperties
+  """.stripMargin)
+  }
+
+  private def writeDataToTable(
+  tableName: String,
+  

[GitHub] spark pull request #20087: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2018-01-19 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20087#discussion_r162672650
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala 
---
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.io.File
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.Path
+import org.apache.orc.OrcConf.COMPRESS
+import org.apache.parquet.hadoop.ParquetOutputFormat
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.sql.execution.datasources.orc.OrcOptions
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetOptions, 
ParquetTest}
+import org.apache.spark.sql.hive.orc.OrcFileOperator
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
+
+class CompressionCodecSuite extends TestHiveSingleton with ParquetTest 
with BeforeAndAfterAll {
+  import spark.implicits._
+
+  override def beforeAll(): Unit = {
+super.beforeAll()
+(0 until 
maxRecordNum).toDF("a").createOrReplaceTempView("table_source")
+  }
+
+  override def afterAll(): Unit = {
+try {
+  spark.catalog.dropTempView("table_source")
+} finally {
+  super.afterAll()
+}
+  }
+
+  private val maxRecordNum = 500
+
+  private def getConvertMetastoreConfName(format: String): String = 
format.toLowerCase match {
+case "parquet" => HiveUtils.CONVERT_METASTORE_PARQUET.key
+case "orc" => HiveUtils.CONVERT_METASTORE_ORC.key
+  }
+
+  private def getSparkCompressionConfName(format: String): String = 
format.toLowerCase match {
+case "parquet" => SQLConf.PARQUET_COMPRESSION.key
+case "orc" => SQLConf.ORC_COMPRESSION.key
+  }
+
+  private def getHiveCompressPropName(format: String): String = 
format.toLowerCase match {
+case "parquet" => ParquetOutputFormat.COMPRESSION
+case "orc" => COMPRESS.getAttribute
+  }
+
+  private def normalizeCodecName(format: String, name: String): String = {
+format.toLowerCase match {
+  case "parquet" => 
ParquetOptions.shortParquetCompressionCodecNames(name).name()
+  case "orc" => OrcOptions.shortOrcCompressionCodecNames(name)
+}
+  }
+
+  private def getTableCompressionCodec(path: String, format: String): 
Seq[String] = {
+val hadoopConf = spark.sessionState.newHadoopConf()
+val codecs = format.toLowerCase match {
+  case "parquet" => for {
+footer <- readAllFootersWithoutSummaryFiles(new Path(path), 
hadoopConf)
+block <- footer.getParquetMetadata.getBlocks.asScala
+column <- block.getColumns.asScala
+  } yield column.getCodec.name()
+  case "orc" => new File(path).listFiles().filter{ file =>
+file.isFile && !file.getName.endsWith(".crc") && file.getName != 
"_SUCCESS"
+  }.map { orcFile =>
+
OrcFileOperator.getFileReader(orcFile.toPath.toString).get.getCompression.toString
+  }.toSeq
+}
+codecs.distinct
+  }
+
+  private def createTable(
+  rootDir: File,
+  tableName: String,
+  isPartitioned: Boolean,
+  format: String,
+  compressionCodec: Option[String]): Unit = {
+val tblProperties = compressionCodec match {
+  case Some(prop) => 
s"TBLPROPERTIES('${getHiveCompressPropName(format)}'='$prop')"
+  case _ => ""
+}
+val partitionCreate = if (isPartitioned) "PARTITIONED BY (p string)" 
else ""
+sql(
+  s"""
+|CREATE TABLE $tableName(a int)
+|$partitionCreate
+|STORED AS $format
+|LOCATION '${rootDir.toURI.toString.stripSuffix("/")}/$tableName'
+|$tblProperties
+  """.stripMargin)
+  }
+
+  private def writeDataToTable(
+  tableName: String,
+  

[GitHub] spark pull request #20087: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2018-01-19 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20087#discussion_r162671108
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala 
---
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.io.File
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.Path
+import org.apache.orc.OrcConf.COMPRESS
+import org.apache.parquet.hadoop.ParquetOutputFormat
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.sql.execution.datasources.orc.OrcOptions
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetOptions, 
ParquetTest}
+import org.apache.spark.sql.hive.orc.OrcFileOperator
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
+
+class CompressionCodecSuite extends TestHiveSingleton with ParquetTest 
with BeforeAndAfterAll {
+  import spark.implicits._
+
+  override def beforeAll(): Unit = {
+super.beforeAll()
+(0 until 
maxRecordNum).toDF("a").createOrReplaceTempView("table_source")
+  }
+
+  override def afterAll(): Unit = {
+try {
+  spark.catalog.dropTempView("table_source")
+} finally {
+  super.afterAll()
+}
+  }
+
+  private val maxRecordNum = 50
+
+  private def getConvertMetastoreConfName(format: String): String = 
format.toLowerCase match {
+case "parquet" => HiveUtils.CONVERT_METASTORE_PARQUET.key
+case "orc" => HiveUtils.CONVERT_METASTORE_ORC.key
+  }
+
+  private def getSparkCompressionConfName(format: String): String = 
format.toLowerCase match {
+case "parquet" => SQLConf.PARQUET_COMPRESSION.key
+case "orc" => SQLConf.ORC_COMPRESSION.key
+  }
+
+  private def getHiveCompressPropName(format: String): String = 
format.toLowerCase match {
+case "parquet" => ParquetOutputFormat.COMPRESSION
+case "orc" => COMPRESS.getAttribute
+  }
+
+  private def normalizeCodecName(format: String, name: String): String = {
+format.toLowerCase match {
+  case "parquet" => ParquetOptions.getParquetCompressionCodecName(name)
+  case "orc" => OrcOptions.getORCCompressionCodecName(name)
+}
+  }
+
+  private def getTableCompressionCodec(path: String, format: String): 
Seq[String] = {
+val hadoopConf = spark.sessionState.newHadoopConf()
+val codecs = format.toLowerCase match {
+  case "parquet" => for {
+footer <- readAllFootersWithoutSummaryFiles(new Path(path), 
hadoopConf)
+block <- footer.getParquetMetadata.getBlocks.asScala
+column <- block.getColumns.asScala
+  } yield column.getCodec.name()
+  case "orc" => new File(path).listFiles().filter { file =>
+file.isFile && !file.getName.endsWith(".crc") && file.getName != 
"_SUCCESS"
+  }.map { orcFile =>
+
OrcFileOperator.getFileReader(orcFile.toPath.toString).get.getCompression.toString
+  }.toSeq
+}
+codecs.distinct
+  }
+
+  private def createTable(
+  rootDir: File,
+  tableName: String,
+  isPartitioned: Boolean,
+  format: String,
+  compressionCodec: Option[String]): Unit = {
+val tblProperties = compressionCodec match {
+  case Some(prop) => 
s"TBLPROPERTIES('${getHiveCompressPropName(format)}'='$prop')"
+  case _ => ""
+}
+val partitionCreate = if (isPartitioned) "PARTITIONED BY (p string)" 
else ""
+sql(
+  s"""
+|CREATE TABLE $tableName(a int)
+|$partitionCreate
+|STORED AS $format
+|LOCATION '${rootDir.toURI.toString.stripSuffix("/")}/$tableName'
+|$tblProperties
+  """.stripMargin)
+  }
+
+  private def writeDataToTable(
+  tableName: String,
+  

[GitHub] spark pull request #20087: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2018-01-19 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20087#discussion_r162673292
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala 
---
@@ -260,17 +282,21 @@ class CompressionCodecSuite extends TestHiveSingleton 
with ParquetTest with Befo
   def checkForTableWithoutCompressProp(format: String, compressCodecs: 
List[String]): Unit = {
 Seq(true, false).foreach { isPartitioned =>
   Seq(true, false).foreach { convertMetastore =>
-checkTableCompressionCodecForCodecs(
-  format,
-  isPartitioned,
-  convertMetastore,
-  compressionCodecs = compressCodecs,
-  tableCompressionCodecs = List(null)) {
-  case (tableCompressionCodec, sessionCompressionCodec, 
realCompressionCodec, tableSize) =>
-// Always expect session-level take effect
-assert(sessionCompressionCodec == realCompressionCodec)
-assert(checkTableSize(format, sessionCompressionCodec,
-  isPartitioned, convertMetastore, tableSize))
+Seq(true, false).foreach { usingCTAS =>
+  checkTableCompressionCodecForCodecs(
+format,
+isPartitioned,
+convertMetastore,
+usingCTAS,
+compressionCodecs = compressCodecs,
+tableCompressionCodecs = List(null)) {
+case
+  (tableCompressionCodec, sessionCompressionCodec, 
realCompressionCodec, tableSize) =>
--- End diff --

The same here.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20087: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2018-01-19 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20087#discussion_r162671801
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala 
---
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.io.File
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.Path
+import org.apache.orc.OrcConf.COMPRESS
+import org.apache.parquet.hadoop.ParquetOutputFormat
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.sql.execution.datasources.orc.OrcOptions
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetOptions, 
ParquetTest}
+import org.apache.spark.sql.hive.orc.OrcFileOperator
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
+
+class CompressionCodecSuite extends TestHiveSingleton with ParquetTest 
with BeforeAndAfterAll {
+  import spark.implicits._
+
+  override def beforeAll(): Unit = {
+super.beforeAll()
+(0 until 
maxRecordNum).toDF("a").createOrReplaceTempView("table_source")
+  }
+
+  override def afterAll(): Unit = {
+try {
+  spark.catalog.dropTempView("table_source")
+} finally {
+  super.afterAll()
+}
+  }
+
+  private val maxRecordNum = 50
+
+  private def getConvertMetastoreConfName(format: String): String = 
format.toLowerCase match {
+case "parquet" => HiveUtils.CONVERT_METASTORE_PARQUET.key
+case "orc" => HiveUtils.CONVERT_METASTORE_ORC.key
+  }
+
+  private def getSparkCompressionConfName(format: String): String = 
format.toLowerCase match {
+case "parquet" => SQLConf.PARQUET_COMPRESSION.key
+case "orc" => SQLConf.ORC_COMPRESSION.key
+  }
+
+  private def getHiveCompressPropName(format: String): String = 
format.toLowerCase match {
+case "parquet" => ParquetOutputFormat.COMPRESSION
+case "orc" => COMPRESS.getAttribute
+  }
+
+  private def normalizeCodecName(format: String, name: String): String = {
+format.toLowerCase match {
+  case "parquet" => ParquetOptions.getParquetCompressionCodecName(name)
+  case "orc" => OrcOptions.getORCCompressionCodecName(name)
+}
+  }
+
+  private def getTableCompressionCodec(path: String, format: String): 
Seq[String] = {
+val hadoopConf = spark.sessionState.newHadoopConf()
+val codecs = format.toLowerCase match {
+  case "parquet" => for {
+footer <- readAllFootersWithoutSummaryFiles(new Path(path), 
hadoopConf)
+block <- footer.getParquetMetadata.getBlocks.asScala
+column <- block.getColumns.asScala
+  } yield column.getCodec.name()
+  case "orc" => new File(path).listFiles().filter { file =>
+file.isFile && !file.getName.endsWith(".crc") && file.getName != 
"_SUCCESS"
+  }.map { orcFile =>
+
OrcFileOperator.getFileReader(orcFile.toPath.toString).get.getCompression.toString
+  }.toSeq
+}
+codecs.distinct
+  }
+
+  private def createTable(
+  rootDir: File,
+  tableName: String,
+  isPartitioned: Boolean,
+  format: String,
+  compressionCodec: Option[String]): Unit = {
+val tblProperties = compressionCodec match {
+  case Some(prop) => 
s"TBLPROPERTIES('${getHiveCompressPropName(format)}'='$prop')"
+  case _ => ""
+}
+val partitionCreate = if (isPartitioned) "PARTITIONED BY (p string)" 
else ""
+sql(
+  s"""
+|CREATE TABLE $tableName(a int)
+|$partitionCreate
+|STORED AS $format
+|LOCATION '${rootDir.toURI.toString.stripSuffix("/")}/$tableName'
+|$tblProperties
+  """.stripMargin)
+  }
+
+  private def writeDataToTable(
+  tableName: String,
+  

[GitHub] spark pull request #20087: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2018-01-19 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20087#discussion_r162673245
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala 
---
@@ -260,17 +282,21 @@ class CompressionCodecSuite extends TestHiveSingleton 
with ParquetTest with Befo
   def checkForTableWithoutCompressProp(format: String, compressCodecs: 
List[String]): Unit = {
 Seq(true, false).foreach { isPartitioned =>
   Seq(true, false).foreach { convertMetastore =>
-checkTableCompressionCodecForCodecs(
-  format,
-  isPartitioned,
-  convertMetastore,
-  compressionCodecs = compressCodecs,
-  tableCompressionCodecs = List(null)) {
-  case (tableCompressionCodec, sessionCompressionCodec, 
realCompressionCodec, tableSize) =>
-// Always expect session-level take effect
-assert(sessionCompressionCodec == realCompressionCodec)
-assert(checkTableSize(format, sessionCompressionCodec,
-  isPartitioned, convertMetastore, tableSize))
+Seq(true, false).foreach { usingCTAS =>
+  checkTableCompressionCodecForCodecs(
+format,
+isPartitioned,
+convertMetastore,
+usingCTAS,
+compressionCodecs = compressCodecs,
+tableCompressionCodecs = List(null)) {
+case
+  (tableCompressionCodec, sessionCompressionCodec, 
realCompressionCodec, tableSize) =>
+  // Always expect session-level take effect
+  assert(sessionCompressionCodec == realCompressionCodec)
+  assert(checkTableSize(format, sessionCompressionCodec,
+  isPartitioned, convertMetastore, usingCTAS, tableSize))
--- End diff --

```
assert(checkTableSize(
  format, sessionCompressionCodec, isPartitioned, convertMetastore, 
usingCTAS, tableSize))
```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20087: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2018-01-19 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20087#discussion_r162672130
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
 ---
@@ -82,4 +82,7 @@ object ParquetOptions {
 "snappy" -> CompressionCodecName.SNAPPY,
 "gzip" -> CompressionCodecName.GZIP,
 "lzo" -> CompressionCodecName.LZO)
+
+  def getParquetCompressionCodecName(name: String): String =
+shortParquetCompressionCodecNames(name).name()
--- End diff --

```Scala
def getParquetCompressionCodecName(name: String): String = {
  shortParquetCompressionCodecNames(name).name()
}
```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20087: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2018-01-19 Thread fjh100456
Github user fjh100456 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20087#discussion_r162563141
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala 
---
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.io.File
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.Path
+import org.apache.orc.OrcConf.COMPRESS
+import org.apache.parquet.hadoop.ParquetOutputFormat
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.sql.execution.datasources.orc.OrcOptions
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetOptions, 
ParquetTest}
+import org.apache.spark.sql.hive.orc.OrcFileOperator
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
+
+class CompressionCodecSuite extends TestHiveSingleton with ParquetTest 
with BeforeAndAfterAll {
+  import spark.implicits._
+
+  override def beforeAll(): Unit = {
+super.beforeAll()
+(0 until 
maxRecordNum).toDF("a").createOrReplaceTempView("table_source")
+  }
+
+  override def afterAll(): Unit = {
+try {
+  spark.catalog.dropTempView("table_source")
+} finally {
+  super.afterAll()
+}
+  }
+
+  private val maxRecordNum = 500
+
+  private def getConvertMetastoreConfName(format: String): String = 
format.toLowerCase match {
+case "parquet" => HiveUtils.CONVERT_METASTORE_PARQUET.key
+case "orc" => HiveUtils.CONVERT_METASTORE_ORC.key
+  }
+
+  private def getSparkCompressionConfName(format: String): String = 
format.toLowerCase match {
+case "parquet" => SQLConf.PARQUET_COMPRESSION.key
+case "orc" => SQLConf.ORC_COMPRESSION.key
+  }
+
+  private def getHiveCompressPropName(format: String): String = 
format.toLowerCase match {
+case "parquet" => ParquetOutputFormat.COMPRESSION
+case "orc" => COMPRESS.getAttribute
+  }
+
+  private def normalizeCodecName(format: String, name: String): String = {
+format.toLowerCase match {
+  case "parquet" => 
ParquetOptions.shortParquetCompressionCodecNames(name).name()
+  case "orc" => OrcOptions.shortOrcCompressionCodecNames(name)
+}
+  }
+
+  private def getTableCompressionCodec(path: String, format: String): 
Seq[String] = {
+val hadoopConf = spark.sessionState.newHadoopConf()
+val codecs = format.toLowerCase match {
+  case "parquet" => for {
+footer <- readAllFootersWithoutSummaryFiles(new Path(path), 
hadoopConf)
+block <- footer.getParquetMetadata.getBlocks.asScala
+column <- block.getColumns.asScala
+  } yield column.getCodec.name()
+  case "orc" => new File(path).listFiles().filter{ file =>
+file.isFile && !file.getName.endsWith(".crc") && file.getName != 
"_SUCCESS"
+  }.map { orcFile =>
+
OrcFileOperator.getFileReader(orcFile.toPath.toString).get.getCompression.toString
+  }.toSeq
+}
+codecs.distinct
+  }
+
+  private def createTable(
+  rootDir: File,
+  tableName: String,
+  isPartitioned: Boolean,
+  format: String,
+  compressionCodec: Option[String]): Unit = {
+val tblProperties = compressionCodec match {
+  case Some(prop) => 
s"TBLPROPERTIES('${getHiveCompressPropName(format)}'='$prop')"
+  case _ => ""
+}
+val partitionCreate = if (isPartitioned) "PARTITIONED BY (p string)" 
else ""
+sql(
+  s"""
+|CREATE TABLE $tableName(a int)
+|$partitionCreate
+|STORED AS $format
+|LOCATION '${rootDir.toURI.toString.stripSuffix("/")}/$tableName'
+|$tblProperties
+  """.stripMargin)
+  }
+
+  private def writeDataToTable(
+  tableName: String,
+   

[GitHub] spark pull request #20087: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2018-01-18 Thread fjh100456
Github user fjh100456 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20087#discussion_r162551602
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala 
---
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.io.File
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.Path
+import org.apache.orc.OrcConf.COMPRESS
+import org.apache.parquet.hadoop.ParquetOutputFormat
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.sql.execution.datasources.orc.OrcOptions
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetOptions, 
ParquetTest}
+import org.apache.spark.sql.hive.orc.OrcFileOperator
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
+
+class CompressionCodecSuite extends TestHiveSingleton with ParquetTest 
with BeforeAndAfterAll {
+  import spark.implicits._
+
+  override def beforeAll(): Unit = {
+super.beforeAll()
+(0 until 
maxRecordNum).toDF("a").createOrReplaceTempView("table_source")
+  }
+
+  override def afterAll(): Unit = {
+try {
+  spark.catalog.dropTempView("table_source")
+} finally {
+  super.afterAll()
+}
+  }
+
+  private val maxRecordNum = 500
+
+  private def getConvertMetastoreConfName(format: String): String = 
format.toLowerCase match {
+case "parquet" => HiveUtils.CONVERT_METASTORE_PARQUET.key
+case "orc" => HiveUtils.CONVERT_METASTORE_ORC.key
+  }
+
+  private def getSparkCompressionConfName(format: String): String = 
format.toLowerCase match {
+case "parquet" => SQLConf.PARQUET_COMPRESSION.key
+case "orc" => SQLConf.ORC_COMPRESSION.key
+  }
+
+  private def getHiveCompressPropName(format: String): String = 
format.toLowerCase match {
+case "parquet" => ParquetOutputFormat.COMPRESSION
+case "orc" => COMPRESS.getAttribute
+  }
+
+  private def normalizeCodecName(format: String, name: String): String = {
+format.toLowerCase match {
+  case "parquet" => 
ParquetOptions.shortParquetCompressionCodecNames(name).name()
+  case "orc" => OrcOptions.shortOrcCompressionCodecNames(name)
+}
+  }
+
+  private def getTableCompressionCodec(path: String, format: String): 
Seq[String] = {
+val hadoopConf = spark.sessionState.newHadoopConf()
+val codecs = format.toLowerCase match {
+  case "parquet" => for {
+footer <- readAllFootersWithoutSummaryFiles(new Path(path), 
hadoopConf)
+block <- footer.getParquetMetadata.getBlocks.asScala
+column <- block.getColumns.asScala
+  } yield column.getCodec.name()
+  case "orc" => new File(path).listFiles().filter{ file =>
+file.isFile && !file.getName.endsWith(".crc") && file.getName != 
"_SUCCESS"
+  }.map { orcFile =>
+
OrcFileOperator.getFileReader(orcFile.toPath.toString).get.getCompression.toString
+  }.toSeq
+}
+codecs.distinct
+  }
+
+  private def createTable(
+  rootDir: File,
+  tableName: String,
+  isPartitioned: Boolean,
+  format: String,
+  compressionCodec: Option[String]): Unit = {
+val tblProperties = compressionCodec match {
+  case Some(prop) => 
s"TBLPROPERTIES('${getHiveCompressPropName(format)}'='$prop')"
+  case _ => ""
+}
+val partitionCreate = if (isPartitioned) "PARTITIONED BY (p string)" 
else ""
+sql(
+  s"""
+|CREATE TABLE $tableName(a int)
+|$partitionCreate
+|STORED AS $format
+|LOCATION '${rootDir.toURI.toString.stripSuffix("/")}/$tableName'
+|$tblProperties
+  """.stripMargin)
+  }
+
+  private def writeDataToTable(
+  tableName: String,
+   

[GitHub] spark pull request #20087: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2018-01-18 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20087#discussion_r162551462
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala 
---
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.io.File
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.Path
+import org.apache.orc.OrcConf.COMPRESS
+import org.apache.parquet.hadoop.ParquetOutputFormat
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.sql.execution.datasources.orc.OrcOptions
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetOptions, 
ParquetTest}
+import org.apache.spark.sql.hive.orc.OrcFileOperator
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
+
+class CompressionCodecSuite extends TestHiveSingleton with ParquetTest 
with BeforeAndAfterAll {
+  import spark.implicits._
+
+  override def beforeAll(): Unit = {
+super.beforeAll()
+(0 until 
maxRecordNum).toDF("a").createOrReplaceTempView("table_source")
+  }
+
+  override def afterAll(): Unit = {
+try {
+  spark.catalog.dropTempView("table_source")
+} finally {
+  super.afterAll()
+}
+  }
+
+  private val maxRecordNum = 500
+
+  private def getConvertMetastoreConfName(format: String): String = 
format.toLowerCase match {
+case "parquet" => HiveUtils.CONVERT_METASTORE_PARQUET.key
+case "orc" => HiveUtils.CONVERT_METASTORE_ORC.key
+  }
+
+  private def getSparkCompressionConfName(format: String): String = 
format.toLowerCase match {
+case "parquet" => SQLConf.PARQUET_COMPRESSION.key
+case "orc" => SQLConf.ORC_COMPRESSION.key
+  }
+
+  private def getHiveCompressPropName(format: String): String = 
format.toLowerCase match {
+case "parquet" => ParquetOutputFormat.COMPRESSION
+case "orc" => COMPRESS.getAttribute
+  }
+
+  private def normalizeCodecName(format: String, name: String): String = {
+format.toLowerCase match {
+  case "parquet" => 
ParquetOptions.shortParquetCompressionCodecNames(name).name()
+  case "orc" => OrcOptions.shortOrcCompressionCodecNames(name)
+}
+  }
+
+  private def getTableCompressionCodec(path: String, format: String): 
Seq[String] = {
+val hadoopConf = spark.sessionState.newHadoopConf()
+val codecs = format.toLowerCase match {
+  case "parquet" => for {
+footer <- readAllFootersWithoutSummaryFiles(new Path(path), 
hadoopConf)
+block <- footer.getParquetMetadata.getBlocks.asScala
+column <- block.getColumns.asScala
+  } yield column.getCodec.name()
+  case "orc" => new File(path).listFiles().filter{ file =>
+file.isFile && !file.getName.endsWith(".crc") && file.getName != 
"_SUCCESS"
+  }.map { orcFile =>
+
OrcFileOperator.getFileReader(orcFile.toPath.toString).get.getCompression.toString
+  }.toSeq
+}
+codecs.distinct
+  }
+
+  private def createTable(
+  rootDir: File,
+  tableName: String,
+  isPartitioned: Boolean,
+  format: String,
+  compressionCodec: Option[String]): Unit = {
+val tblProperties = compressionCodec match {
+  case Some(prop) => 
s"TBLPROPERTIES('${getHiveCompressPropName(format)}'='$prop')"
+  case _ => ""
+}
+val partitionCreate = if (isPartitioned) "PARTITIONED BY (p string)" 
else ""
+sql(
+  s"""
+|CREATE TABLE $tableName(a int)
+|$partitionCreate
+|STORED AS $format
+|LOCATION '${rootDir.toURI.toString.stripSuffix("/")}/$tableName'
+|$tblProperties
+  """.stripMargin)
+  }
+
+  private def writeDataToTable(
+  tableName: String,
+  

[GitHub] spark pull request #20087: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2018-01-18 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20087#discussion_r162551351
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala 
---
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.io.File
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.Path
+import org.apache.orc.OrcConf.COMPRESS
+import org.apache.parquet.hadoop.ParquetOutputFormat
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.sql.execution.datasources.orc.OrcOptions
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetOptions, 
ParquetTest}
+import org.apache.spark.sql.hive.orc.OrcFileOperator
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
+
+class CompressionCodecSuite extends TestHiveSingleton with ParquetTest 
with BeforeAndAfterAll {
+  import spark.implicits._
+
+  override def beforeAll(): Unit = {
+super.beforeAll()
+(0 until 
maxRecordNum).toDF("a").createOrReplaceTempView("table_source")
+  }
+
+  override def afterAll(): Unit = {
+try {
+  spark.catalog.dropTempView("table_source")
+} finally {
+  super.afterAll()
+}
+  }
+
+  private val maxRecordNum = 500
+
+  private def getConvertMetastoreConfName(format: String): String = 
format.toLowerCase match {
+case "parquet" => HiveUtils.CONVERT_METASTORE_PARQUET.key
+case "orc" => HiveUtils.CONVERT_METASTORE_ORC.key
+  }
+
+  private def getSparkCompressionConfName(format: String): String = 
format.toLowerCase match {
+case "parquet" => SQLConf.PARQUET_COMPRESSION.key
+case "orc" => SQLConf.ORC_COMPRESSION.key
+  }
+
+  private def getHiveCompressPropName(format: String): String = 
format.toLowerCase match {
+case "parquet" => ParquetOutputFormat.COMPRESSION
+case "orc" => COMPRESS.getAttribute
+  }
+
+  private def normalizeCodecName(format: String, name: String): String = {
+format.toLowerCase match {
+  case "parquet" => 
ParquetOptions.shortParquetCompressionCodecNames(name).name()
+  case "orc" => OrcOptions.shortOrcCompressionCodecNames(name)
+}
+  }
+
+  private def getTableCompressionCodec(path: String, format: String): 
Seq[String] = {
+val hadoopConf = spark.sessionState.newHadoopConf()
+val codecs = format.toLowerCase match {
+  case "parquet" => for {
+footer <- readAllFootersWithoutSummaryFiles(new Path(path), 
hadoopConf)
+block <- footer.getParquetMetadata.getBlocks.asScala
+column <- block.getColumns.asScala
+  } yield column.getCodec.name()
+  case "orc" => new File(path).listFiles().filter{ file =>
+file.isFile && !file.getName.endsWith(".crc") && file.getName != 
"_SUCCESS"
+  }.map { orcFile =>
+
OrcFileOperator.getFileReader(orcFile.toPath.toString).get.getCompression.toString
+  }.toSeq
+}
+codecs.distinct
+  }
+
+  private def createTable(
+  rootDir: File,
+  tableName: String,
+  isPartitioned: Boolean,
+  format: String,
+  compressionCodec: Option[String]): Unit = {
+val tblProperties = compressionCodec match {
+  case Some(prop) => 
s"TBLPROPERTIES('${getHiveCompressPropName(format)}'='$prop')"
+  case _ => ""
+}
+val partitionCreate = if (isPartitioned) "PARTITIONED BY (p string)" 
else ""
+sql(
+  s"""
+|CREATE TABLE $tableName(a int)
+|$partitionCreate
+|STORED AS $format
+|LOCATION '${rootDir.toURI.toString.stripSuffix("/")}/$tableName'
+|$tblProperties
+  """.stripMargin)
+  }
+
+  private def writeDataToTable(
+  tableName: String,
+  

[GitHub] spark pull request #20087: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2018-01-18 Thread fjh100456
Github user fjh100456 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20087#discussion_r162528298
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala 
---
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.io.File
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.Path
+import org.apache.orc.OrcConf.COMPRESS
+import org.apache.parquet.hadoop.ParquetOutputFormat
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.sql.execution.datasources.orc.OrcOptions
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetOptions, 
ParquetTest}
+import org.apache.spark.sql.hive.orc.OrcFileOperator
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
+
+class CompressionCodecSuite extends TestHiveSingleton with ParquetTest 
with BeforeAndAfterAll {
+  import spark.implicits._
+
+  override def beforeAll(): Unit = {
+super.beforeAll()
+(0 until 
maxRecordNum).toDF("a").createOrReplaceTempView("table_source")
+  }
+
+  override def afterAll(): Unit = {
+try {
+  spark.catalog.dropTempView("table_source")
+} finally {
+  super.afterAll()
+}
+  }
+
+  private val maxRecordNum = 500
+
+  private def getConvertMetastoreConfName(format: String): String = 
format.toLowerCase match {
+case "parquet" => HiveUtils.CONVERT_METASTORE_PARQUET.key
+case "orc" => HiveUtils.CONVERT_METASTORE_ORC.key
+  }
+
+  private def getSparkCompressionConfName(format: String): String = 
format.toLowerCase match {
+case "parquet" => SQLConf.PARQUET_COMPRESSION.key
+case "orc" => SQLConf.ORC_COMPRESSION.key
+  }
+
+  private def getHiveCompressPropName(format: String): String = 
format.toLowerCase match {
+case "parquet" => ParquetOutputFormat.COMPRESSION
+case "orc" => COMPRESS.getAttribute
+  }
+
+  private def normalizeCodecName(format: String, name: String): String = {
+format.toLowerCase match {
+  case "parquet" => 
ParquetOptions.shortParquetCompressionCodecNames(name).name()
+  case "orc" => OrcOptions.shortOrcCompressionCodecNames(name)
+}
+  }
+
+  private def getTableCompressionCodec(path: String, format: String): 
Seq[String] = {
+val hadoopConf = spark.sessionState.newHadoopConf()
+val codecs = format.toLowerCase match {
+  case "parquet" => for {
+footer <- readAllFootersWithoutSummaryFiles(new Path(path), 
hadoopConf)
+block <- footer.getParquetMetadata.getBlocks.asScala
+column <- block.getColumns.asScala
+  } yield column.getCodec.name()
+  case "orc" => new File(path).listFiles().filter{ file =>
+file.isFile && !file.getName.endsWith(".crc") && file.getName != 
"_SUCCESS"
+  }.map { orcFile =>
+
OrcFileOperator.getFileReader(orcFile.toPath.toString).get.getCompression.toString
+  }.toSeq
+}
+codecs.distinct
+  }
+
+  private def createTable(
+  rootDir: File,
+  tableName: String,
+  isPartitioned: Boolean,
+  format: String,
+  compressionCodec: Option[String]): Unit = {
+val tblProperties = compressionCodec match {
+  case Some(prop) => 
s"TBLPROPERTIES('${getHiveCompressPropName(format)}'='$prop')"
+  case _ => ""
+}
+val partitionCreate = if (isPartitioned) "PARTITIONED BY (p string)" 
else ""
+sql(
+  s"""
+|CREATE TABLE $tableName(a int)
+|$partitionCreate
+|STORED AS $format
+|LOCATION '${rootDir.toURI.toString.stripSuffix("/")}/$tableName'
+|$tblProperties
+  """.stripMargin)
+  }
+
+  private def writeDataToTable(
+  tableName: String,
+   

[GitHub] spark pull request #20087: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2018-01-18 Thread fjh100456
Github user fjh100456 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20087#discussion_r162520864
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala 
---
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.io.File
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.Path
+import org.apache.orc.OrcConf.COMPRESS
+import org.apache.parquet.hadoop.ParquetOutputFormat
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.sql.execution.datasources.orc.OrcOptions
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetOptions, 
ParquetTest}
+import org.apache.spark.sql.hive.orc.OrcFileOperator
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
+
+class CompressionCodecSuite extends TestHiveSingleton with ParquetTest 
with BeforeAndAfterAll {
+  import spark.implicits._
+
+  override def beforeAll(): Unit = {
+super.beforeAll()
+(0 until 
maxRecordNum).toDF("a").createOrReplaceTempView("table_source")
+  }
+
+  override def afterAll(): Unit = {
+try {
+  spark.catalog.dropTempView("table_source")
+} finally {
+  super.afterAll()
+}
+  }
+
+  private val maxRecordNum = 500
+
+  private def getConvertMetastoreConfName(format: String): String = 
format.toLowerCase match {
+case "parquet" => HiveUtils.CONVERT_METASTORE_PARQUET.key
+case "orc" => HiveUtils.CONVERT_METASTORE_ORC.key
+  }
+
+  private def getSparkCompressionConfName(format: String): String = 
format.toLowerCase match {
+case "parquet" => SQLConf.PARQUET_COMPRESSION.key
+case "orc" => SQLConf.ORC_COMPRESSION.key
+  }
+
+  private def getHiveCompressPropName(format: String): String = 
format.toLowerCase match {
+case "parquet" => ParquetOutputFormat.COMPRESSION
+case "orc" => COMPRESS.getAttribute
+  }
+
+  private def normalizeCodecName(format: String, name: String): String = {
+format.toLowerCase match {
+  case "parquet" => 
ParquetOptions.shortParquetCompressionCodecNames(name).name()
+  case "orc" => OrcOptions.shortOrcCompressionCodecNames(name)
+}
+  }
+
+  private def getTableCompressionCodec(path: String, format: String): 
Seq[String] = {
+val hadoopConf = spark.sessionState.newHadoopConf()
+val codecs = format.toLowerCase match {
+  case "parquet" => for {
+footer <- readAllFootersWithoutSummaryFiles(new Path(path), 
hadoopConf)
+block <- footer.getParquetMetadata.getBlocks.asScala
+column <- block.getColumns.asScala
+  } yield column.getCodec.name()
+  case "orc" => new File(path).listFiles().filter{ file =>
+file.isFile && !file.getName.endsWith(".crc") && file.getName != 
"_SUCCESS"
+  }.map { orcFile =>
+
OrcFileOperator.getFileReader(orcFile.toPath.toString).get.getCompression.toString
+  }.toSeq
+}
+codecs.distinct
+  }
+
+  private def createTable(
+  rootDir: File,
+  tableName: String,
+  isPartitioned: Boolean,
+  format: String,
+  compressionCodec: Option[String]): Unit = {
+val tblProperties = compressionCodec match {
+  case Some(prop) => 
s"TBLPROPERTIES('${getHiveCompressPropName(format)}'='$prop')"
+  case _ => ""
+}
+val partitionCreate = if (isPartitioned) "PARTITIONED BY (p string)" 
else ""
+sql(
+  s"""
+|CREATE TABLE $tableName(a int)
+|$partitionCreate
+|STORED AS $format
+|LOCATION '${rootDir.toURI.toString.stripSuffix("/")}/$tableName'
+|$tblProperties
+  """.stripMargin)
+  }
+
+  private def writeDataToTable(
+  tableName: String,
+   

[GitHub] spark pull request #20087: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2018-01-18 Thread fjh100456
Github user fjh100456 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20087#discussion_r162519320
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala 
---
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.io.File
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.Path
+import org.apache.orc.OrcConf.COMPRESS
+import org.apache.parquet.hadoop.ParquetOutputFormat
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.sql.execution.datasources.orc.OrcOptions
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetOptions, 
ParquetTest}
+import org.apache.spark.sql.hive.orc.OrcFileOperator
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
+
+class CompressionCodecSuite extends TestHiveSingleton with ParquetTest 
with BeforeAndAfterAll {
+  import spark.implicits._
+
+  override def beforeAll(): Unit = {
+super.beforeAll()
+(0 until 
maxRecordNum).toDF("a").createOrReplaceTempView("table_source")
+  }
+
+  override def afterAll(): Unit = {
+try {
+  spark.catalog.dropTempView("table_source")
+} finally {
+  super.afterAll()
+}
+  }
+
+  private val maxRecordNum = 500
+
+  private def getConvertMetastoreConfName(format: String): String = 
format.toLowerCase match {
+case "parquet" => HiveUtils.CONVERT_METASTORE_PARQUET.key
+case "orc" => HiveUtils.CONVERT_METASTORE_ORC.key
+  }
+
+  private def getSparkCompressionConfName(format: String): String = 
format.toLowerCase match {
+case "parquet" => SQLConf.PARQUET_COMPRESSION.key
+case "orc" => SQLConf.ORC_COMPRESSION.key
+  }
+
+  private def getHiveCompressPropName(format: String): String = 
format.toLowerCase match {
+case "parquet" => ParquetOutputFormat.COMPRESSION
+case "orc" => COMPRESS.getAttribute
+  }
+
+  private def normalizeCodecName(format: String, name: String): String = {
+format.toLowerCase match {
+  case "parquet" => 
ParquetOptions.shortParquetCompressionCodecNames(name).name()
+  case "orc" => OrcOptions.shortOrcCompressionCodecNames(name)
+}
+  }
+
+  private def getTableCompressionCodec(path: String, format: String): 
Seq[String] = {
+val hadoopConf = spark.sessionState.newHadoopConf()
+val codecs = format.toLowerCase match {
+  case "parquet" => for {
+footer <- readAllFootersWithoutSummaryFiles(new Path(path), 
hadoopConf)
+block <- footer.getParquetMetadata.getBlocks.asScala
+column <- block.getColumns.asScala
+  } yield column.getCodec.name()
+  case "orc" => new File(path).listFiles().filter{ file =>
+file.isFile && !file.getName.endsWith(".crc") && file.getName != 
"_SUCCESS"
+  }.map { orcFile =>
+
OrcFileOperator.getFileReader(orcFile.toPath.toString).get.getCompression.toString
+  }.toSeq
+}
+codecs.distinct
+  }
+
+  private def createTable(
+  rootDir: File,
+  tableName: String,
+  isPartitioned: Boolean,
+  format: String,
+  compressionCodec: Option[String]): Unit = {
+val tblProperties = compressionCodec match {
+  case Some(prop) => 
s"TBLPROPERTIES('${getHiveCompressPropName(format)}'='$prop')"
+  case _ => ""
+}
+val partitionCreate = if (isPartitioned) "PARTITIONED BY (p string)" 
else ""
+sql(
+  s"""
+|CREATE TABLE $tableName(a int)
+|$partitionCreate
+|STORED AS $format
+|LOCATION '${rootDir.toURI.toString.stripSuffix("/")}/$tableName'
+|$tblProperties
+  """.stripMargin)
+  }
+
+  private def writeDataToTable(
+  tableName: String,
+   

[GitHub] spark pull request #20087: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2018-01-17 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20087#discussion_r162252757
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala 
---
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.io.File
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.Path
+import org.apache.orc.OrcConf.COMPRESS
+import org.apache.parquet.hadoop.ParquetOutputFormat
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.sql.execution.datasources.orc.OrcOptions
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetOptions, 
ParquetTest}
+import org.apache.spark.sql.hive.orc.OrcFileOperator
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
+
+class CompressionCodecSuite extends TestHiveSingleton with ParquetTest 
with BeforeAndAfterAll {
+  import spark.implicits._
+
+  override def beforeAll(): Unit = {
+super.beforeAll()
+(0 until 
maxRecordNum).toDF("a").createOrReplaceTempView("table_source")
+  }
+
+  override def afterAll(): Unit = {
+try {
+  spark.catalog.dropTempView("table_source")
+} finally {
+  super.afterAll()
+}
+  }
+
+  private val maxRecordNum = 500
+
+  private def getConvertMetastoreConfName(format: String): String = 
format.toLowerCase match {
+case "parquet" => HiveUtils.CONVERT_METASTORE_PARQUET.key
+case "orc" => HiveUtils.CONVERT_METASTORE_ORC.key
+  }
+
+  private def getSparkCompressionConfName(format: String): String = 
format.toLowerCase match {
+case "parquet" => SQLConf.PARQUET_COMPRESSION.key
+case "orc" => SQLConf.ORC_COMPRESSION.key
+  }
+
+  private def getHiveCompressPropName(format: String): String = 
format.toLowerCase match {
+case "parquet" => ParquetOutputFormat.COMPRESSION
+case "orc" => COMPRESS.getAttribute
+  }
+
+  private def normalizeCodecName(format: String, name: String): String = {
+format.toLowerCase match {
+  case "parquet" => 
ParquetOptions.shortParquetCompressionCodecNames(name).name()
+  case "orc" => OrcOptions.shortOrcCompressionCodecNames(name)
+}
+  }
+
+  private def getTableCompressionCodec(path: String, format: String): 
Seq[String] = {
+val hadoopConf = spark.sessionState.newHadoopConf()
+val codecs = format.toLowerCase match {
+  case "parquet" => for {
+footer <- readAllFootersWithoutSummaryFiles(new Path(path), 
hadoopConf)
+block <- footer.getParquetMetadata.getBlocks.asScala
+column <- block.getColumns.asScala
+  } yield column.getCodec.name()
+  case "orc" => new File(path).listFiles().filter{ file =>
+file.isFile && !file.getName.endsWith(".crc") && file.getName != 
"_SUCCESS"
+  }.map { orcFile =>
+
OrcFileOperator.getFileReader(orcFile.toPath.toString).get.getCompression.toString
+  }.toSeq
+}
+codecs.distinct
+  }
+
+  private def createTable(
+  rootDir: File,
+  tableName: String,
+  isPartitioned: Boolean,
+  format: String,
+  compressionCodec: Option[String]): Unit = {
+val tblProperties = compressionCodec match {
+  case Some(prop) => 
s"TBLPROPERTIES('${getHiveCompressPropName(format)}'='$prop')"
+  case _ => ""
+}
+val partitionCreate = if (isPartitioned) "PARTITIONED BY (p string)" 
else ""
+sql(
+  s"""
+|CREATE TABLE $tableName(a int)
+|$partitionCreate
+|STORED AS $format
+|LOCATION '${rootDir.toURI.toString.stripSuffix("/")}/$tableName'
+|$tblProperties
+  """.stripMargin)
+  }
+
+  private def writeDataToTable(
+  tableName: String,
+  

[GitHub] spark pull request #20087: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2018-01-17 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20087#discussion_r162253089
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala 
---
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.io.File
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.Path
+import org.apache.orc.OrcConf.COMPRESS
+import org.apache.parquet.hadoop.ParquetOutputFormat
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.sql.execution.datasources.orc.OrcOptions
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetOptions, 
ParquetTest}
+import org.apache.spark.sql.hive.orc.OrcFileOperator
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
+
+class CompressionCodecSuite extends TestHiveSingleton with ParquetTest 
with BeforeAndAfterAll {
+  import spark.implicits._
+
+  override def beforeAll(): Unit = {
+super.beforeAll()
+(0 until 
maxRecordNum).toDF("a").createOrReplaceTempView("table_source")
+  }
+
+  override def afterAll(): Unit = {
+try {
+  spark.catalog.dropTempView("table_source")
+} finally {
+  super.afterAll()
+}
+  }
+
+  private val maxRecordNum = 500
+
+  private def getConvertMetastoreConfName(format: String): String = 
format.toLowerCase match {
+case "parquet" => HiveUtils.CONVERT_METASTORE_PARQUET.key
+case "orc" => HiveUtils.CONVERT_METASTORE_ORC.key
+  }
+
+  private def getSparkCompressionConfName(format: String): String = 
format.toLowerCase match {
+case "parquet" => SQLConf.PARQUET_COMPRESSION.key
+case "orc" => SQLConf.ORC_COMPRESSION.key
+  }
+
+  private def getHiveCompressPropName(format: String): String = 
format.toLowerCase match {
+case "parquet" => ParquetOutputFormat.COMPRESSION
+case "orc" => COMPRESS.getAttribute
+  }
+
+  private def normalizeCodecName(format: String, name: String): String = {
+format.toLowerCase match {
+  case "parquet" => 
ParquetOptions.shortParquetCompressionCodecNames(name).name()
+  case "orc" => OrcOptions.shortOrcCompressionCodecNames(name)
+}
+  }
+
+  private def getTableCompressionCodec(path: String, format: String): 
Seq[String] = {
+val hadoopConf = spark.sessionState.newHadoopConf()
+val codecs = format.toLowerCase match {
+  case "parquet" => for {
+footer <- readAllFootersWithoutSummaryFiles(new Path(path), 
hadoopConf)
+block <- footer.getParquetMetadata.getBlocks.asScala
+column <- block.getColumns.asScala
+  } yield column.getCodec.name()
+  case "orc" => new File(path).listFiles().filter{ file =>
+file.isFile && !file.getName.endsWith(".crc") && file.getName != 
"_SUCCESS"
+  }.map { orcFile =>
+
OrcFileOperator.getFileReader(orcFile.toPath.toString).get.getCompression.toString
+  }.toSeq
+}
+codecs.distinct
+  }
+
+  private def createTable(
+  rootDir: File,
+  tableName: String,
+  isPartitioned: Boolean,
+  format: String,
+  compressionCodec: Option[String]): Unit = {
+val tblProperties = compressionCodec match {
+  case Some(prop) => 
s"TBLPROPERTIES('${getHiveCompressPropName(format)}'='$prop')"
+  case _ => ""
+}
+val partitionCreate = if (isPartitioned) "PARTITIONED BY (p string)" 
else ""
+sql(
+  s"""
+|CREATE TABLE $tableName(a int)
+|$partitionCreate
+|STORED AS $format
+|LOCATION '${rootDir.toURI.toString.stripSuffix("/")}/$tableName'
+|$tblProperties
+  """.stripMargin)
+  }
+
+  private def writeDataToTable(
+  tableName: String,
+  

[GitHub] spark pull request #20087: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2018-01-17 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20087#discussion_r162252636
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala 
---
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.io.File
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.Path
+import org.apache.orc.OrcConf.COMPRESS
+import org.apache.parquet.hadoop.ParquetOutputFormat
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.sql.execution.datasources.orc.OrcOptions
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetOptions, 
ParquetTest}
+import org.apache.spark.sql.hive.orc.OrcFileOperator
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
+
+class CompressionCodecSuite extends TestHiveSingleton with ParquetTest 
with BeforeAndAfterAll {
+  import spark.implicits._
+
+  override def beforeAll(): Unit = {
+super.beforeAll()
+(0 until 
maxRecordNum).toDF("a").createOrReplaceTempView("table_source")
+  }
+
+  override def afterAll(): Unit = {
+try {
+  spark.catalog.dropTempView("table_source")
+} finally {
+  super.afterAll()
+}
+  }
+
+  private val maxRecordNum = 500
+
+  private def getConvertMetastoreConfName(format: String): String = 
format.toLowerCase match {
+case "parquet" => HiveUtils.CONVERT_METASTORE_PARQUET.key
+case "orc" => HiveUtils.CONVERT_METASTORE_ORC.key
+  }
+
+  private def getSparkCompressionConfName(format: String): String = 
format.toLowerCase match {
+case "parquet" => SQLConf.PARQUET_COMPRESSION.key
+case "orc" => SQLConf.ORC_COMPRESSION.key
+  }
+
+  private def getHiveCompressPropName(format: String): String = 
format.toLowerCase match {
+case "parquet" => ParquetOutputFormat.COMPRESSION
+case "orc" => COMPRESS.getAttribute
+  }
+
+  private def normalizeCodecName(format: String, name: String): String = {
+format.toLowerCase match {
+  case "parquet" => 
ParquetOptions.shortParquetCompressionCodecNames(name).name()
+  case "orc" => OrcOptions.shortOrcCompressionCodecNames(name)
+}
+  }
+
+  private def getTableCompressionCodec(path: String, format: String): 
Seq[String] = {
+val hadoopConf = spark.sessionState.newHadoopConf()
+val codecs = format.toLowerCase match {
+  case "parquet" => for {
+footer <- readAllFootersWithoutSummaryFiles(new Path(path), 
hadoopConf)
+block <- footer.getParquetMetadata.getBlocks.asScala
+column <- block.getColumns.asScala
+  } yield column.getCodec.name()
+  case "orc" => new File(path).listFiles().filter{ file =>
+file.isFile && !file.getName.endsWith(".crc") && file.getName != 
"_SUCCESS"
+  }.map { orcFile =>
+
OrcFileOperator.getFileReader(orcFile.toPath.toString).get.getCompression.toString
+  }.toSeq
+}
+codecs.distinct
+  }
+
+  private def createTable(
+  rootDir: File,
+  tableName: String,
+  isPartitioned: Boolean,
+  format: String,
+  compressionCodec: Option[String]): Unit = {
+val tblProperties = compressionCodec match {
+  case Some(prop) => 
s"TBLPROPERTIES('${getHiveCompressPropName(format)}'='$prop')"
+  case _ => ""
+}
+val partitionCreate = if (isPartitioned) "PARTITIONED BY (p string)" 
else ""
+sql(
+  s"""
+|CREATE TABLE $tableName(a int)
+|$partitionCreate
+|STORED AS $format
+|LOCATION '${rootDir.toURI.toString.stripSuffix("/")}/$tableName'
+|$tblProperties
+  """.stripMargin)
+  }
+
+  private def writeDataToTable(
+  tableName: String,
+  

[GitHub] spark pull request #20087: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2018-01-17 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20087#discussion_r162252901
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala 
---
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.io.File
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.Path
+import org.apache.orc.OrcConf.COMPRESS
+import org.apache.parquet.hadoop.ParquetOutputFormat
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.sql.execution.datasources.orc.OrcOptions
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetOptions, 
ParquetTest}
+import org.apache.spark.sql.hive.orc.OrcFileOperator
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
+
+class CompressionCodecSuite extends TestHiveSingleton with ParquetTest 
with BeforeAndAfterAll {
+  import spark.implicits._
+
+  override def beforeAll(): Unit = {
+super.beforeAll()
+(0 until 
maxRecordNum).toDF("a").createOrReplaceTempView("table_source")
+  }
+
+  override def afterAll(): Unit = {
+try {
+  spark.catalog.dropTempView("table_source")
+} finally {
+  super.afterAll()
+}
+  }
+
+  private val maxRecordNum = 500
+
+  private def getConvertMetastoreConfName(format: String): String = 
format.toLowerCase match {
+case "parquet" => HiveUtils.CONVERT_METASTORE_PARQUET.key
+case "orc" => HiveUtils.CONVERT_METASTORE_ORC.key
+  }
+
+  private def getSparkCompressionConfName(format: String): String = 
format.toLowerCase match {
+case "parquet" => SQLConf.PARQUET_COMPRESSION.key
+case "orc" => SQLConf.ORC_COMPRESSION.key
+  }
+
+  private def getHiveCompressPropName(format: String): String = 
format.toLowerCase match {
+case "parquet" => ParquetOutputFormat.COMPRESSION
+case "orc" => COMPRESS.getAttribute
+  }
+
+  private def normalizeCodecName(format: String, name: String): String = {
+format.toLowerCase match {
+  case "parquet" => 
ParquetOptions.shortParquetCompressionCodecNames(name).name()
+  case "orc" => OrcOptions.shortOrcCompressionCodecNames(name)
+}
+  }
+
+  private def getTableCompressionCodec(path: String, format: String): 
Seq[String] = {
+val hadoopConf = spark.sessionState.newHadoopConf()
+val codecs = format.toLowerCase match {
+  case "parquet" => for {
+footer <- readAllFootersWithoutSummaryFiles(new Path(path), 
hadoopConf)
+block <- footer.getParquetMetadata.getBlocks.asScala
+column <- block.getColumns.asScala
+  } yield column.getCodec.name()
+  case "orc" => new File(path).listFiles().filter{ file =>
+file.isFile && !file.getName.endsWith(".crc") && file.getName != 
"_SUCCESS"
+  }.map { orcFile =>
+
OrcFileOperator.getFileReader(orcFile.toPath.toString).get.getCompression.toString
+  }.toSeq
+}
+codecs.distinct
+  }
+
+  private def createTable(
+  rootDir: File,
+  tableName: String,
+  isPartitioned: Boolean,
+  format: String,
+  compressionCodec: Option[String]): Unit = {
+val tblProperties = compressionCodec match {
+  case Some(prop) => 
s"TBLPROPERTIES('${getHiveCompressPropName(format)}'='$prop')"
+  case _ => ""
+}
+val partitionCreate = if (isPartitioned) "PARTITIONED BY (p string)" 
else ""
+sql(
+  s"""
+|CREATE TABLE $tableName(a int)
+|$partitionCreate
+|STORED AS $format
+|LOCATION '${rootDir.toURI.toString.stripSuffix("/")}/$tableName'
+|$tblProperties
+  """.stripMargin)
+  }
+
+  private def writeDataToTable(
+  tableName: String,
+  

[GitHub] spark pull request #20087: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2018-01-17 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20087#discussion_r162252598
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala 
---
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.io.File
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.Path
+import org.apache.orc.OrcConf.COMPRESS
+import org.apache.parquet.hadoop.ParquetOutputFormat
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.sql.execution.datasources.orc.OrcOptions
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetOptions, 
ParquetTest}
+import org.apache.spark.sql.hive.orc.OrcFileOperator
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
+
+class CompressionCodecSuite extends TestHiveSingleton with ParquetTest 
with BeforeAndAfterAll {
+  import spark.implicits._
+
+  override def beforeAll(): Unit = {
+super.beforeAll()
+(0 until 
maxRecordNum).toDF("a").createOrReplaceTempView("table_source")
+  }
+
+  override def afterAll(): Unit = {
+try {
+  spark.catalog.dropTempView("table_source")
+} finally {
+  super.afterAll()
+}
+  }
+
+  private val maxRecordNum = 500
+
+  private def getConvertMetastoreConfName(format: String): String = 
format.toLowerCase match {
+case "parquet" => HiveUtils.CONVERT_METASTORE_PARQUET.key
+case "orc" => HiveUtils.CONVERT_METASTORE_ORC.key
+  }
+
+  private def getSparkCompressionConfName(format: String): String = 
format.toLowerCase match {
+case "parquet" => SQLConf.PARQUET_COMPRESSION.key
+case "orc" => SQLConf.ORC_COMPRESSION.key
+  }
+
+  private def getHiveCompressPropName(format: String): String = 
format.toLowerCase match {
+case "parquet" => ParquetOutputFormat.COMPRESSION
+case "orc" => COMPRESS.getAttribute
+  }
+
+  private def normalizeCodecName(format: String, name: String): String = {
+format.toLowerCase match {
+  case "parquet" => 
ParquetOptions.shortParquetCompressionCodecNames(name).name()
+  case "orc" => OrcOptions.shortOrcCompressionCodecNames(name)
+}
+  }
+
+  private def getTableCompressionCodec(path: String, format: String): 
Seq[String] = {
+val hadoopConf = spark.sessionState.newHadoopConf()
+val codecs = format.toLowerCase match {
+  case "parquet" => for {
+footer <- readAllFootersWithoutSummaryFiles(new Path(path), 
hadoopConf)
+block <- footer.getParquetMetadata.getBlocks.asScala
+column <- block.getColumns.asScala
+  } yield column.getCodec.name()
+  case "orc" => new File(path).listFiles().filter{ file =>
+file.isFile && !file.getName.endsWith(".crc") && file.getName != 
"_SUCCESS"
+  }.map { orcFile =>
+
OrcFileOperator.getFileReader(orcFile.toPath.toString).get.getCompression.toString
+  }.toSeq
+}
+codecs.distinct
+  }
+
+  private def createTable(
+  rootDir: File,
+  tableName: String,
+  isPartitioned: Boolean,
+  format: String,
+  compressionCodec: Option[String]): Unit = {
+val tblProperties = compressionCodec match {
+  case Some(prop) => 
s"TBLPROPERTIES('${getHiveCompressPropName(format)}'='$prop')"
+  case _ => ""
+}
+val partitionCreate = if (isPartitioned) "PARTITIONED BY (p string)" 
else ""
+sql(
+  s"""
+|CREATE TABLE $tableName(a int)
+|$partitionCreate
+|STORED AS $format
+|LOCATION '${rootDir.toURI.toString.stripSuffix("/")}/$tableName'
+|$tblProperties
+  """.stripMargin)
+  }
+
+  private def writeDataToTable(
+  tableName: String,
+  

[GitHub] spark pull request #20087: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2018-01-17 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20087#discussion_r162252484
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala 
---
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.io.File
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.Path
+import org.apache.orc.OrcConf.COMPRESS
+import org.apache.parquet.hadoop.ParquetOutputFormat
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.sql.execution.datasources.orc.OrcOptions
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetOptions, 
ParquetTest}
+import org.apache.spark.sql.hive.orc.OrcFileOperator
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
+
+class CompressionCodecSuite extends TestHiveSingleton with ParquetTest 
with BeforeAndAfterAll {
+  import spark.implicits._
+
+  override def beforeAll(): Unit = {
+super.beforeAll()
+(0 until 
maxRecordNum).toDF("a").createOrReplaceTempView("table_source")
+  }
+
+  override def afterAll(): Unit = {
+try {
+  spark.catalog.dropTempView("table_source")
+} finally {
+  super.afterAll()
+}
+  }
+
+  private val maxRecordNum = 500
--- End diff --

Reduce it to 50 for decreasing the execution time



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20087: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2018-01-17 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20087#discussion_r162253009
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala 
---
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.io.File
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.Path
+import org.apache.orc.OrcConf.COMPRESS
+import org.apache.parquet.hadoop.ParquetOutputFormat
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.sql.execution.datasources.orc.OrcOptions
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetOptions, 
ParquetTest}
+import org.apache.spark.sql.hive.orc.OrcFileOperator
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
+
+class CompressionCodecSuite extends TestHiveSingleton with ParquetTest 
with BeforeAndAfterAll {
+  import spark.implicits._
+
+  override def beforeAll(): Unit = {
+super.beforeAll()
+(0 until 
maxRecordNum).toDF("a").createOrReplaceTempView("table_source")
+  }
+
+  override def afterAll(): Unit = {
+try {
+  spark.catalog.dropTempView("table_source")
+} finally {
+  super.afterAll()
+}
+  }
+
+  private val maxRecordNum = 500
+
+  private def getConvertMetastoreConfName(format: String): String = 
format.toLowerCase match {
+case "parquet" => HiveUtils.CONVERT_METASTORE_PARQUET.key
+case "orc" => HiveUtils.CONVERT_METASTORE_ORC.key
+  }
+
+  private def getSparkCompressionConfName(format: String): String = 
format.toLowerCase match {
+case "parquet" => SQLConf.PARQUET_COMPRESSION.key
+case "orc" => SQLConf.ORC_COMPRESSION.key
+  }
+
+  private def getHiveCompressPropName(format: String): String = 
format.toLowerCase match {
+case "parquet" => ParquetOutputFormat.COMPRESSION
+case "orc" => COMPRESS.getAttribute
+  }
+
+  private def normalizeCodecName(format: String, name: String): String = {
+format.toLowerCase match {
+  case "parquet" => 
ParquetOptions.shortParquetCompressionCodecNames(name).name()
+  case "orc" => OrcOptions.shortOrcCompressionCodecNames(name)
+}
+  }
+
+  private def getTableCompressionCodec(path: String, format: String): 
Seq[String] = {
+val hadoopConf = spark.sessionState.newHadoopConf()
+val codecs = format.toLowerCase match {
+  case "parquet" => for {
+footer <- readAllFootersWithoutSummaryFiles(new Path(path), 
hadoopConf)
+block <- footer.getParquetMetadata.getBlocks.asScala
+column <- block.getColumns.asScala
+  } yield column.getCodec.name()
+  case "orc" => new File(path).listFiles().filter{ file =>
+file.isFile && !file.getName.endsWith(".crc") && file.getName != 
"_SUCCESS"
+  }.map { orcFile =>
+
OrcFileOperator.getFileReader(orcFile.toPath.toString).get.getCompression.toString
+  }.toSeq
+}
+codecs.distinct
+  }
+
+  private def createTable(
+  rootDir: File,
+  tableName: String,
+  isPartitioned: Boolean,
+  format: String,
+  compressionCodec: Option[String]): Unit = {
+val tblProperties = compressionCodec match {
+  case Some(prop) => 
s"TBLPROPERTIES('${getHiveCompressPropName(format)}'='$prop')"
+  case _ => ""
+}
+val partitionCreate = if (isPartitioned) "PARTITIONED BY (p string)" 
else ""
+sql(
+  s"""
+|CREATE TABLE $tableName(a int)
+|$partitionCreate
+|STORED AS $format
+|LOCATION '${rootDir.toURI.toString.stripSuffix("/")}/$tableName'
+|$tblProperties
+  """.stripMargin)
+  }
+
+  private def writeDataToTable(
+  tableName: String,
+  

[GitHub] spark pull request #20087: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2018-01-17 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20087#discussion_r162253126
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala 
---
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.io.File
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.Path
+import org.apache.orc.OrcConf.COMPRESS
+import org.apache.parquet.hadoop.ParquetOutputFormat
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.sql.execution.datasources.orc.OrcOptions
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetOptions, 
ParquetTest}
+import org.apache.spark.sql.hive.orc.OrcFileOperator
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
+
+class CompressionCodecSuite extends TestHiveSingleton with ParquetTest 
with BeforeAndAfterAll {
+  import spark.implicits._
+
+  override def beforeAll(): Unit = {
+super.beforeAll()
+(0 until 
maxRecordNum).toDF("a").createOrReplaceTempView("table_source")
+  }
+
+  override def afterAll(): Unit = {
+try {
+  spark.catalog.dropTempView("table_source")
+} finally {
+  super.afterAll()
+}
+  }
+
+  private val maxRecordNum = 500
+
+  private def getConvertMetastoreConfName(format: String): String = 
format.toLowerCase match {
+case "parquet" => HiveUtils.CONVERT_METASTORE_PARQUET.key
+case "orc" => HiveUtils.CONVERT_METASTORE_ORC.key
+  }
+
+  private def getSparkCompressionConfName(format: String): String = 
format.toLowerCase match {
+case "parquet" => SQLConf.PARQUET_COMPRESSION.key
+case "orc" => SQLConf.ORC_COMPRESSION.key
+  }
+
+  private def getHiveCompressPropName(format: String): String = 
format.toLowerCase match {
+case "parquet" => ParquetOutputFormat.COMPRESSION
+case "orc" => COMPRESS.getAttribute
+  }
+
+  private def normalizeCodecName(format: String, name: String): String = {
+format.toLowerCase match {
+  case "parquet" => 
ParquetOptions.shortParquetCompressionCodecNames(name).name()
+  case "orc" => OrcOptions.shortOrcCompressionCodecNames(name)
+}
+  }
+
+  private def getTableCompressionCodec(path: String, format: String): 
Seq[String] = {
+val hadoopConf = spark.sessionState.newHadoopConf()
+val codecs = format.toLowerCase match {
+  case "parquet" => for {
+footer <- readAllFootersWithoutSummaryFiles(new Path(path), 
hadoopConf)
+block <- footer.getParquetMetadata.getBlocks.asScala
+column <- block.getColumns.asScala
+  } yield column.getCodec.name()
+  case "orc" => new File(path).listFiles().filter{ file =>
+file.isFile && !file.getName.endsWith(".crc") && file.getName != 
"_SUCCESS"
+  }.map { orcFile =>
+
OrcFileOperator.getFileReader(orcFile.toPath.toString).get.getCompression.toString
+  }.toSeq
+}
+codecs.distinct
+  }
+
+  private def createTable(
+  rootDir: File,
+  tableName: String,
+  isPartitioned: Boolean,
+  format: String,
+  compressionCodec: Option[String]): Unit = {
+val tblProperties = compressionCodec match {
+  case Some(prop) => 
s"TBLPROPERTIES('${getHiveCompressPropName(format)}'='$prop')"
+  case _ => ""
+}
+val partitionCreate = if (isPartitioned) "PARTITIONED BY (p string)" 
else ""
+sql(
+  s"""
+|CREATE TABLE $tableName(a int)
+|$partitionCreate
+|STORED AS $format
+|LOCATION '${rootDir.toURI.toString.stripSuffix("/")}/$tableName'
+|$tblProperties
+  """.stripMargin)
+  }
+
+  private def writeDataToTable(
+  tableName: String,
+  

[GitHub] spark pull request #20087: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2018-01-17 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20087#discussion_r162252794
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala 
---
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.io.File
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.Path
+import org.apache.orc.OrcConf.COMPRESS
+import org.apache.parquet.hadoop.ParquetOutputFormat
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.sql.execution.datasources.orc.OrcOptions
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetOptions, 
ParquetTest}
+import org.apache.spark.sql.hive.orc.OrcFileOperator
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
+
+class CompressionCodecSuite extends TestHiveSingleton with ParquetTest 
with BeforeAndAfterAll {
+  import spark.implicits._
+
+  override def beforeAll(): Unit = {
+super.beforeAll()
+(0 until 
maxRecordNum).toDF("a").createOrReplaceTempView("table_source")
+  }
+
+  override def afterAll(): Unit = {
+try {
+  spark.catalog.dropTempView("table_source")
+} finally {
+  super.afterAll()
+}
+  }
+
+  private val maxRecordNum = 500
+
+  private def getConvertMetastoreConfName(format: String): String = 
format.toLowerCase match {
+case "parquet" => HiveUtils.CONVERT_METASTORE_PARQUET.key
+case "orc" => HiveUtils.CONVERT_METASTORE_ORC.key
+  }
+
+  private def getSparkCompressionConfName(format: String): String = 
format.toLowerCase match {
+case "parquet" => SQLConf.PARQUET_COMPRESSION.key
+case "orc" => SQLConf.ORC_COMPRESSION.key
+  }
+
+  private def getHiveCompressPropName(format: String): String = 
format.toLowerCase match {
+case "parquet" => ParquetOutputFormat.COMPRESSION
+case "orc" => COMPRESS.getAttribute
+  }
+
+  private def normalizeCodecName(format: String, name: String): String = {
+format.toLowerCase match {
+  case "parquet" => 
ParquetOptions.shortParquetCompressionCodecNames(name).name()
+  case "orc" => OrcOptions.shortOrcCompressionCodecNames(name)
+}
+  }
+
+  private def getTableCompressionCodec(path: String, format: String): 
Seq[String] = {
+val hadoopConf = spark.sessionState.newHadoopConf()
+val codecs = format.toLowerCase match {
+  case "parquet" => for {
+footer <- readAllFootersWithoutSummaryFiles(new Path(path), 
hadoopConf)
+block <- footer.getParquetMetadata.getBlocks.asScala
+column <- block.getColumns.asScala
+  } yield column.getCodec.name()
+  case "orc" => new File(path).listFiles().filter{ file =>
+file.isFile && !file.getName.endsWith(".crc") && file.getName != 
"_SUCCESS"
+  }.map { orcFile =>
+
OrcFileOperator.getFileReader(orcFile.toPath.toString).get.getCompression.toString
+  }.toSeq
+}
+codecs.distinct
+  }
+
+  private def createTable(
+  rootDir: File,
+  tableName: String,
+  isPartitioned: Boolean,
+  format: String,
+  compressionCodec: Option[String]): Unit = {
+val tblProperties = compressionCodec match {
+  case Some(prop) => 
s"TBLPROPERTIES('${getHiveCompressPropName(format)}'='$prop')"
+  case _ => ""
+}
+val partitionCreate = if (isPartitioned) "PARTITIONED BY (p string)" 
else ""
+sql(
+  s"""
+|CREATE TABLE $tableName(a int)
+|$partitionCreate
+|STORED AS $format
+|LOCATION '${rootDir.toURI.toString.stripSuffix("/")}/$tableName'
+|$tblProperties
+  """.stripMargin)
+  }
+
+  private def writeDataToTable(
+  tableName: String,
+  

[GitHub] spark pull request #20087: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2018-01-17 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20087#discussion_r161698209
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
 ---
@@ -76,7 +76,7 @@ object ParquetOptions {
   val MERGE_SCHEMA = "mergeSchema"
 
   // The parquet compression short names
-  private val shortParquetCompressionCodecNames = Map(
+  val shortParquetCompressionCodecNames = Map(
--- End diff --

The same here.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20087: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2018-01-17 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20087#discussion_r161698145
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcOptions.scala
 ---
@@ -61,7 +61,7 @@ class OrcOptions(
 
 object OrcOptions {
   // The ORC compression short names
-  private val shortOrcCompressionCodecNames = Map(
+  val shortOrcCompressionCodecNames = Map(
--- End diff --

Instead of changing the access modifiers, add a public function 
```Scala
def getORCCompressionCodecName(name: String): String = 
shortOrcCompressionCodecNames(name)
```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20087: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2018-01-17 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20087#discussion_r162252680
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala 
---
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.io.File
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.Path
+import org.apache.orc.OrcConf.COMPRESS
+import org.apache.parquet.hadoop.ParquetOutputFormat
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.sql.execution.datasources.orc.OrcOptions
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetOptions, 
ParquetTest}
+import org.apache.spark.sql.hive.orc.OrcFileOperator
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
+
+class CompressionCodecSuite extends TestHiveSingleton with ParquetTest 
with BeforeAndAfterAll {
+  import spark.implicits._
+
+  override def beforeAll(): Unit = {
+super.beforeAll()
+(0 until 
maxRecordNum).toDF("a").createOrReplaceTempView("table_source")
+  }
+
+  override def afterAll(): Unit = {
+try {
+  spark.catalog.dropTempView("table_source")
+} finally {
+  super.afterAll()
+}
+  }
+
+  private val maxRecordNum = 500
+
+  private def getConvertMetastoreConfName(format: String): String = 
format.toLowerCase match {
+case "parquet" => HiveUtils.CONVERT_METASTORE_PARQUET.key
+case "orc" => HiveUtils.CONVERT_METASTORE_ORC.key
+  }
+
+  private def getSparkCompressionConfName(format: String): String = 
format.toLowerCase match {
+case "parquet" => SQLConf.PARQUET_COMPRESSION.key
+case "orc" => SQLConf.ORC_COMPRESSION.key
+  }
+
+  private def getHiveCompressPropName(format: String): String = 
format.toLowerCase match {
+case "parquet" => ParquetOutputFormat.COMPRESSION
+case "orc" => COMPRESS.getAttribute
+  }
+
+  private def normalizeCodecName(format: String, name: String): String = {
+format.toLowerCase match {
+  case "parquet" => 
ParquetOptions.shortParquetCompressionCodecNames(name).name()
+  case "orc" => OrcOptions.shortOrcCompressionCodecNames(name)
+}
+  }
+
+  private def getTableCompressionCodec(path: String, format: String): 
Seq[String] = {
+val hadoopConf = spark.sessionState.newHadoopConf()
+val codecs = format.toLowerCase match {
+  case "parquet" => for {
+footer <- readAllFootersWithoutSummaryFiles(new Path(path), 
hadoopConf)
+block <- footer.getParquetMetadata.getBlocks.asScala
+column <- block.getColumns.asScala
+  } yield column.getCodec.name()
+  case "orc" => new File(path).listFiles().filter{ file =>
+file.isFile && !file.getName.endsWith(".crc") && file.getName != 
"_SUCCESS"
+  }.map { orcFile =>
+
OrcFileOperator.getFileReader(orcFile.toPath.toString).get.getCompression.toString
+  }.toSeq
+}
+codecs.distinct
+  }
+
+  private def createTable(
+  rootDir: File,
+  tableName: String,
+  isPartitioned: Boolean,
+  format: String,
+  compressionCodec: Option[String]): Unit = {
+val tblProperties = compressionCodec match {
+  case Some(prop) => 
s"TBLPROPERTIES('${getHiveCompressPropName(format)}'='$prop')"
+  case _ => ""
+}
+val partitionCreate = if (isPartitioned) "PARTITIONED BY (p string)" 
else ""
+sql(
+  s"""
+|CREATE TABLE $tableName(a int)
+|$partitionCreate
+|STORED AS $format
+|LOCATION '${rootDir.toURI.toString.stripSuffix("/")}/$tableName'
+|$tblProperties
+  """.stripMargin)
+  }
+
+  private def writeDataToTable(
+  tableName: String,
+  

[GitHub] spark pull request #20087: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2018-01-17 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20087#discussion_r162252520
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala 
---
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.io.File
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.Path
+import org.apache.orc.OrcConf.COMPRESS
+import org.apache.parquet.hadoop.ParquetOutputFormat
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.sql.execution.datasources.orc.OrcOptions
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetOptions, 
ParquetTest}
+import org.apache.spark.sql.hive.orc.OrcFileOperator
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
+
+class CompressionCodecSuite extends TestHiveSingleton with ParquetTest 
with BeforeAndAfterAll {
+  import spark.implicits._
+
+  override def beforeAll(): Unit = {
+super.beforeAll()
+(0 until 
maxRecordNum).toDF("a").createOrReplaceTempView("table_source")
+  }
+
+  override def afterAll(): Unit = {
+try {
+  spark.catalog.dropTempView("table_source")
+} finally {
+  super.afterAll()
+}
+  }
+
+  private val maxRecordNum = 500
+
+  private def getConvertMetastoreConfName(format: String): String = 
format.toLowerCase match {
+case "parquet" => HiveUtils.CONVERT_METASTORE_PARQUET.key
+case "orc" => HiveUtils.CONVERT_METASTORE_ORC.key
+  }
+
+  private def getSparkCompressionConfName(format: String): String = 
format.toLowerCase match {
+case "parquet" => SQLConf.PARQUET_COMPRESSION.key
+case "orc" => SQLConf.ORC_COMPRESSION.key
+  }
+
+  private def getHiveCompressPropName(format: String): String = 
format.toLowerCase match {
+case "parquet" => ParquetOutputFormat.COMPRESSION
+case "orc" => COMPRESS.getAttribute
+  }
+
+  private def normalizeCodecName(format: String, name: String): String = {
+format.toLowerCase match {
+  case "parquet" => 
ParquetOptions.shortParquetCompressionCodecNames(name).name()
+  case "orc" => OrcOptions.shortOrcCompressionCodecNames(name)
+}
+  }
+
+  private def getTableCompressionCodec(path: String, format: String): 
Seq[String] = {
+val hadoopConf = spark.sessionState.newHadoopConf()
+val codecs = format.toLowerCase match {
+  case "parquet" => for {
+footer <- readAllFootersWithoutSummaryFiles(new Path(path), 
hadoopConf)
+block <- footer.getParquetMetadata.getBlocks.asScala
+column <- block.getColumns.asScala
+  } yield column.getCodec.name()
+  case "orc" => new File(path).listFiles().filter{ file =>
--- End diff --

Nit: add a space before `{`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20087: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2018-01-07 Thread fjh100456
Github user fjh100456 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20087#discussion_r160086482
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
 ---
@@ -68,6 +68,10 @@ private[hive] trait SaveAsHiveFile extends 
DataWritingCommand {
 .get("mapreduce.output.fileoutputformat.compress.type"))
 }
 
+// Set compression by priority
+HiveOptions.getHiveWriteCompression(fileSinkConf.getTableInfo, 
sparkSession.sessionState.conf)
+  .foreach { case (compression, codec) => hadoopConf.set(compression, 
codec) }
--- End diff --

For parquet, without the changes of this pr, the precedence is table-level 
compression > `mapreduce.output.fileoutputformat.compress`. 
`spark.sql.parquet.compression` never takes effect. But now with this pr, 
`mapreduce.output.fileoutputformat.compress` will not take effect. As an 
alternative, `spark.sql.parquet.compression` will always take effect if there 
is no table level compression.

For ORC, `hive.exec.compress.output` does not take effect, as explained in 
the comments of the Code.

Shall we keep this precedence for parquet? If so, how to deal with ORC?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20087: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2017-12-29 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20087#discussion_r159117741
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
 ---
@@ -68,6 +68,10 @@ private[hive] trait SaveAsHiveFile extends 
DataWritingCommand {
 .get("mapreduce.output.fileoutputformat.compress.type"))
 }
 
+// Set compression by priority
+HiveOptions.getHiveWriteCompression(fileSinkConf.getTableInfo, 
sparkSession.sessionState.conf)
+  .foreach { case (compression, codec) => hadoopConf.set(compression, 
codec) }
--- End diff --

This will not be affected by `hive.exec.compress.output`? Could you do an 
investigation the relation of this setting with the codes from line 57 to line 
69?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20087: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...

2017-12-26 Thread fjh100456
GitHub user fjh100456 opened a pull request:

https://github.com/apache/spark/pull/20087

[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' and 
'spark.sql.orc.compression.codec' configuration doesn't take effect on hive 
table writing

[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' and 
'spark.sql.orc.compression.codec' configuration doesn't take effect on hive 
table writing

What changes were proposed in this pull request?

Pass ‘spark.sql.parquet.compression.codec’ value to 
‘parquet.compression’.
Pass ‘spark.sql.orc.compression.codec’ value to ‘orc.compress’.

How was this patch tested?

Add test.

Note: 
This is the same issue mentioned in #19218 . That branch was deleted 
mistakenly, so make a new pr instead.

@gatorsmile @maropu @dongjoon-hyun @discipleforteen


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/fjh100456/spark HiveTableWriting

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20087.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20087


commit 9bbfe6ef4b5a418373c2250ad676233fb05df7f7
Author: fjh100456 
Date:   2017-12-25T02:29:53Z

[SPARK-21786][SQL] When acquiring 'compressionCodecClassName' in 
'ParquetOptions', `parquet.compression` needs to be considered.

## What changes were proposed in this pull request?
1.Increased acquiring 'compressionCodecClassName' from 
`parquet.compression`,and the order is 
`compression`,`parquet.compression`,`spark.sql.parquet.compression.codec`, just 
like what we do in `OrcOptions`.
2.Change `spark.sql.parquet.compression.codec` to support "none".Actually 
in `ParquetOptions`,we do support "none" as equivalent to "uncompressed", but 
it does not allowed to configured to "none".

## How was this patch tested?
Manual test.

commit 48cf108ed5c3298eb860d9735b439ac89d65765e
Author: fjh100456 
Date:   2017-12-25T02:30:24Z

[SPARK-21786][SQL] When acquiring 'compressionCodecClassName' in 
'ParquetOptions', `parquet.compression` needs to be considered.

## What changes were proposed in this pull request?
1.Increased acquiring 'compressionCodecClassName' from 
`parquet.compression`,and the order is 
`compression`,`parquet.compression`,`spark.sql.parquet.compression.codec`, just 
like what we do in `OrcOptions`.
2.Change `spark.sql.parquet.compression.codec` to support "none".Actually 
in `ParquetOptions`,we do support "none" as equivalent to "uncompressed", but 
it does not allowed to configured to "none".

## How was this patch tested?
Manual test.

commit 5dbd3edf9e086433d3d3fe9c0ead887d799c61d3
Author: fjh100456 
Date:   2017-12-25T02:34:29Z

spark.sql.parquet.compression.codec[SPARK-21786][SQL] When acquiring 
'compressionCodecClassName' in 'ParquetOptions', `parquet.compression` needs to 
be considered.

## What changes were proposed in this pull request?
1.Increased acquiring 'compressionCodecClassName' from 
`parquet.compression`,and the order is 
`compression`,`parquet.compression`,`spark.sql.parquet.compression.codec`, just 
like what we do in `OrcOptions`.
2.Change `spark.sql.parquet.compression.codec` to support "none".Actually 
in `ParquetOptions`,we do support "none" as equivalent to "uncompressed", but 
it does not allowed to configured to "none".

## How was this patch tested?
Manual test.

commit 5124f1b560e942c0dc23af31336317a4b995dd8f
Author: fjh100456 
Date:   2017-12-25T07:06:26Z

spark.sql.parquet.compression.codec[SPARK-21786][SQL] When acquiring 
'compressionCodecClassName' in 'ParquetOptions', `parquet.compression` needs to 
be considered.

## What changes were proposed in this pull request?
1.Increased acquiring 'compressionCodecClassName' from 
`parquet.compression`,and the order is 
`compression`,`parquet.compression`,`spark.sql.parquet.compression.codec`, just 
like what we do in `OrcOptions`.
2.Change `spark.sql.parquet.compression.codec` to support "none".Actually 
in `ParquetOptions`,we do support "none" as equivalent to "uncompressed", but 
it does not allowed to configured to "none".
3.Change `compressionCode` to `compressionCodecClassName`.

## How was this patch tested?
Manual test.

commit 6907a3ef86a2546fae91c22754796490a80e
Author: fjh100456 
Date:   2017-12-25T09:26:33Z

Make comression codec take effect in hive table writing.

commit 67e40d4d7fd3b6a9e4526ce17bf6d4eadb05b2b8
Author: fjh100456 
Date:   2017-12-25T12:08:11Z

Modify test

commit e2526ca1bb72e54c03d977b8678bd14b28c83585
Author: fjh100456 
Date:   2017-12-26T05:38:10Z