[GitHub] spark pull request #21902: [SPARK-24952][SQL] Support LZMA2 compression by A...

2018-07-30 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/21902


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21902: [SPARK-24952][SQL] Support LZMA2 compression by A...

2018-07-30 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21902#discussion_r206102937
  
--- Diff: external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala ---
@@ -117,28 +117,19 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
       dataSchema, nullable = false, parsedOptions.recordName, parsedOptions.recordNamespace)
 
     AvroJob.setOutputKeySchema(job, outputAvroSchema)
-    val COMPRESS_KEY = "mapred.output.compress"
 
-    parsedOptions.compression match {
-      case "uncompressed" =>
-        log.info("writing uncompressed Avro records")
-        job.getConfiguration.setBoolean(COMPRESS_KEY, false)
-
-      case "snappy" =>
-        log.info("compressing Avro output using Snappy")
-        job.getConfiguration.setBoolean(COMPRESS_KEY, true)
-        job.getConfiguration.set(AvroJob.CONF_OUTPUT_CODEC, DataFileConstants.SNAPPY_CODEC)
-
-      case "deflate" =>
+    val (compression, levelInfo) = parsedOptions.compression match {
+      case "uncompressed" => (NULL_CODEC, "")
+      case codec @ (SNAPPY_CODEC | BZIP2_CODEC | XZ_CODEC) => (codec, "")
+      case DEFLATE_CODEC =>
         val deflateLevel = spark.sessionState.conf.avroDeflateLevel
-        log.info(s"compressing Avro output using deflate (level=$deflateLevel)")
-        job.getConfiguration.setBoolean(COMPRESS_KEY, true)
-        job.getConfiguration.set(AvroJob.CONF_OUTPUT_CODEC, DataFileConstants.DEFLATE_CODEC)
         job.getConfiguration.setInt(AvroOutputFormat.DEFLATE_LEVEL_KEY, deflateLevel)
-
-      case unknown: String =>
-        log.error(s"unsupported compression codec $unknown")
+        (DEFLATE_CODEC, s" (level = $deflateLevel)")
--- End diff --

I have refactored this piece of code.


---




[GitHub] spark pull request #21902: [SPARK-24952][SQL] Support LZMA2 compression by A...

2018-07-30 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21902#discussion_r206073061
  
--- Diff: external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala ---
(quoting the same @@ -117,28 +117,19 @@ hunk of AvroFileFormat.scala as above)
--- End diff --

Oh, I mean I thought it was a bit odd that this case match returns `levelInfo`, and I was wondering if we can get rid of it.


---




[GitHub] spark pull request #21902: [SPARK-24952][SQL] Support LZMA2 compression by A...

2018-07-30 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21902#discussion_r206064935
  
--- Diff: external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala ---
(quoting the same @@ -117,28 +117,19 @@ hunk of AvroFileFormat.scala as above)
--- End diff --

The conversion of `-1` to `6` is performed somewhere inside `zlib`. Do you want to hard-code `6` in the log message?
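As an aside, the `-1` to `6` mapping is easy to observe outside Spark. The sketch below uses Python's stdlib `zlib` binding (illustrative only, not the Spark/Hadoop Avro code path) to show that the default level `-1` (`Z_DEFAULT_COMPRESSION`) produces byte-identical output to an explicit level `6`, because zlib's `deflateInit` substitutes `6` internally:

```python
import zlib

# Illustrative only: Python's zlib binding, not Spark's Avro writer.
data = b"compression level demo " * 4096

# Z_DEFAULT_COMPRESSION is -1; zlib's deflateInit replaces it with
# level 6 internally, so the two outputs match byte for byte.
assert zlib.Z_DEFAULT_COMPRESSION == -1
default_out = zlib.compress(data, zlib.Z_DEFAULT_COMPRESSION)
level6_out = zlib.compress(data, 6)
print(default_out == level6_out)  # expected: True
```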


---




[GitHub] spark pull request #21902: [SPARK-24952][SQL] Support LZMA2 compression by A...

2018-07-29 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21902#discussion_r206000635
  
--- Diff: external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala ---
(quoting the same @@ -117,28 +117,19 @@ hunk of AvroFileFormat.scala as above)
--- End diff --

@MaxGekk, how about just logging another info message here and removing the `levelInfo` return? For instance,

```
log.info("Avro compression level 6 will be used for deflate codec.")
```


---




[GitHub] spark pull request #21902: [SPARK-24952][SQL] Support LZMA2 compression by A...

2018-07-28 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21902#discussion_r205937278
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -1449,6 +1451,16 @@ object SQLConf {
     .intConf
     .checkValues((1 to 9).toSet + Deflater.DEFAULT_COMPRESSION)
     .createWithDefault(Deflater.DEFAULT_COMPRESSION)
+
+  val AVRO_XZ_LEVEL = buildConf("spark.sql.avro.xz.level")
--- End diff --

> If you have faced a case where this configuration is useful, I'm okay, but if not, I would suggest adding it later when it's needed.

The config is not as useful as the `LZMA2` codec itself; its default value covers the use case. I will remove the config from the PR.


---




[GitHub] spark pull request #21902: [SPARK-24952][SQL] Support LZMA2 compression by A...

2018-07-28 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21902#discussion_r205937166
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -1449,6 +1451,16 @@ object SQLConf {
     .intConf
     .checkValues((1 to 9).toSet + Deflater.DEFAULT_COMPRESSION)
     .createWithDefault(Deflater.DEFAULT_COMPRESSION)
+
+  val AVRO_XZ_LEVEL = buildConf("spark.sql.avro.xz.level")
--- End diff --

I'm okay - I didn't mean to say I am against it; I was just wondering how useful it could be. If you have faced a case where this configuration is useful, I'm okay, but if not, I would suggest adding it later when it's needed.


---




[GitHub] spark pull request #21902: [SPARK-24952][SQL] Support LZMA2 compression by A...

2018-07-28 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21902#discussion_r205937046
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -1449,6 +1451,16 @@ object SQLConf {
     .intConf
     .checkValues((1 to 9).toSet + Deflater.DEFAULT_COMPRESSION)
     .createWithDefault(Deflater.DEFAULT_COMPRESSION)
+
+  val AVRO_XZ_LEVEL = buildConf("spark.sql.avro.xz.level")
--- End diff --

Yes, it is a new feature. The third-party spark-avro doesn't support the `LZMA2` codec. The latter has much better compression than `snappy`/`deflate`, even at levels `0-3`. The default level `6` actually covers most use cases in which users need to store data in AVRO format permanently as an archive. If you are strongly against the config, I will remove it.
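For a rough sense of the trade-off described here, the stdlib sketch below compares deflate (`zlib`) with XZ (`lzma`) at a fast preset and at the default preset. Snappy is not in the Python stdlib, the sample data is made up, and actual ratios depend heavily on the payload, so treat this as illustrative only, not a benchmark of Spark's Avro writer:

```python
import lzma
import zlib

# Moderately repetitive sample data; real Avro payloads will differ.
data = (b"id,name,score\n" + b"12345,example-record,0.987\n" * 200) * 50

deflate6 = len(zlib.compress(data, 6))           # deflate, default level
xz_fast = len(lzma.compress(data, preset=0))     # XZ, fastest preset
xz_default = len(lzma.compress(data, preset=6))  # XZ, default preset

print(f"raw={len(data)} deflate6={deflate6} xz0={xz_fast} xz6={xz_default}")

# Round-trip sanity check.
assert lzma.decompress(lzma.compress(data, preset=6)) == data
```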


---




[GitHub] spark pull request #21902: [SPARK-24952][SQL] Support LZMA2 compression by A...

2018-07-27 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21902#discussion_r205934042
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -1449,6 +1451,16 @@ object SQLConf {
     .intConf
     .checkValues((1 to 9).toSet + Deflater.DEFAULT_COMPRESSION)
     .createWithDefault(Deflater.DEFAULT_COMPRESSION)
+
+  val AVRO_XZ_LEVEL = buildConf("spark.sql.avro.xz.level")
--- End diff --

Just for clarification, @MaxGekk, this configuration is not carried over from spark-avro (the third-party package) but something you are newly proposing here?


---




[GitHub] spark pull request #21902: [SPARK-24952][SQL] Support LZMA2 compression by A...

2018-07-27 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/21902#discussion_r205933175
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -1449,6 +1451,16 @@ object SQLConf {
     .intConf
     .checkValues((1 to 9).toSet + Deflater.DEFAULT_COMPRESSION)
     .createWithDefault(Deflater.DEFAULT_COMPRESSION)
+
+  val AVRO_XZ_LEVEL = buildConf("spark.sql.avro.xz.level")
+    .doc("Compression level for the XZ codec used in writing of AVRO files. " +
+      "Valid values must be in the range from 0 to 9 inclusive: " +
+      "0-3 are fast with medium compression, 4-6 are fairly slow levels with high compression. " +
+      "The levels 7-9 are like the level 6 but use bigger dictionaries and have higher " +
+      "compressor and decompressor memory requirements. Default level is 6.")
--- End diff --

use `LZMA2Options.PRESET_DEFAULT` in place of literal `6`
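For reference, `LZMA2Options.PRESET_DEFAULT` is the xz-java constant with value `6`. Python's stdlib `lzma` module exposes the equivalent liblzma default-preset constant, so the same named-constant style can be sketched there (illustrative Python analogue, not the Scala change itself):

```python
import lzma

# liblzma's default preset, analogous to xz-java's LZMA2Options.PRESET_DEFAULT.
print(lzma.PRESET_DEFAULT)  # 6

# Prefer the named constant over a literal so the default lives in one place.
payload = b"avro-ish payload " * 100
compressed = lzma.compress(payload, preset=lzma.PRESET_DEFAULT)
assert lzma.decompress(compressed) == payload
```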


---




[GitHub] spark pull request #21902: [SPARK-24952][SQL] Support LZMA2 compression by A...

2018-07-27 Thread MaxGekk
GitHub user MaxGekk opened a pull request:

https://github.com/apache/spark/pull/21902

[SPARK-24952][SQL] Support LZMA2 compression by Avro datasource

## What changes were proposed in this pull request?

In the PR, I propose to support the `LZMA2` (`XZ`) and `BZIP2` compression codecs in the `AVRO` datasource on write, since these codecs have a much better compression ratio compared to the already supported `deflate` and `snappy` codecs. To tune the compression level of `XZ`, the PR introduces a new SQL config `spark.sql.avro.xz.level` with the default value `6`. The allowed range of levels is `[0, 9]`.

## How was this patch tested?

It was tested manually and by an existing test, which was extended to check the `xz` and `bzip2` compressions.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/MaxGekk/spark-1 avro-xz-bzip2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21902.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21902


commit e3b8856c6f8769cf1c2646e7cf5ae41fb3c8d626
Author: Maxim Gekk 
Date:   2018-07-27T20:15:04Z

Support bzip2

commit 7b9dd253e313fb7b5f674672f8bd5447812522a3
Author: Maxim Gekk 
Date:   2018-07-27T20:40:18Z

Support xz

commit d4dbeb10656283d957c9c52327da97170f9ad080
Author: Maxim Gekk 
Date:   2018-07-27T21:12:54Z

Refactoring

commit 3e1139af293cb2e06e125edfd443a5b5a0265b84
Author: Maxim Gekk 
Date:   2018-07-27T21:30:30Z

Fix comments




---
