[jira] [Comment Edited] (SPARK-29102) Read gzipped file into multiple partitions without full gzip expansion on a single-node

2019-09-27 Thread Nicholas Chammas (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-29102?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16939856#comment-16939856
 ] 

Nicholas Chammas edited comment on SPARK-29102 at 9/28/19 5:35 AM:
---

I figured it out. Looks like the correct setting is {{io.compression.codecs}}, 
as instructed in the 
[SplittableGzip|https://github.com/nielsbasjes/splittablegzip] repo, not 
{{spark.hadoop.io.compression.codecs}}. I mistakenly tried the latter after 
seeing others use it elsewhere.

So in summary, to read gzipped files in Spark using this codec, you need to do 
the following (a short PySpark sketch follows the list):
 # Start up Spark with "{{--packages nl.basjes.hadoop:splittablegzip:1.2}}".
 # Then, enable the new codec with "{{spark.conf.set('io.compression.codecs', 
'nl.basjes.hadoop.io.compress.SplittableGzipCodec')}}".
 # From there, you can read gzipped CSVs as you would normally, via 
"{{spark.read.csv(...)}}".

I've confirmed that, using this codec, Spark loads a single gzipped file with 
multiple concurrent tasks (and without the codec it only runs one task). I 
haven't done any further testing to see what performance benefits there are in 
a realistic use case, but if this codec works as described in its README then 
that should be good enough for me!
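
One quick way to sanity-check the parallelism (again with a placeholder path); 
the task count for the read stage is also visible in the Spark UI:

{code:python}
# Rough parallelism check: with the codec registered, a single gzipped file
# should yield more than one input partition (and thus more than one task).
# /data/big.csv.gz is a placeholder path.
df = spark.read.csv("/data/big.csv.gz", header=True)
print(df.rdd.getNumPartitions())
{code}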

I've filed SPARK-29280 about adding a {{compression}} option to 
{{DataFrameReader}} to match {{DataFrameWriter}} and make this kind of workflow 
a bit more straightforward.
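
Purely as an illustration of what I have in mind, a reader-side option might 
look something like this. The {{compression}} option shown here is 
hypothetical; {{DataFrameReader}} does not support it today:

{code:python}
# Hypothetical sketch only: DataFrameReader currently has no "compression"
# option. SPARK-29280 proposes adding one to mirror DataFrameWriter's.
df = (
    spark.read
    .option("compression", "nl.basjes.hadoop.io.compress.SplittableGzipCodec")
    .csv("/data/big.csv.gz")  # placeholder path
)
{code}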


[jira] [Comment Edited] (SPARK-29102) Read gzipped file into multiple partitions without full gzip expansion on a single-node

2019-09-18 Thread Hyukjin Kwon (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-29102?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16932968#comment-16932968
 ] 

Hyukjin Kwon edited comment on SPARK-29102 at 9/19/19 1:27 AM:
---

Yea, that _might_ work. It's been too long since I investigated it, so I don't 
even remember whether I actually tested it or not.
BTW, {{nl.basjes.hadoop.io.compress.SplittableGzipCodec}} actually decompresses 
the same file multiple times, redundantly.

Each map task decompresses the file from the beginning and processes only the 
range it is responsible for, then stops decompressing once that range has been 
processed.
So, theoretically, the map task that has to process the last block of the 
gzipped file has to decompress the whole file.

Seems like there's a performance advantage nevertheless.
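
As a toy illustration of that behavior (plain Python, not the codec's actual 
implementation): every split decompresses from the very beginning of the file, 
discards everything before its own range, and stops at the end of its range, 
so the split covering the tail of the file ends up decompressing everything.

{code:python}
import gzip

def read_split(path, split_start, split_end, chunk_size=1 << 20):
    # split_start/split_end are offsets in the *uncompressed* stream.
    out = bytearray()
    pos = 0  # how many uncompressed bytes have been read so far
    with gzip.open(path, "rb") as f:
        while pos < split_end:
            data = f.read(min(chunk_size, split_end - pos))
            if not data:
                break
            keep_from = max(split_start - pos, 0)
            out += data[keep_from:]  # empty slice while still before the split
            pos += len(data)
    return bytes(out)
{code}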

Another way:

{quote}
 it has to make an index after scanning once first
{quote}

To fully allow partial decompression (IIRC, a long time ago I worked on this 
approach), it has to build a separate index.
I tried it: scan the file once first, build a separate index file, and then 
decompress partially, but IIRC the performance was about as poor as 
{{nl.basjes.hadoop.io.compress.SplittableGzipCodec}}.


> Read gzipped file into multiple partitions without full gzip expansion on a 
> single-node
> ---
>
> Key: SPARK-29102
> URL: https://issues.apache.org/jira/browse/SPARK-29102
> Project: Spark
>  Issue Type: Improvement
>  Components: Input/Output
>Affects Versions: 2.4.4
>Reporter: Nicholas Chammas
>Priority: Minor
>
> Large gzipped files are a common stumbling block for new users (SPARK-5685, 
> SPARK-28366) and an ongoing pain point for users who must process such files 
> delivered from external parties who can't or won't break them up into smaller 
> files or compress them using a splittable compression format like bzip2.
> To deal with large gzipped files today, users must either load them via a 
> single task and then repartition the resulting RDD or DataFrame, or they must 
> launch a preprocessing step outside of Spark to split up the file or 
> recompress it using a splittable format. In either case, the user needs a 
> single host capable of holding the entire decompressed file.
> Spark can potentially a) spare new users the confusion over why only one task 
> is processing their gzipped data, and b) relieve new and experienced users 
> alike from needing to maintain infrastructure capable of decompressing a 
> large gzipped file on a single node, by directly loading gzipped files into 
> multiple partitions across the cluster.
> The rough idea is to have tasks divide a given gzipped file into ranges and 
> then have them all concurrently decompress the file, with each task throwing 
> away the data leading up to the target range. (This kind of partial 
> decompression is apparently [doable using standard Unix 
> utilities|https://unix.stackexchange.com/a/415831/70630], so it should be 
> doable in Spark too.)
> In this way multiple tasks can concurrently load a single gzipped file into 
> multiple partitions. Even though every task will need to unpack the file from 
> the beginning to the task's target range, and the stage will run no faster 
> than what it would take with Spark's current gzip loading behavior, this 
> nonetheless addresses the two problems called out above. Users no longer need 
> to load and then repartition gzipped files, and their infrastructure does not 
> need to decompress any large gzipped file on a single node.


