[GitHub] spark pull request #21902: [SPARK-24952][SQL] Support LZMA2 compression by A...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21902

---

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/21902#discussion_r206102937

--- Diff: external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala ---

```diff
@@ -117,28 +117,19 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
       dataSchema, nullable = false, parsedOptions.recordName, parsedOptions.recordNamespace)
     AvroJob.setOutputKeySchema(job, outputAvroSchema)
-    val COMPRESS_KEY = "mapred.output.compress"
-    parsedOptions.compression match {
-      case "uncompressed" =>
-        log.info("writing uncompressed Avro records")
-        job.getConfiguration.setBoolean(COMPRESS_KEY, false)
-
-      case "snappy" =>
-        log.info("compressing Avro output using Snappy")
-        job.getConfiguration.setBoolean(COMPRESS_KEY, true)
-        job.getConfiguration.set(AvroJob.CONF_OUTPUT_CODEC, DataFileConstants.SNAPPY_CODEC)
-
-      case "deflate" =>
+    val (compression, levelInfo) = parsedOptions.compression match {
+      case "uncompressed" => (NULL_CODEC, "")
+      case codec @ (SNAPPY_CODEC | BZIP2_CODEC | XZ_CODEC) => (codec, "")
+      case DEFLATE_CODEC =>
         val deflateLevel = spark.sessionState.conf.avroDeflateLevel
-        log.info(s"compressing Avro output using deflate (level=$deflateLevel)")
-        job.getConfiguration.setBoolean(COMPRESS_KEY, true)
-        job.getConfiguration.set(AvroJob.CONF_OUTPUT_CODEC, DataFileConstants.DEFLATE_CODEC)
         job.getConfiguration.setInt(AvroOutputFormat.DEFLATE_LEVEL_KEY, deflateLevel)
-
-      case unknown: String =>
-        log.error(s"unsupported compression codec $unknown")
+        (DEFLATE_CODEC, s" (level = $deflateLevel)")
```

--- End diff --

I have refactored this piece of code.
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21902#discussion_r206073061

--- Diff: external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala ---

```diff
@@ -117,28 +117,19 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
       dataSchema, nullable = false, parsedOptions.recordName, parsedOptions.recordNamespace)
     AvroJob.setOutputKeySchema(job, outputAvroSchema)
-    val COMPRESS_KEY = "mapred.output.compress"
-    parsedOptions.compression match {
-      case "uncompressed" =>
-        log.info("writing uncompressed Avro records")
-        job.getConfiguration.setBoolean(COMPRESS_KEY, false)
-
-      case "snappy" =>
-        log.info("compressing Avro output using Snappy")
-        job.getConfiguration.setBoolean(COMPRESS_KEY, true)
-        job.getConfiguration.set(AvroJob.CONF_OUTPUT_CODEC, DataFileConstants.SNAPPY_CODEC)
-
-      case "deflate" =>
+    val (compression, levelInfo) = parsedOptions.compression match {
+      case "uncompressed" => (NULL_CODEC, "")
+      case codec @ (SNAPPY_CODEC | BZIP2_CODEC | XZ_CODEC) => (codec, "")
+      case DEFLATE_CODEC =>
         val deflateLevel = spark.sessionState.conf.avroDeflateLevel
-        log.info(s"compressing Avro output using deflate (level=$deflateLevel)")
-        job.getConfiguration.setBoolean(COMPRESS_KEY, true)
-        job.getConfiguration.set(AvroJob.CONF_OUTPUT_CODEC, DataFileConstants.DEFLATE_CODEC)
         job.getConfiguration.setInt(AvroOutputFormat.DEFLATE_LEVEL_KEY, deflateLevel)
-
-      case unknown: String =>
-        log.error(s"unsupported compression codec $unknown")
+        (DEFLATE_CODEC, s" (level = $deflateLevel)")
```

--- End diff --

Oh, I mean I thought it was a bit odd that this case match returns `levelInfo`, and I was wondering if we can get rid of it.
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/21902#discussion_r206064935

--- Diff: external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala ---

```diff
@@ -117,28 +117,19 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
       dataSchema, nullable = false, parsedOptions.recordName, parsedOptions.recordNamespace)
     AvroJob.setOutputKeySchema(job, outputAvroSchema)
-    val COMPRESS_KEY = "mapred.output.compress"
-    parsedOptions.compression match {
-      case "uncompressed" =>
-        log.info("writing uncompressed Avro records")
-        job.getConfiguration.setBoolean(COMPRESS_KEY, false)
-
-      case "snappy" =>
-        log.info("compressing Avro output using Snappy")
-        job.getConfiguration.setBoolean(COMPRESS_KEY, true)
-        job.getConfiguration.set(AvroJob.CONF_OUTPUT_CODEC, DataFileConstants.SNAPPY_CODEC)
-
-      case "deflate" =>
+    val (compression, levelInfo) = parsedOptions.compression match {
+      case "uncompressed" => (NULL_CODEC, "")
+      case codec @ (SNAPPY_CODEC | BZIP2_CODEC | XZ_CODEC) => (codec, "")
+      case DEFLATE_CODEC =>
         val deflateLevel = spark.sessionState.conf.avroDeflateLevel
-        log.info(s"compressing Avro output using deflate (level=$deflateLevel)")
-        job.getConfiguration.setBoolean(COMPRESS_KEY, true)
-        job.getConfiguration.set(AvroJob.CONF_OUTPUT_CODEC, DataFileConstants.DEFLATE_CODEC)
         job.getConfiguration.setInt(AvroOutputFormat.DEFLATE_LEVEL_KEY, deflateLevel)
-
-      case unknown: String =>
-        log.error(s"unsupported compression codec $unknown")
+        (DEFLATE_CODEC, s" (level = $deflateLevel)")
```

--- End diff --

Conversion of `-1` to `6` is performed somewhere inside `zlib`. Do you want to hard-code `6` in the log message?
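For context on the `-1` discussed above: the JDK exposes zlib's "use the default level" sentinel as `Deflater.DEFAULT_COMPRESSION`, whose value is `-1`; zlib itself resolves that sentinel to an effective level of 6 at compression time, which is why the mapping is invisible from the Java side. A minimal JDK-only illustration (the class name is just for this sketch):

```java
import java.util.zip.Deflater;

public class DeflateDefaultDemo {
    public static void main(String[] args) {
        // The sentinel is -1 on the Java side; the -1 -> 6 mapping
        // happens inside zlib, not in the JDK or in Spark.
        System.out.println(Deflater.DEFAULT_COMPRESSION); // prints -1
    }
}
```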
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21902#discussion_r206000635

--- Diff: external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala ---

```diff
@@ -117,28 +117,19 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
       dataSchema, nullable = false, parsedOptions.recordName, parsedOptions.recordNamespace)
     AvroJob.setOutputKeySchema(job, outputAvroSchema)
-    val COMPRESS_KEY = "mapred.output.compress"
-    parsedOptions.compression match {
-      case "uncompressed" =>
-        log.info("writing uncompressed Avro records")
-        job.getConfiguration.setBoolean(COMPRESS_KEY, false)
-
-      case "snappy" =>
-        log.info("compressing Avro output using Snappy")
-        job.getConfiguration.setBoolean(COMPRESS_KEY, true)
-        job.getConfiguration.set(AvroJob.CONF_OUTPUT_CODEC, DataFileConstants.SNAPPY_CODEC)
-
-      case "deflate" =>
+    val (compression, levelInfo) = parsedOptions.compression match {
+      case "uncompressed" => (NULL_CODEC, "")
+      case codec @ (SNAPPY_CODEC | BZIP2_CODEC | XZ_CODEC) => (codec, "")
+      case DEFLATE_CODEC =>
         val deflateLevel = spark.sessionState.conf.avroDeflateLevel
-        log.info(s"compressing Avro output using deflate (level=$deflateLevel)")
-        job.getConfiguration.setBoolean(COMPRESS_KEY, true)
-        job.getConfiguration.set(AvroJob.CONF_OUTPUT_CODEC, DataFileConstants.DEFLATE_CODEC)
         job.getConfiguration.setInt(AvroOutputFormat.DEFLATE_LEVEL_KEY, deflateLevel)
-
-      case unknown: String =>
-        log.error(s"unsupported compression codec $unknown")
+        (DEFLATE_CODEC, s" (level = $deflateLevel)")
```

--- End diff --

@MaxGekk, how about just logging another info message here and removing the `levelInfo` return? For instance:

```
log.info("Avro compression level 6 will be used for deflate codec.")
```
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/21902#discussion_r205937278

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---

```diff
@@ -1449,6 +1451,16 @@ object SQLConf {
       .intConf
       .checkValues((1 to 9).toSet + Deflater.DEFAULT_COMPRESSION)
       .createWithDefault(Deflater.DEFAULT_COMPRESSION)
+
+  val AVRO_XZ_LEVEL = buildConf("spark.sql.avro.xz.level")
```

--- End diff --

> If you faced a case that this configuration is useful, im okay but if not, I would suggest to add it later when it's needed.

The config is not as useful as the `LZMA2` codec itself. Its default value covers the use case. I will remove the config from the PR.
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21902#discussion_r205937166

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---

```diff
@@ -1449,6 +1451,16 @@ object SQLConf {
       .intConf
       .checkValues((1 to 9).toSet + Deflater.DEFAULT_COMPRESSION)
       .createWithDefault(Deflater.DEFAULT_COMPRESSION)
+
+  val AVRO_XZ_LEVEL = buildConf("spark.sql.avro.xz.level")
```

--- End diff --

I'm okay - I didn't mean to say I am against it, but I was wondering how useful it could be. If you have faced a case where this configuration is useful, I'm okay with it; if not, I would suggest adding it later when it's needed.
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/21902#discussion_r205937046

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---

```diff
@@ -1449,6 +1451,16 @@ object SQLConf {
       .intConf
       .checkValues((1 to 9).toSet + Deflater.DEFAULT_COMPRESSION)
       .createWithDefault(Deflater.DEFAULT_COMPRESSION)
+
+  val AVRO_XZ_LEVEL = buildConf("spark.sql.avro.xz.level")
```

--- End diff --

Yes, it is a new feature. The third-party package doesn't support the `LZMA2` codec. The latter has much better compression than `snappy`/`deflate` even at levels `0-3`. The default level `6` actually covers most use cases where users need to store data in AVRO format permanently as an archive. If you are strongly against the config, I will remove it.
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21902#discussion_r205934042

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---

```diff
@@ -1449,6 +1451,16 @@ object SQLConf {
       .intConf
       .checkValues((1 to 9).toSet + Deflater.DEFAULT_COMPRESSION)
       .createWithDefault(Deflater.DEFAULT_COMPRESSION)
+
+  val AVRO_XZ_LEVEL = buildConf("spark.sql.avro.xz.level")
```

--- End diff --

Just for clarification, @MaxGekk, this configuration is not for keeping the configuration from spark-avro (as a third party) but something you newly propose here?
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/21902#discussion_r205933175

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---

```diff
@@ -1449,6 +1451,16 @@ object SQLConf {
       .intConf
       .checkValues((1 to 9).toSet + Deflater.DEFAULT_COMPRESSION)
       .createWithDefault(Deflater.DEFAULT_COMPRESSION)
+
+  val AVRO_XZ_LEVEL = buildConf("spark.sql.avro.xz.level")
+    .doc("Compression level for the XZ codec used in writing of AVRO files. " +
+      "Valid value must be in the range of from 0 to 9 inclusive: " +
+      "0-3 for fast with medium compression, 4-6 are fairly slow levels with high compression. " +
+      "The levels 7-9 are like the level 6 but use bigger dictionaries and have higher " +
+      "compressor and decompressor memory requirements. Default level is 6.")
```

--- End diff --

use `LZMA2Options.PRESET_DEFAULT` in place of literal `6`
GitHub user MaxGekk opened a pull request: https://github.com/apache/spark/pull/21902

[SPARK-24952][SQL] Support LZMA2 compression by Avro datasource

## What changes were proposed in this pull request?

In the PR, I propose to support the `LZMA2` (`XZ`) and `BZIP2` compression codecs in the `AVRO` datasource on write, since these codecs have a much better compression ratio than the already supported `deflate` and `snappy` codecs. To tune the compression level of `XZ`, the PR introduces a new SQL config `spark.sql.avro.xz.level` with default value `6`. The allowed range of levels is `[0, 9]`.

## How was this patch tested?

It was tested manually and by an existing test, which was extended to check the `xz` and `bzip2` compressions.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/MaxGekk/spark-1 avro-xz-bzip2

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21902.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #21902

commit e3b8856c6f8769cf1c2646e7cf5ae41fb3c8d626
Author: Maxim Gekk
Date: 2018-07-27T20:15:04Z
    Support bzip2

commit 7b9dd253e313fb7b5f674672f8bd5447812522a3
Author: Maxim Gekk
Date: 2018-07-27T20:40:18Z
    Support xz

commit d4dbeb10656283d957c9c52327da97170f9ad080
Author: Maxim Gekk
Date: 2018-07-27T21:12:54Z
    Refactoring

commit 3e1139af293cb2e06e125edfd443a5b5a0265b84
Author: Maxim Gekk
Date: 2018-07-27T21:30:30Z
    Fix comments
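For readers of the thread, a hedged usage sketch of the proposed feature (not part of the PR itself; it assumes an existing `SparkSession` named `spark` and a `DataFrame` named `df`, and uses the `compression` write option that the patched `AvroFileFormat` reads):

```scala
// Sketch: writing Avro output with the codecs this PR adds.
// The config line reflects the spark.sql.avro.xz.level setting as
// originally proposed in the PR (it was later removed during review).
spark.conf.set("spark.sql.avro.xz.level", 6) // proposed range: [0, 9]

df.write
  .format("avro")
  .option("compression", "xz") // also: "bzip2", "deflate", "snappy", "uncompressed"
  .save("/tmp/events_xz_avro")
```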