[jira] (SPARK-18105) LZ4 failed to decompress a stream of shuffled data

2017-01-30 Thread Ruslan Dautkhanov (JIRA)
 
 Ruslan Dautkhanov commented on  SPARK-18105 
  Re: LZ4 failed to decompress a stream of shuffled data  
 
Thanks for the follow-up, Davies Liu. We'd have to wait for a CDH parcel of Spark 2.1 to be released to try it out.

[jira] (SPARK-18105) LZ4 failed to decompress a stream of shuffled data

2017-01-30 Thread Davies Liu (JIRA)
 
 Davies Liu commented on  SPARK-18105 
  Re: LZ4 failed to decompress a stream of shuffled data  
 
There is a workaround for this type of failure merged into Spark 2.1 (decompress it and try again); can you try that?
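For context, here is a minimal sketch of the "decompress it and try again" idea - illustrative only, not the actual Spark 2.1 change. fetchBlock and wrapWithLz4 are hypothetical helpers standing in for the shuffle fetch and the LZ4-decompressing wrapper:

import java.io.IOException;
import java.io.InputStream;
import java.util.function.Function;
import java.util.function.Supplier;

// Sketch: eagerly drain a fetched shuffle block through the decompressing
// stream so corruption is detected while the block can still be re-fetched,
// instead of surfacing later inside the record reader.
public class EagerDecompressCheck {

  static void readWithRetry(Supplier<InputStream> fetchBlock,
                            Function<InputStream, InputStream> wrapWithLz4) throws IOException {
    try {
      drain(wrapWithLz4.apply(fetchBlock.get()));
    } catch (IOException firstFailure) {
      // The block looked corrupted; fetch it again and retry once.
      drain(wrapWithLz4.apply(fetchBlock.get()));
    }
  }

  private static void drain(InputStream in) throws IOException {
    byte[] buf = new byte[64 * 1024];
    try (InputStream s = in) {
      while (s.read(buf) != -1) {
        // Reading to EOF forces every LZ4 block to be decompressed and checked.
      }
    }
  }
}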

[jira] (SPARK-18105) LZ4 failed to decompress a stream of shuffled data

2017-01-30 Thread Ruslan Dautkhanov (JIRA)
 
 Ruslan Dautkhanov commented on  SPARK-18105 
  Re: LZ4 failed to decompress a stream of shuffled data  
 
It's also worth mentioning that the above query runs with spark.sql.shuffle.partitions=36000 on highly skewed data, so it's possible that some partitions are heavily populated while others are empty.

The problem happens here: https://github.com/apache/spark/blob/master/core/src/main/java/org/apache/spark/io/LZ4BlockInputStream.java#L163 My hypothesis is that the magic header check breaks when LZ4 streams get merged and the merge does not insert that magic header for empty partitions (or inserts it incorrectly for empty partitions?). That's just a hypothesis, though, as I couldn't find the code that merges the compressed files.
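To illustrate the failure point, here is a simplified sketch (not the actual Spark source) of the magic-token check that refill() performs on every block header; "LZ4Block" is the lz4-java block-format magic:

import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;

// Sketch of the header validation: each LZ4 block must start with a fixed
// magic token. If the bytes at the current position are not that token (for
// example because a concatenated stream put an empty or truncated block
// there), the reader can only report "Stream is corrupted".
public class Lz4HeaderCheckSketch {

  private static final byte[] MAGIC = "LZ4Block".getBytes();

  static void checkBlockHeader(InputStream in) throws IOException {
    DataInputStream data = new DataInputStream(in);
    byte[] header = new byte[MAGIC.length];
    data.readFully(header);
    if (!Arrays.equals(header, MAGIC)) {
      // The failure mode seen at LZ4BlockInputStream.refill (line 163).
      throw new IOException("Stream is corrupted");
    }
    // The real reader then reads the compression method, the compressed and
    // decompressed lengths, and a checksum before decompressing the block.
  }
}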

[jira] (SPARK-18105) LZ4 failed to decompress a stream of shuffled data

2017-01-29 Thread Ruslan Dautkhanov (JIRA)
 
 Ruslan Dautkhanov commented on  SPARK-18105 
  Re: LZ4 failed to decompress a stream of shuffled data  
 
Ran into the same issue.
It looks like a "floating" issue - it happens more often over time. Restarting the Spark context makes it harder to reproduce, but then it starts happening again. Also, a precursor for this issue seems to be running executors tight on memory. I had stages fail many times with errors like:

 
ExecutorLostFailure (executor 49 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 9.4 GB of 9 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
 
(Many executors, but not all, were killed like this.) Then, after several job restart attempts, this shows up:

 
java.io.IOException: Stream is corrupted
	at org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:163)
	at org.apache.spark.io.LZ4BlockInputStream.read(LZ4BlockInputStream.java:125)
	at java.io.BufferedInputStream.fill(BufferedInputStream.java:235)
	at java.io.BufferedInputStream.read(BufferedInputStream.java:254)
	at java.io.DataInputStream.readInt(DataInputStream.java:387)
 
Once this problem shows up, I no longer see executors failing because of memory; the stage starts failing just because of this one problem - java.io.IOException: Stream is corrupted at org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:163).
Here's my job / Spark SQL query (running from pyspark, if that matters) that at some point starts reproducing this problem:

create table marketview.cb_filt_ratings2
stored as parquet
as
select psn, merchant_id
     , PERCENT_RANK() OVER (PARTITION BY merchant_id ORDER BY records_count ASC)     AS record_count_rating
     , PERCENT_RANK() OVER (PARTITION BY merchant_id ORDER BY transaction_count ASC) AS transaction_count_rating
     , PERCENT_RANK() OVER (PARTITION BY merchant_id ORDER BY sum_spend ASC)         AS sum_spend_rating
from marketview.cb_filt_ratings1
 
The table has ~6B rows.
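Since the YARN message above suggests boosting spark.yarn.executor.memoryOverhead, here is a minimal sketch of the settings involved when resubmitting a job like this. The values are illustrative only, and although the original run was from pyspark, the Java form below just shows which keys matter:

import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;

public class OverheadTuningSketch {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf()
        .setAppName("cb_filt_ratings2")
        // Extra off-heap headroom per executor, in MB (illustrative value);
        // must be set before the context starts, or passed via --conf.
        .set("spark.yarn.executor.memoryOverhead", "2048")
        // The job runs with 36000 shuffle partitions on skewed data.
        .set("spark.sql.shuffle.partitions", "36000");

    SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
    spark.stop();
  }
}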
P.S. The same problem was also reported on Stack Overflow a couple of months ago: http://stackoverflow.com/questions/40289464/spark-job-failing-in-yarn-mode