[jira] [Commented] (PIG-5029) Optimize sort case when data is skewed
[ https://issues.apache.org/jira/browse/PIG-5029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15689206#comment-15689206 ] liyunzhang_intel commented on PIG-5029:
---
[~kexianda]:
{quote}
I have a question: if the user did not set the requested parallelism in the script, what is the default? Do we have to handle the default parallelism at runtime?
{quote}
{code}
public static int getParallelism(List<RDD<Tuple>> predecessors,
                                 PhysicalOperator physicalOperator) {
    int numReducers = SparkEnv.SPARK_REDUCERS;
    if (numReducers != 0) {
        return numReducers;
    }
    int parallelism = physicalOperator.getRequestedParallelism();
    if (parallelism <= 0) {
        // Parallelism wasn't set in Pig, so set it to whatever Spark thinks
        // is reasonable.
        parallelism = predecessors.get(0).context().defaultParallelism();
    }
    return parallelism;
}
{code}
As the code above shows, the parallelism is decided first by SparkEnv.SPARK_REDUCERS (see PIG-5068), second by the physical operator's requestedParallelism, and third by the default parallelism of the parent RDD.
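For illustration, the three-level fallback described above can be exercised in isolation; the class and method signature below are illustrative stand-ins, not Pig's actual code:

```java
// Standalone sketch of the three-level parallelism fallback described above
// (names here are illustrative, not the real Pig/Spark classes).
public class ParallelismFallback {
    public static int getParallelism(int sparkReducers, int requested, int defaultParallelism) {
        if (sparkReducers != 0) return sparkReducers; // 1) SparkEnv.SPARK_REDUCERS
        if (requested > 0) return requested;          // 2) operator's requestedParallelism
        return defaultParallelism;                    // 3) parent RDD's defaultParallelism
    }

    public static void main(String[] args) {
        assert getParallelism(10, 5, 200) == 10;  // explicit reducer count wins
        assert getParallelism(0, 5, 200) == 5;    // PARALLEL set in the script
        assert getParallelism(0, -1, 200) == 200; // fall back to Spark's default
        System.out.println("fallback order verified");
    }
}
```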
> Optimize sort case when data is skewed
> --
>
> Key: PIG-5029
> URL: https://issues.apache.org/jira/browse/PIG-5029
> Project: Pig
> Issue Type: Sub-task
> Components: spark
> Reporter: liyunzhang_intel
> Assignee: liyunzhang_intel
> Fix For: spark-branch
>
> Attachments: PIG-5029.patch, PIG-5029_2.patch, PIG-5029_3.patch,
> PIG-5051_5029_5.patch, SkewedData_L9.docx
>
> In PigMix L9.pig
> {code}
> register $PIGMIX_JAR
> A = load '$HDFS_ROOT/page_views' using
>     org.apache.pig.test.pigmix.udf.PigPerformanceLoader()
>     as (user, action, timespent, query_term, ip_addr, timestamp,
>     estimated_revenue, page_info, page_links);
> B = order A by query_term parallel $PARALLEL;
> store B into '$PIGMIX_OUTPUT/L9out';
> {code}
> The Pig physical plan is converted to a Spark plan with the following RDD lineage:
> {code}
> [main] 2016-09-08 01:49:09,844 DEBUG converter.StoreConverter
> (StoreConverter.java:convert(110)) - RDD lineage: (23) MapPartitionsRDD[8] at
> map at StoreConverter.java:80 []
>  | MapPartitionsRDD[7] at mapPartitions at SortConverter.java:58 []
>  | ShuffledRDD[6] at sortByKey at SortConverter.java:56 []
> +-(23) MapPartitionsRDD[3] at map at SortConverter.java:49 []
>  | MapPartitionsRDD[2] at mapPartitions at ForEachConverter.java:64 []
>  | MapPartitionsRDD[1] at map at LoadConverter.java:127 []
>  | NewHadoopRDD[0] at newAPIHadoopRDD at LoadConverter.java:102 []
> {code}
> We use [sortByKey|https://github.com/apache/pig/blob/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java#L56] to implement the sort feature. Although RDD.sortByKey uses a [RangePartitioner|https://github.com/apache/spark/blob/d6dc12ef0146ae409834c78737c116050961f350/core/src/main/scala/org/apache/spark/Partitioner.scala#L106], which samples the data and partitions the keys into roughly equal ranges, the test result (attached document) shows that one partition receives most of the keys and takes a long time to finish.
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (PIG-5029) Optimize sort case when data is skewed
[ https://issues.apache.org/jira/browse/PIG-5029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15688696#comment-15688696 ] Xianda Ke commented on PIG-5029:
Hi [~kellyzly],
The salted-key solution seems OK. JDK's Random is a pseudorandom number generator (PRNG), also known as a deterministic random bit generator (DRBG). If the seed (the task id) is the same, the sequence of random numbers will be the same.
I have a question: if the user did not set the requested parallelism in the script, what is the default? Do we have to handle the default parallelism at runtime?
[jira] [Commented] (PIG-5029) Optimize sort case when data is skewed
[ https://issues.apache.org/jira/browse/PIG-5029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15624556#comment-15624556 ] liyunzhang_intel commented on PIG-5029:
---
[~knoguchi]: Earlier we discussed the skewed-key sort issue in Spark mode at length. In PIG-5029_1.patch I proposed appending a random integer to the original key so that skewed keys are distributed evenly (the [salted key solution|http://www.slideshare.net/SparkSummit/top-5-mistakes-when-writing-spark-applications-by-mark-grover-and-ted-malaska]). You pointed out issues this patch can cause, such as duplicated and missing results. Later [~tgraves] commented:
{quote}
I'm assuming in the presentation they are using a salt key such that it generates the same random if the map task is reran like Koji mentioned. For instance can you use the task id (not attempt) such that the key is always the same if it re-runs.
{quote}
So in PIG-5051_5029_5.patch I now generate the random number with {noformat}new Random(seed){noformat}, where the value of {noformat}seed{noformat} is PigMapReduce.sJobConfInternal.get().get(PigConstants.TASK_INDEX). The random integers generated in different attempts of the same Spark task are therefore identical when retries happen, because the value of PigMapReduce.sJobConfInternal.get().get(PigConstants.TASK_INDEX) is the same. Do you think this solution is OK? If so, [~kexianda] will start to review the patch. Thanks again for your patience on this JIRA; it is a really interesting issue.
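A minimal sketch of the idea, with a hypothetical task index and partition count: seeding Random with the task index makes a retried attempt reproduce exactly the same salt sequence, so the composite (key, salt) pairs are stable across retries.

```java
import java.util.Random;

// Sketch of deterministic key salting: seed Random from the task index
// (a stand-in here for PigConstants.TASK_INDEX at runtime) so that a
// re-run of the same task produces the identical salt sequence.
public class SaltedKeyDemo {
    public static void main(String[] args) {
        int taskIndex = 3;   // hypothetical task index
        int parallelism = 8; // hypothetical number of partitions for the sort

        Random attempt1 = new Random(taskIndex);
        Random attempt2 = new Random(taskIndex); // simulates a task retry
        for (int i = 0; i < 5; i++) {
            int salt1 = attempt1.nextInt(parallelism);
            int salt2 = attempt2.nextInt(parallelism);
            // The composite key ("query_term", salt) spreads a hot key over
            // `parallelism` partitions, identically in both attempts.
            assert salt1 == salt2;
            System.out.println("key=(query_term," + salt1 + ")");
        }
    }
}
```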
[jira] [Commented] (PIG-5029) Optimize sort case when data is skewed
[ https://issues.apache.org/jira/browse/PIG-5029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15530152#comment-15530152 ] Koji Noguchi commented on PIG-5029:
---
{quote}
bq. how is pig handling skew for MR/TEZ?
Sampling is done and partitioning is done based on the weights assigned to the repeating keys in the sample.
{quote}
When we really need Random(), we seed it based on the task id with some shifting, so that subsequent tasks won't get similar seeds. Example: https://issues.apache.org/jira/browse/PIG-4819
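A minimal sketch of that seeding idea (the shift and the mixing constant below are assumptions for illustration; see PIG-4819 for the scheme Pig actually uses):

```java
import java.util.Random;

// Hypothetical sketch: derive the Random seed from the task id, shifted
// and mixed so that consecutive task ids do not yield similar seeds.
public class SeedFromTaskId {
    static long seedFor(int taskId) {
        // Spread the task id across the high bits and mix with a constant
        // (the 64-bit golden-ratio constant; an assumption, not necessarily
        // what Pig uses).
        return ((long) taskId << 32) ^ 0x9E3779B97F4A7C15L;
    }

    public static void main(String[] args) {
        // Same task id -> same seed -> same sequence on retry.
        Random attempt1 = new Random(seedFor(7));
        Random attempt2 = new Random(seedFor(7));
        assert attempt1.nextInt() == attempt2.nextInt();
        // Different task ids get well-separated seeds.
        assert seedFor(7) != seedFor(8);
    }
}
```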
[jira] [Commented] (PIG-5029) Optimize sort case when data is skewed
[ https://issues.apache.org/jira/browse/PIG-5029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15530114#comment-15530114 ] Rohini Palaniswamy commented on PIG-5029:
-
bq. how is pig handling skew for MR/TEZ?
Sampling is done and partitioning is done based on the weights assigned to the repeating keys in the sample.
[jira] [Commented] (PIG-5029) Optimize sort case when data is skewed
[ https://issues.apache.org/jira/browse/PIG-5029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15530104#comment-15530104 ] Koji Noguchi commented on PIG-5029:
---
Thanks a million [~tgraves].
[jira] [Commented] (PIG-5029) Optimize sort case when data is skewed
[ https://issues.apache.org/jira/browse/PIG-5029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15530065#comment-15530065 ] Thomas Graves commented on PIG-5029:
Is the question whether Spark supports maps that aren't idempotent, i.e. maps that could generate different output when rerun? If so, I think that is a bad assumption even if it happens to work. Maps should be idempotent, because speculative execution and the failure cases you have been discussing rely, or could rely, on this.
Spark does try to be intelligent about rerunning stages: if you get a fetch failure for a map output after one of the reducers has already finished, it reruns that map and then only the reducers that haven't finished. So if the map output differs between runs, you could end up with inconsistent results across those reducers.
The external shuffle service doesn't necessarily solve this problem. It helps, but if the node that holds the map output goes away, the output has to be recomputed.
I'm assuming in the presentation they are using a salt key such that it generates the same random value if the map task is rerun, as Koji mentioned. For instance, can you use the task id (not the attempt id) so that the key is always the same on a re-run?
[~rohini], how is Pig handling skew for MR/Tez?
[jira] [Commented] (PIG-5029) Optimize sort case when data is skewed
[ https://issues.apache.org/jira/browse/PIG-5029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15528532#comment-15528532 ] liyunzhang_intel commented on PIG-5029:
---
[~knoguchi]: thanks for your patience on this JIRA. I will enable the Spark shuffle service so that the reduce tasks (in Spark) read the persisted shuffle output instead of recomputing the map tasks, which is what causes the redundant data.
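For reference, a sketch of the relevant configuration; spark.shuffle.service.enabled is Spark's standard switch for the external shuffle service, while the exact deployment steps depend on the cluster manager:

```
# spark-defaults.conf (sketch): serve shuffle files from the external
# shuffle service so reducers can still fetch them even if the executor
# that produced them dies.
spark.shuffle.service.enabled  true
```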
[jira] [Commented] (PIG-5029) Optimize sort case when data is skewed
[ https://issues.apache.org/jira/browse/PIG-5029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15526775#comment-15526775 ] Koji Noguchi commented on PIG-5029:
---
[~kellyzly], I believe I explained how a purely random key would break MapReduce (and Tez). Note that a map task is not flagged as failed or killed when the first reducer pulls its map output successfully; it is only failed/killed and retried when a second reducer tries to pull the output and fails.
[~tgraves], maybe you can close the gap for us? I don't know Spark well enough to say whether the same issue applies there. If you can persist the temp output (as suggested by [~vanzin]), that would work too.
[jira] [Commented] (PIG-5029) Optimize sort case when data is skewed
[ https://issues.apache.org/jira/browse/PIG-5029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15524845#comment-15524845 ] liyunzhang_intel commented on PIG-5029:
---
[~vanzin]: I guess what [~rohini] and [~knoguchi] worry about is whether the same sequence will be generated after a given task is retried: Spark will *not* remove the temporary output of the previous failed task, and aggregating that temporary result into the final result causes redundant results.
[jira] [Commented] (PIG-5029) Optimize sort case when data is skewed
[ https://issues.apache.org/jira/browse/PIG-5029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15524832#comment-15524832 ] Marcelo Vanzin commented on PIG-5029:
-
I'm not sure I understand the question. If you're worried about executors dying, use an external shuffle service. Then shuffle files will be available regardless of executor state.
[jira] [Commented] (PIG-5029) Optimize sort case when data is skewed
[ https://issues.apache.org/jira/browse/PIG-5029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15524780#comment-15524780 ] liyunzhang_intel commented on PIG-5029:
---
[~rohini] and [~knoguchi]: thanks for your patience on this JIRA.
bq. Key is making Random less random so that it'll produce same sequence of values when retried for the given task.
Do you mean that, after we use (key + random integer), keys that do not belong to the same partition will be distributed to the same partition once the given task is retried? Is my understanding right? If so, I wonder why the salted key (appending a random value to the key) is so widely used to distribute keys evenly, e.g. in [HBase|https://sematext.com/blog/2012/04/09/hbasewd-avoid-regionserver-hotspotting-despite-writing-records-with-sequential-keys/] and even in a [Spark Summit 2016 talk|http://www.slideshare.net/SparkSummit/top-5-mistakes-when-writing-spark-applications-by-mark-grover-and-ted-malaska], if it causes problems in the retried-task case.
[jira] [Commented] (PIG-5029) Optimize sort case when data is skewed
[ https://issues.apache.org/jira/browse/PIG-5029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15524159#comment-15524159 ] Koji Noguchi commented on PIG-5029: --- [~rohini], can you take this? Obviously I'm not explaining it well. Key is making Random less random so that it'll produce same sequence of values when retried for the given task.
[jira] [Commented] (PIG-5029) Optimize sort case when data is skewed
[ https://issues.apache.org/jira/browse/PIG-5029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15521883#comment-15521883 ] liyunzhang_intel commented on PIG-5029: --- [~vanzin]: Thanks for your comment. Here I have a question about using the salted key to solve the skewed-data problem in the above link: will *redundant* data be generated? For example, we salt the key (append a random integer to make a new key) and transform the key back after several RDD transformations; the Spark job/stage/task fails because of a fetch failure or node failure, the temporary output is still saved on disk, and Spark retries the task. Will Spark aggregate the *temporary output* generated by the last failed task into the final result? I think this will *not* happen, because Spark removes temporary output when it [fails to fetch map outputs|https://github.com/apache/spark/blob/649fa4bf1d6fc9271ae56b6891bc93ebf57858d1/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1278]. Can you double-confirm this, since [~knoguchi] proposed that the salted-key solution (appending a random integer to make a new key so keys are distributed more evenly) will generate redundant data after a fetch failure? [~knoguchi]: As for a node failure causing redundant keys, I think Spark will rerun the task on other nodes and will not aggregate the temporary output on the failed node into the final result. About data missing, I think this will *only* happen when we use the random integer as the *only* key of the tuple (from the case you provided in PIG-3257).
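To make the salt-then-strip idea under discussion concrete: a minimal Python sketch (illustrative only, with made-up data, not the Spark code) of appending a random integer to the key and stripping it after the sort. Because the salt only acts as a tiebreaker inside the composite key, sorting by (key, salt) and then dropping the salt leaves the original keys globally ordered.

```python
import random

rng = random.Random(0)
data = [("b", 2), ("a", 1), ("a", 3), ("b", 4)]
# Salt: turn each key into a (key, random_int) composite key
salted = [((key, rng.randrange(10)), val) for key, val in data]
# Range-sort on the composite (key, salt) pair
salted.sort(key=lambda kv: kv[0])
# Strip the salt, keeping only the original key
recovered = [(key, val) for (key, _salt), val in salted]
keys = [key for key, _ in recovered]
assert keys == sorted(keys)  # original keys still globally ordered
```

No record is gained or lost by the round trip; the retry hazard discussed above comes only from regenerating different salts on a rerun, not from the strip step itself.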
[jira] [Commented] (PIG-5029) Optimize sort case when data is skewed
[ https://issues.apache.org/jira/browse/PIG-5029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15516915#comment-15516915 ] Marcelo Vanzin commented on PIG-5029: - Not really my area of expertise; but this reminds me of Ted Malaska's talk (http://www.slideshare.net/SparkSummit/top-5-mistakes-when-writing-spark-applications-by-mark-grover-and-ted-malaska, starts at slide 40), maybe that can be used (sort of sounds similar to the weighted partitioner).
[jira] [Commented] (PIG-5029) Optimize sort case when data is skewed
[ https://issues.apache.org/jira/browse/PIG-5029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15515605#comment-15515605 ] liyunzhang_intel commented on PIG-5029: --- [~kexianda], [~mohitsabharwal], [~pallavi.rao] and [~xuefuz]: Currently I encounter a problem when I use RDD.sortByKey() on skewed data, even though RangePartitioner is used in RDD.sortByKey. Here is an example to explain the problem.
{code}
val rdd = sc.parallelize(1.to(2000)).map(x => (x / 2000, x))
val numPartitions = 2
val partitioner = new org.apache.spark.RangePartitioner(numPartitions, rdd)
val numsOfPartitions = rdd.keys.map(k => partitioner.getPartition(k)).countByValue()
numsOfPartitions.map(pair => println("key:" + pair._1 + " val:" + pair._2))
{code}
{code}
The Result:
Key:0 val:1999
Key:1 val:1
{code}
The code generates data like [(0,1),(0,2),……,(0,1999),(1,2000)], and we use RangePartitioner to partition it. Although RangePartitioner is said to partition sortable records by range into roughly equal ranges, it does not distribute skewed keys evenly across partitions. After reviewing the code of RangePartitioner, I found that it first samples the data, orders the sample, and then defines the range for each partition; this guarantees that keys go to the correct partition (correct meaning that all elements in partition p-1 are smaller than the split key and all elements in partition p+1 are bigger). But RangePartitioner cannot guarantee that keys are distributed evenly. After reviewing Pig's WeightedRangePartitioner, I found that it can handle this kind of case. The difference between WeightedRangePartitioner and RangePartitioner is that WeightedRangePartitioner also considers the weight of each element: if one key occupies a large share of the whole data set, it occupies several partitions (for example 7 of 40 partitions), not only 1.
In order to solve this case, I tried the following approach (append a random value to the key to distribute keys evenly, then remove the random value from the result):
{code}
val random = new java.util.Random()
val rdd = sc.parallelize(1.to(2000)).map(x => ((x, random.nextInt()), x))
val numPartitions = 2
val partitioner = new org.apache.spark.RangePartitioner(numPartitions, rdd)
val numsOfPartitions = rdd.keys.map(k => partitioner.getPartition(k)).countByValue()
numsOfPartitions.map(pair => println("key:" + pair._1 + " val:" + pair._2))
val newRdd = rdd.map(p => (p._1._1, p._2))
{code}
{code}
The Result:
Key:0 val:999
Key:1 val:1001
{code}
This shows that the 2000 elements are distributed evenly across the 2 partitions with this solution. But we *cannot* use it, as people from the community have suggested, because this kind of solution causes *redundant* and *missing* data if the Spark job retries after a failure. My question is: is there a better way to handle the skewed-key sort case in Spark?
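The skew described above can be reproduced without Spark. The following is a stand-alone Python sketch of a plain range partitioner (illustrative, not Spark's RangePartitioner; the single split point is an assumption, since a 2-partition range partitioner picks exactly one bound): with 1999 copies of key 0 and a single key 1, any one bound sends almost every record to the same partition.

```python
import bisect

# Keys produced by x / 2000 for x in 1..2000: 1999 zeros and a single 1
keys = [x // 2000 for x in range(1, 2001)]
bounds = [0]  # the only split point a 2-partition range partitioner has
# Range partitioning: a key's partition is its position among the bounds
partitions = [bisect.bisect_left(bounds, k) for k in keys]
counts = {p: partitions.count(p) for p in set(partitions)}
# counts == {0: 1999, 1: 1} — the same imbalance as the Key:0 val:1999 output above
```

However the bound is chosen, one of the two partitions must absorb every record of the hot key, which is exactly why sampling alone cannot fix this skew.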
[jira] [Commented] (PIG-5029) Optimize sort case when data is skewed
[ https://issues.apache.org/jira/browse/PIG-5029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15505624#comment-15505624 ] Xianda Ke commented on PIG-5029: [~kellyzly], when a task re-runs, the partitioning is not the same as in the first run because of the random int: some records may be duplicated, and some may be missed. For your information: in SkewedJoin (PIG-4858), records are sent to the reducers in a round-robin fashion. That partitioning is not random, but it is even.
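Xianda's round-robin point can be sketched as follows — an illustrative Python sketch, not the PIG-4858 implementation: replacing the random int with a deterministic per-key counter keeps the distribution even, and a re-run over the same input order reproduces exactly the same assignment.

```python
from collections import defaultdict

def round_robin_salt(keys, num_partitions):
    # A deterministic per-key counter instead of a random int: given the
    # same input order, a re-run produces the identical assignment.
    counters = defaultdict(int)
    out = []
    for key in keys:
        out.append((key, counters[key] % num_partitions))
        counters[key] += 1
    return out

first = round_robin_salt(["hot"] * 6 + ["cold"], 3)
rerun = round_robin_salt(["hot"] * 6 + ["cold"], 3)
# "hot" cycles through partitions 0,1,2,0,1,2 — even, and identical on re-run
```

The trade-off is that determinism here depends on the input order being stable across attempts, which holds for a recomputed map task reading the same split.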
[jira] [Commented] (PIG-5029) Optimize sort case when data is skewed
[ https://issues.apache.org/jira/browse/PIG-5029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15505394#comment-15505394 ] liyunzhang_intel commented on PIG-5029: --- [~knoguchi]: {quote} If node goes down after reducer0_attempt0 pulled map output but before reducer1_attempt0 started pulling, then map output needs to be re-computed. {quote} Won't Hadoop delete the outputs before recovery (creating a new task attempt to recompute)?
[jira] [Commented] (PIG-5029) Optimize sort case when data is skewed
[ https://issues.apache.org/jira/browse/PIG-5029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15505381#comment-15505381 ] Koji Noguchi commented on PIG-5029: --- bq. Hadoop will try new task attempt only when last task attempt fail As I wrote in the original comment, "(could be fetch failure or node failure)." If node goes down after reducer0_attempt0 pulled map output but before reducer1_attempt0 started pulling, then map output needs to be _re-computed_.
[jira] [Commented] (PIG-5029) Optimize sort case when data is skewed
[ https://issues.apache.org/jira/browse/PIG-5029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15505358#comment-15505358 ] liyunzhang_intel commented on PIG-5029: --- [~knoguchi]: If this has nothing to do with speculative execution, why, after reducer0_attempt0 successfully finishes pulling mapper0_attempt0's outputs and produces all n records, does reducer1_attempt0 start to pull a repeated n records from mapper0_attempt0? Hadoop will try a [new task attempt|https://github.com/apache/hadoop/blob/819224dcf9c683aa52f58633ac8e13663f1916d8/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java#L647] only when the last task attempt fails. Is there anything I misunderstand?
[jira] [Commented] (PIG-5029) Optimize sort case when data is skewed
[ https://issues.apache.org/jira/browse/PIG-5029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15503610#comment-15503610 ] Koji Noguchi commented on PIG-5029: --- [~kellyzly], this has nothing to do with speculative execution.
[jira] [Commented] (PIG-5029) Optimize sort case when data is skewed
[ https://issues.apache.org/jira/browse/PIG-5029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15500402#comment-15500402 ] liyunzhang_intel commented on PIG-5029: --- [~knoguchi]: Thanks for your reply. Here is a question about the example you provided in PIG-3257.
{code}
A = load ...
B = group A by UUID();
C = foreach B...
{code}
{quote}This job could successfully finish with output ranging from 0 to 2n records. For example, sequence of events can be, mapper0_attempt0 finish with n outputs and say all n uuid keys were assigned to reducer0. reducer0_attempt0 pulls map outputs and produces n outputs. reducer1_attempt0 tries to pull mapper0_attempt0 output and fail. (could be fetch failure or node failure). mapper0_attempt1 rerun. And this time, all n uuid keys were assigned to reducer1. reducer1_attempt0 pulls mapper0_attempt1 output and produces n outputs. job finish successfully with 2n outputs. This is certainly unexpected to users. {quote} My questions are: 1. Do reducer0_attempt0 and reducer1_attempt0 both pull the map output and prepare to produce n outputs because of the speculative-execution mechanism? 2. If it is because of the speculative mechanism, then after reducer0_attempt0 finishes producing its n outputs, Hadoop will cancel reducer1_attempt0 (because reducer0_attempt0 succeeded), so there should be no possibility of generating 2n outputs.
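Koji's 2n-outputs sequence of events can be replayed in a few lines. This is a deliberately worst-case Python sketch with hypothetical helper names, hard-coding the two random outcomes he describes: attempt 0 happens to route every record to reducer 0, the rerun routes every record to reducer 1, and the job commits both sets.

```python
def reducer_output(map_output, reducer_id):
    # records whose (random) key routed them to this reducer
    return [rec for key, rec in map_output if key == reducer_id]

records = list(range(100))
# Attempt 0: the random UUID keys happen to send every record to reducer 0,
# and reducer0_attempt0 pulls this output and commits its n records.
attempt0 = [(0, rec) for rec in records]
# Mapper rerun after the fetch failure: fresh randoms send everything to
# reducer 1, and reducer1_attempt0 pulls *this* output and commits n more.
attempt1 = [(1, rec) for rec in records]

final = reducer_output(attempt0, 0) + reducer_output(attempt1, 1)
assert len(final) == 2 * len(records)  # 2n outputs: every record duplicated
```

The opposite outcome (0 outputs, i.e. data loss) follows the same way if both attempts route the records to the reducer that does not pull them.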
[jira] [Commented] (PIG-5029) Optimize sort case when data is skewed
[ https://issues.apache.org/jira/browse/PIG-5029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15490809#comment-15490809 ] Rohini Palaniswamy commented on PIG-5029: - bq. Although spark will sample the data automatically before sort but in the benchmark test, the effect is bad You should file a spark jira as well so that they can fix their sampling logic. bq. Now i'm investigating to use SkewedPartitioner That is for skewed join. For order by it is WeightedRangePartitioner.
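The WeightedRangePartitioner idea Rohini points to can be sketched roughly as follows — a hypothetical Python illustration of the concept only, not Pig's actual class: each key gets a block of partitions proportional to its sampled frequency, and records of a hot key cycle through that block instead of piling into one partition.

```python
import itertools
from collections import Counter

def weighted_partitioner(sample, num_partitions):
    # Concept sketch (not Pig's implementation): size each key's block of
    # partitions by its sample frequency, then cycle hot-key records
    # through that block.
    counts = Counter(sample)
    total = len(sample)
    blocks, start = {}, 0
    for key in sorted(counts):
        width = max(1, round(num_partitions * counts[key] / total))
        width = min(width, num_partitions - start)
        blocks[key] = (start, width)
        start = min(start + width, num_partitions - 1)
    cyclers = {k: itertools.cycle(range(s, s + w)) for k, (s, w) in blocks.items()}
    return lambda key: next(cyclers[key])

sample = ["a"] * 98 + ["b", "c"]
part = weighted_partitioner(sample, 4)
hot_partitions = {part("a") for _ in range(8)}
# the hot key "a" (98% of the sample) spreads over all 4 partitions
```

Spreading one key over several partitions keeps the load even, at the cost that the hot key's records are only sorted within each partition of its block.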
[jira] [Commented] (PIG-5029) Optimize sort case when data is skewed
[ https://issues.apache.org/jira/browse/PIG-5029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15490595#comment-15490595 ] Koji Noguchi commented on PIG-5029:
---

bq. Can you explain more about why this causes data loss and duplication?

I mentioned pure randomness hurting results once in https://issues.apache.org/jira/browse/PIG-3257?focusedCommentId=13669195l#comment-13669195. Also, I had to rewrite the RANDOM udf later in PIG-4819. And now we're finding a similar issue with the RANDOM() call inside DiscreteProbabilitySampleGenerator, which is used by Pig's order-by. (JIRA and patch to come soon.) If Spark has any kind of retry on failure, then you probably want to avoid pure randomness.
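Koji's concern can be sketched in isolation: under task retries, any salt must be a deterministic function of the task's stable inputs, or reruns diverge. A minimal illustrative Java sketch, not Pig or Spark code; the partition index and record offset are hypothetical stand-ins for whatever stable task metadata is actually available:

```java
import java.util.Random;

public class DeterministicSalt {
    // A salt drawn from an unseeded Random differs on every task attempt; if
    // the engine retries a failed task, the rerun can route the same record to
    // a different partition than the first attempt did, so the record may be
    // duplicated in one output partition and missing from another.
    static int pureSalt(int bound) {
        return new Random().nextInt(bound); // non-reproducible across retries
    }

    // Seeding only from values that are identical when the task is re-executed
    // makes the salt reproducible, which is what retry safety requires.
    static int stableSalt(int partitionIndex, long recordOffset, int bound) {
        return new Random(31L * partitionIndex + recordOffset).nextInt(bound);
    }

    public static void main(String[] args) {
        // A retried task recomputes the same record and gets the same salt.
        System.out.println(stableSalt(3, 42L, 100) == stableSalt(3, 42L, 100)); // true
    }
}
```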
[jira] [Commented] (PIG-5029) Optimize sort case when data is skewed
[ https://issues.apache.org/jira/browse/PIG-5029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15489116#comment-15489116 ] liyunzhang_intel commented on PIG-5029:
---

[~rohini] and [~knoguchi]: Thanks for your interest in this skewed-key sort problem.

[~rohini]:
bq. Pig has always handled skew automatically during order by.

Yes, in MR mode Pig samples, partitions, and sorts. In Spark mode we just use RDD.sortByKey to implement the sort. Although Spark samples the data automatically before sorting, the benchmark results are bad: a skewed key ends up in a single partition, so one part- file is large while the others are small.

bq. Using Random can cause data loss and duplication during reruns and should be avoided at all costs. We have been bitten by this a lot of times.

This is the only solution I have found so far, and it shows a good performance improvement. Can you explain more about why this causes data loss and duplication?

Now I'm investigating using org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.SkewedPartitioner, which distributes the skewed keys evenly, in Spark mode.
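The skew described above can be illustrated standalone: a range partitioner can only place split points *between* distinct key values, so every copy of one hot key falls into the same partition. A toy Java sketch; the boundaries and data are made up, and this is not Spark's actual RangePartitioner:

```java
import java.util.Arrays;
import java.util.List;

public class SkewDemo {
    // Assign each key to the first partition whose upper bound it does not
    // exceed, the same boundary scheme a range partitioner uses.
    static int[] partitionCounts(List<Integer> keys, int[] upperBounds) {
        int[] counts = new int[upperBounds.length + 1];
        for (int k : keys) {
            int p = 0;
            while (p < upperBounds.length && k > upperBounds[p]) p++;
            counts[p]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // 900 copies of the hot key 5, plus a handful of other keys in 1..10.
        Integer[] data = new Integer[904];
        Arrays.fill(data, 5);
        data[900] = 1; data[901] = 4; data[902] = 7; data[903] = 10;

        // Even with "balanced" boundaries, partition 1 holds all 900 fives.
        int[] counts = partitionCounts(Arrays.asList(data), new int[]{3, 5, 8});
        System.out.println(Arrays.toString(counts)); // [1, 901, 1, 1]
    }
}
```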
[jira] [Commented] (PIG-5029) Optimize sort case when data is skewed
[ https://issues.apache.org/jira/browse/PIG-5029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15488515#comment-15488515 ] Rohini Palaniswamy commented on PIG-5029:
---

[~knoguchi] just pointed out this JIRA to me, as there is a plan to use random numbers.

bq. set an option "pig.spark.skewed.sort" in pig.properties which can be enabled if the user considers the sorted data skewed.

This should be the default behavior, not a configuration for the user. Pig has always handled skew automatically during order by.

bq. append a random integer to the sorted tuple, e.g. (1,(xxx)) becomes (1,(xxx), random integer) (SortConverter.ToKeyValueFunction).

Using Random can cause data loss and duplication during reruns and should be avoided at all costs. We have been bitten by this a lot of times.
[jira] [Commented] (PIG-5029) Optimize sort case when data is skewed
[ https://issues.apache.org/jira/browse/PIG-5029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15486676#comment-15486676 ] liyunzhang_intel commented on PIG-5029:
---

The solution for sorting skewed data in PIG-5029.patch is:
0. Add an option "pig.spark.skewed.sort" to pig.properties, which the user can enable when the data to be sorted is skewed.
1. Append a random integer to the sorted tuple, e.g. (1,(xxx)) becomes (1,(xxx), random integer) (SortConverter.ToKeyValueFunction).
2. Add a new class SkewedKeyComparator in SortConverter, which influences partitioning during the shuffle phase of RDD.sortByKey. Previously the data was not distributed evenly across partitions because of the skewed key; now the combined key (key, random integer) is used for partitioning, so the partitions are no longer skewed.
3. Remove the trailing random integer from the tuple in SortConverter.ToValueFunction.
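Steps 1 through 3 amount to classic key salting. A minimal standalone Java sketch of the salt, sort-by-composite-key, strip round trip; method names echo but do not reproduce the actual SortConverter classes:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Random;

public class SaltedSortSketch {
    // Hypothetical stand-in for the patch's pipeline: ToKeyValueFunction
    // appends a salt, SkewedKeyComparator orders by (key, salt), and
    // ToValueFunction strips the salt again.
    static List<Integer> saltedSort(int[] keys, long seed) {
        Random rand = new Random(seed);              // fixed seed for the sketch
        List<int[]> salted = new ArrayList<>();      // element = {key, salt}
        for (int k : keys) salted.add(new int[]{k, rand.nextInt(1000)});

        // Compare by key first, then by salt: records sharing a hot key get
        // distinct composite keys, so a range partitioner can split them.
        salted.sort(Comparator.<int[]>comparingInt(t -> t[0])
                              .thenComparingInt(t -> t[1]));

        // Drop the salt; the ordering by the real key is unaffected.
        List<Integer> result = new ArrayList<>();
        for (int[] t : salted) result.add(t[0]);
        return result;
    }

    public static void main(String[] args) {
        // Key 1 is heavily skewed; the final order is still fully sorted.
        System.out.println(saltedSort(new int[]{1, 1, 1, 1, 2, 1, 3, 1, 1}, 7L));
        // prints [1, 1, 1, 1, 1, 1, 1, 2, 3]
    }
}
```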